datasource.ts 12 KB


  1. // Libraries
  2. import _ from 'lodash';
  3. // Services & Utils
  4. import {
  5. dateMath,
  6. DataFrame,
  7. LogRowModel,
  8. DateTime,
  9. AnnotationEvent,
  10. DataFrameView,
  11. LoadingState,
  12. } from '@grafana/data';
  13. import { addLabelToSelector } from 'app/plugins/datasource/prometheus/add_label_to_query';
  14. import LanguageProvider from './language_provider';
  15. import { logStreamToDataFrame } from './result_transformer';
  16. import { formatQuery, parseQuery, getHighlighterExpressionsFromQuery } from './query_utils';
  17. // Types
  18. import {
  19. PluginMeta,
  20. DataSourceApi,
  21. DataSourceInstanceSettings,
  22. DataQueryError,
  23. DataQueryRequest,
  24. DataQueryResponse,
  25. AnnotationQueryRequest,
  26. } from '@grafana/ui';
  27. import { LokiQuery, LokiOptions, LokiLogsStream, LokiResponse } from './types';
  28. import { BackendSrv } from 'app/core/services/backend_srv';
  29. import { TemplateSrv } from 'app/features/templating/template_srv';
  30. import { safeStringifyValue, convertToWebSocketUrl } from 'app/core/utils/explore';
  31. import { LiveTarget, LiveStreams } from './live_streams';
  32. import { Observable, from, merge } from 'rxjs';
  33. import { map, filter } from 'rxjs/operators';
  /**
   * Line limit applied when the datasource settings do not provide a usable
   * `maxLines` value.
   */
  export const DEFAULT_MAX_LINES = 1000;

  /**
   * Baseline parameters for a range query. `prepareQueryTarget` spreads this
   * object first and then overrides individual fields per target, so the key
   * order here also determines the order of the serialized query parameters.
   */
  const DEFAULT_QUERY_PARAMS = {
    direction: 'BACKWARD',
    limit: DEFAULT_MAX_LINES,
    regexp: '',
    query: '',
  };
  /**
   * Serializes a flat key/value object into a URL query string
   * (`key=value&key2=value2`), URI-encoding both keys and values.
   *
   * Entries whose value is `undefined` or `null` are omitted; otherwise they
   * would be sent as the literal strings "undefined" / "null". Empty strings
   * and `0` are kept.
   *
   * @param data Flat object of query parameters.
   * @returns The encoded query string without a leading `?`; empty string when
   *          there is nothing to serialize.
   */
  function serializeParams(data: any) {
    return Object.keys(data)
      .filter(key => data[key] !== undefined && data[key] !== null)
      .map(key => encodeURIComponent(key) + '=' + encodeURIComponent(data[key]))
      .join('&');
  }
  /**
   * Optional settings accepted by `LokiDatasource.getLogRowContext`.
   */
  interface LokiContextQueryOptions {
    /** Maximum number of context lines to request; the caller falls back to 10. */
    limit?: number;
    /** Which side of the row to fetch context for; the caller falls back to 'BACKWARD'. */
    direction?: 'FORWARD' | 'BACKWARD';
  }
  /**
   * Datasource implementation that talks to the `/api/prom/*` HTTP endpoints
   * (`query`, `tail`, `label`) of the configured instance URL. It supports
   * range queries, live (websocket) queries, log row context lookups,
   * annotation queries and a connectivity test.
   */
  export class LokiDatasource extends DataSourceApi<LokiQuery, LokiOptions> {
    /** Shared registry of live streams, keyed by live target. */
    private streams = new LiveStreams();
    languageProvider: LanguageProvider;
    /** Maximum number of log lines requested per query. */
    maxLines: number;

    /** @ngInject */
    constructor(
      private instanceSettings: DataSourceInstanceSettings<LokiOptions>,
      private backendSrv: BackendSrv,
      private templateSrv: TemplateSrv
    ) {
      super(instanceSettings);
      this.languageProvider = new LanguageProvider(this);
      const settingsData = instanceSettings.jsonData || {};
      // Falls back to the default when maxLines is missing, not numeric, or 0.
      this.maxLines = parseInt(settingsData.maxLines, 10) || DEFAULT_MAX_LINES;
    }

    /**
     * Issues a request against the datasource through the backend service.
     *
     * @param apiUrl Path appended to the instance base URL.
     * @param data Optional query parameters, serialized into the query string.
     * @param options Extra request options merged into the request object.
     * @returns The promise returned by `backendSrv.datasourceRequest`.
     */
    _request(apiUrl: string, data?: any, options?: any) {
      const baseUrl = this.instanceSettings.url;
      const params = data ? serializeParams(data) : '';
      const url = `${baseUrl}${apiUrl}?${params}`;
      const req = {
        ...options,
        url,
      };

      return this.backendSrv.datasourceRequest(req);
    }

    /**
     * Builds the live (tail) target for a query: interpolates template
     * variables, splits the expression into selector and regexp, and derives
     * the websocket URL.
     */
    prepareLiveTarget(target: LokiQuery, options: DataQueryRequest<LokiQuery>): LiveTarget {
      // Forward scoped variables so request-scoped values are interpolated too.
      const interpolated = this.templateSrv.replace(target.expr, options.scopedVars);
      const { query, regexp } = parseQuery(interpolated);
      const refId = target.refId;
      const baseUrl = this.instanceSettings.url;
      const params = serializeParams({ query, regexp });
      const url = convertToWebSocketUrl(`${baseUrl}/api/prom/tail?${params}`);

      return {
        query,
        regexp,
        url,
        refId,
        // maxDataPoints of 0/undefined means "no panel limit": use maxLines.
        size: Math.min(options.maxDataPoints || Infinity, this.maxLines),
      };
    }

    /**
     * Builds the parameter object for a range query: default parameters
     * overridden with the interpolated selector/regexp, the time range and
     * the effective line limit.
     */
    prepareQueryTarget(target: LokiQuery, options: DataQueryRequest<LokiQuery>) {
      // Forward scoped variables so request-scoped values are interpolated too.
      const interpolated = this.templateSrv.replace(target.expr, options.scopedVars);
      const { query, regexp } = parseQuery(interpolated);
      const start = this.getTime(options.range.from, false);
      const end = this.getTime(options.range.to, true);
      const refId = target.refId;

      return {
        ...DEFAULT_QUERY_PARAMS,
        query,
        regexp,
        start,
        end,
        // maxDataPoints of 0/undefined means "no panel limit": use maxLines.
        limit: Math.min(options.maxDataPoints || Infinity, this.maxLines),
        refId,
      };
    }

    /**
     * Normalizes a failed request into a `DataQueryError` tagged with the
     * target's refId. The message is taken, in order of preference, from
     * `err.data` (string or `.error`), `err.message`, or `err` itself when it
     * is a string; otherwise a generic message is kept.
     */
    processError = (err: any, target: any): DataQueryError => {
      const error: DataQueryError = {
        message: 'Unknown error during query transaction. Please check JS console logs.',
        refId: target.refId,
      };

      if (err.data) {
        if (typeof err.data === 'string') {
          error.message = err.data;
        } else if (err.data.error) {
          error.message = safeStringifyValue(err.data.error);
        }
      } else if (err.message) {
        error.message = err.message;
      } else if (typeof err === 'string') {
        error.message = err;
      }

      error.status = err.status;
      error.statusText = err.statusText;
      return error;
    };

    /**
     * Converts a response payload into data frames.
     *
     * A payload without a `streams` property is treated as a single log
     * stream; otherwise each stream becomes a frame carrying the target's
     * refId plus highlighting words and the line limit as metadata.
     *
     * @returns An empty array for a missing or empty payload.
     */
    processResult = (data: LokiLogsStream | LokiResponse, target: any): DataFrame[] => {
      const series: DataFrame[] = [];

      // Guard against a missing body as well as an empty object.
      if (!data || Object.keys(data).length === 0) {
        return series;
      }

      if (!(data as any).streams) {
        return [logStreamToDataFrame(data as LokiLogsStream, false, target.refId)];
      }

      data = data as LokiResponse;
      for (const stream of data.streams || []) {
        const dataFrame = logStreamToDataFrame(stream);
        dataFrame.refId = target.refId;
        dataFrame.meta = {
          searchWords: getHighlighterExpressionsFromQuery(formatQuery(target.query, target.regexp)),
          limit: this.maxLines,
        };
        series.push(dataFrame);
      }

      return series;
    };

    /**
     * Subscribes to the live stream for a target and wraps every emission in
     * a streaming response keyed by the target's refId.
     */
    runLiveQuery = (options: DataQueryRequest<LokiQuery>, target: LokiQuery): Observable<DataQueryResponse> => {
      const liveTarget = this.prepareLiveTarget(target, options);
      const stream = this.streams.getStream(liveTarget);

      return stream.pipe(
        map(data => {
          return {
            data,
            key: `loki-${liveTarget.refId}`,
            state: LoadingState.Streaming,
          };
        })
      );
    };

    /**
     * Runs a single range query.
     *
     * Cancelled requests are swallowed: the observable completes without
     * emitting. Any other failure is rethrown as a `DataQueryError`.
     */
    runQuery = (options: DataQueryRequest<LokiQuery>, target: LokiQuery): Observable<DataQueryResponse> => {
      const query = this.prepareQueryTarget(target, options);

      return from(
        this._request('/api/prom/query', query).catch((err: any) => {
          if (err.cancelled) {
            // Pass the cancellation marker on so the filter below drops it.
            return err;
          }

          const error: DataQueryError = this.processError(err, query);
          throw error;
        })
      ).pipe(
        filter((response: any) => (response.cancelled ? false : true)),
        map((response: any) => {
          const data = this.processResult(response.data, query);
          return { data, key: query.refId };
        })
      );
    };

    /**
     * Runs every visible target that has an expression, as a live or range
     * query depending on `target.live`, and merges the resulting streams.
     */
    query(options: DataQueryRequest<LokiQuery>): Observable<DataQueryResponse> {
      const subQueries = options.targets
        .filter(target => target.expr && !target.hide)
        .map(target => {
          if (target.live) {
            return this.runLiveQuery(options, target);
          }
          return this.runQuery(options, target);
        });

      return merge(...subQueries);
    }

    /** Delegates query import from another datasource to the language provider. */
    async importQueries(queries: LokiQuery[], originMeta: PluginMeta): Promise<LokiQuery[]> {
      return this.languageProvider.importQueries(queries, originMeta.id);
    }

    /**
     * Performs a silent metadata request and reshapes the response so the
     * values are available under `data.data`.
     */
    metadataRequest(url: string, params?: any) {
      // HACK to get label values for {job=|}, will be replaced when implementing LokiQueryField
      const apiUrl = url.replace('v1', 'prom');
      return this._request(apiUrl, params, { silent: true }).then((res: DataQueryResponse) => {
        const data: any = { data: { data: res.data.values || [] } };
        return data;
      });
    }

    /**
     * Returns a copy of the query with the given action applied. Only
     * 'ADD_FILTER' is handled (adds a label matcher to the selector); other
     * actions just re-format the expression.
     */
    modifyQuery(query: LokiQuery, action: any): LokiQuery {
      const parsed = parseQuery(query.expr || '');
      let { query: selector } = parsed;

      switch (action.type) {
        case 'ADD_FILTER': {
          selector = addLabelToSelector(selector, action.key, action.value);
          break;
        }
        default:
          break;
      }

      const expression = formatQuery(selector, parsed.regexp);
      return { ...query, expr: expression };
    }

    /** Returns the highlighter expressions derived from the query expression. */
    getHighlighterExpression(query: LokiQuery): string[] {
      return getHighlighterExpressionsFromQuery(query.expr);
    }

    /**
     * Converts a date (or date-math string) into the numeric time value sent
     * to the API: the date's `valueOf()` scaled by 1e6.
     *
     * @param roundUp Passed to `dateMath.parse` when `date` is a string.
     */
    getTime(date: string | DateTime, roundUp: boolean) {
      if (_.isString(date)) {
        date = dateMath.parse(date, roundUp);
      }

      return Math.ceil(date.valueOf() * 1e6);
    }

    /**
     * Builds the query parameters used to fetch the lines surrounding a log
     * row: a selector matching all of the row's labels and a two hour window
     * before (BACKWARD) or after (FORWARD) the row.
     */
    prepareLogRowContextQueryTarget = (row: LogRowModel, limit: number, direction: 'BACKWARD' | 'FORWARD') => {
      // Label values are placed inside double quotes, so backslashes and
      // quotes must be escaped or the selector becomes malformed.
      const escapeLabelValue = (value: string) =>
        String(value)
          .replace(/\\/g, '\\\\')
          .replace(/"/g, '\\"');

      const query = Object.keys(row.labels)
        .map(label => {
          return `${label}="${escapeLabelValue(row.labels[label])}"`;
        })
        .join(',');

      const contextTimeBuffer = 2 * 60 * 60 * 1000 * 1e6; // 2h buffer
      const timeEpochNs = row.timeEpochMs * 1e6;

      const commonTargetOptions = {
        limit,
        query: `{${query}}`,
        direction,
      };

      if (direction === 'BACKWARD') {
        return {
          ...commonTargetOptions,
          start: timeEpochNs - contextTimeBuffer,
          end: row.timestamp, // using RFC3339Nano format to avoid precision loss
        };
      } else {
        return {
          ...commonTargetOptions,
          start: row.timestamp, // start param in Loki API is inclusive so we'll have to filter out the row that this request is based from
          end: timeEpochNs + contextTimeBuffer,
        };
      }
    };

    /**
     * Fetches the log lines surrounding a row.
     *
     * @param options Optional limit (default 10) and direction (default 'BACKWARD').
     * @returns An object whose `data` holds one frame per returned stream.
     * @throws DataQueryError with a generic message plus the HTTP status when the request fails.
     */
    getLogRowContext = async (row: LogRowModel, options?: LokiContextQueryOptions) => {
      const target = this.prepareLogRowContextQueryTarget(
        row,
        (options && options.limit) || 10,
        (options && options.direction) || 'BACKWARD'
      );

      const series: DataFrame[] = [];

      try {
        const reverse = options && options.direction === 'FORWARD';
        const result = await this._request('/api/prom/query', target);
        if (result.data) {
          for (const stream of result.data.streams || []) {
            series.push(logStreamToDataFrame(stream, reverse));
          }
        }

        return {
          data: series,
        };
      } catch (e) {
        const error: DataQueryError = {
          message: 'Error during context query. Please check JS console logs.',
          status: e.status,
          statusText: e.statusText,
        };
        throw error;
      }
    };

    /**
     * Checks connectivity by requesting the labels seen in the last ten
     * minutes. Always resolves (never rejects) with a status/message pair.
     */
    testDatasource() {
      // Consider only last 10 minutes otherwise request takes too long
      const startMs = Date.now() - 10 * 60 * 1000;
      const start = `${startMs}000000`; // API expects nanoseconds
      return this._request('/api/prom/label', { start })
        .then((res: DataQueryResponse) => {
          if (res && res.data && res.data.values && res.data.values.length > 0) {
            return { status: 'success', message: 'Data source connected and labels found.' };
          }
          return {
            status: 'error',
            message:
              'Data source connected, but no labels received. Verify that Loki and Promtail is configured properly.',
          };
        })
        .catch((err: any) => {
          let message = 'Loki: ';
          if (err.statusText) {
            message += err.statusText;
          } else {
            message += 'Cannot connect to Loki';
          }

          if (err.status) {
            message += `. ${err.status}`;
          }

          if (err.data && err.data.message) {
            message += `. ${err.data.message}`;
          } else if (err.data) {
            message += `. ${err.data}`;
          }
          return { status: 'error', message: message };
        });
    }

    /**
     * Runs the annotation's expression as a range query and turns every
     * returned log line into an annotation event tagged with the label values
     * of its frame.
     *
     * @returns An empty list when the annotation has no expression or the
     *          query produced no response.
     */
    async annotationQuery(options: AnnotationQueryRequest<LokiQuery>): Promise<AnnotationEvent[]> {
      if (!options.annotation.expr) {
        return [];
      }

      const request = queryRequestFromAnnotationOptions(options);
      const response = await this.runQuery(request, request.targets[0]).toPromise();

      // runQuery filters out cancelled requests, in which case the observable
      // completes without emitting and toPromise() resolves to undefined.
      if (!response || !response.data) {
        return [];
      }

      const annotations: AnnotationEvent[] = [];
      for (const frame of response.data) {
        // A frame without labels simply yields untagged annotations.
        const tags = Object.values(frame.labels || {}) as string[];
        const view = new DataFrameView<{ ts: string; line: string }>(frame);

        view.forEachRow(row => {
          annotations.push({
            time: new Date(row.ts).valueOf(),
            text: row.line,
            tags,
          });
        });
      }

      return annotations;
    }
  }
  /**
   * Wraps annotation options in a single-target query request so the
   * annotation expression can be executed through the regular query path.
   * The request id and the target's refId are both `annotation-<name>`.
   */
  function queryRequestFromAnnotationOptions(options: AnnotationQueryRequest<LokiQuery>): DataQueryRequest<LokiQuery> {
    const { annotation } = options;
    const requestId = `annotation-${annotation.name}`;
    const annotationTarget: LokiQuery = { refId: requestId, expr: annotation.expr };

    const request: DataQueryRequest<LokiQuery> = {
      requestId,
      range: options.range,
      targets: [annotationTarget],
      dashboardId: options.dashboard.id,
      scopedVars: null,
      startTime: Date.now(),

      // This should mean the default defined on datasource is used.
      maxDataPoints: 0,

      // Dummy values, are required in type but not used here.
      timezone: 'utc',
      panelId: 0,
      interval: '',
      intervalMs: 0,
    };

    return request;
  }
  353. export default LokiDatasource;