datasource.ts 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428
  1. // Libraries
  2. import _ from 'lodash';
  3. // Services & Utils
  4. import {
  5. dateMath,
  6. DataFrame,
  7. LogRowModel,
  8. LoadingState,
  9. DateTime,
  10. AnnotationEvent,
  11. DataFrameView,
  12. } from '@grafana/data';
  13. import { addLabelToSelector } from 'app/plugins/datasource/prometheus/add_label_to_query';
  14. import LanguageProvider from './language_provider';
  15. import { logStreamToDataFrame } from './result_transformer';
  16. import { formatQuery, parseQuery, getHighlighterExpressionsFromQuery } from './query_utils';
  17. // Types
  18. import {
  19. PluginMeta,
  20. DataSourceApi,
  21. DataSourceInstanceSettings,
  22. DataQueryError,
  23. DataQueryRequest,
  24. DataStreamObserver,
  25. DataQueryResponse,
  26. AnnotationQueryRequest,
  27. } from '@grafana/ui';
  28. import { LokiQuery, LokiOptions, LokiLogsStream, LokiResponse } from './types';
  29. import { BackendSrv } from 'app/core/services/backend_srv';
  30. import { TemplateSrv } from 'app/features/templating/template_srv';
  31. import { safeStringifyValue, convertToWebSocketUrl } from 'app/core/utils/explore';
  32. import { LiveTarget, LiveStreams } from './live_streams';
  33. export const DEFAULT_MAX_LINES = 1000;
  34. const DEFAULT_QUERY_PARAMS = {
  35. direction: 'BACKWARD',
  36. limit: DEFAULT_MAX_LINES,
  37. regexp: '',
  38. query: '',
  39. };
  40. function serializeParams(data: any) {
  41. return Object.keys(data)
  42. .map(k => {
  43. const v = data[k];
  44. return encodeURIComponent(k) + '=' + encodeURIComponent(v);
  45. })
  46. .join('&');
  47. }
  48. interface LokiContextQueryOptions {
  49. direction?: 'BACKWARD' | 'FORWARD';
  50. limit?: number;
  51. }
  52. export class LokiDatasource extends DataSourceApi<LokiQuery, LokiOptions> {
  53. private streams = new LiveStreams();
  54. languageProvider: LanguageProvider;
  55. maxLines: number;
  56. /** @ngInject */
  57. constructor(
  58. private instanceSettings: DataSourceInstanceSettings<LokiOptions>,
  59. private backendSrv: BackendSrv,
  60. private templateSrv: TemplateSrv
  61. ) {
  62. super(instanceSettings);
  63. this.languageProvider = new LanguageProvider(this);
  64. const settingsData = instanceSettings.jsonData || {};
  65. this.maxLines = parseInt(settingsData.maxLines, 10) || DEFAULT_MAX_LINES;
  66. }
  67. _request(apiUrl: string, data?: any, options?: any) {
  68. const baseUrl = this.instanceSettings.url;
  69. const params = data ? serializeParams(data) : '';
  70. const url = `${baseUrl}${apiUrl}?${params}`;
  71. const req = {
  72. ...options,
  73. url,
  74. };
  75. return this.backendSrv.datasourceRequest(req);
  76. }
  77. prepareLiveTarget(target: LokiQuery, options: DataQueryRequest<LokiQuery>): LiveTarget {
  78. const interpolated = this.templateSrv.replace(target.expr);
  79. const { query, regexp } = parseQuery(interpolated);
  80. const refId = target.refId;
  81. const baseUrl = this.instanceSettings.url;
  82. const params = serializeParams({ query, regexp });
  83. const url = convertToWebSocketUrl(`${baseUrl}/api/prom/tail?${params}`);
  84. return {
  85. query,
  86. regexp,
  87. url,
  88. refId,
  89. size: Math.min(options.maxDataPoints || Infinity, this.maxLines),
  90. };
  91. }
  92. prepareQueryTarget(target: LokiQuery, options: DataQueryRequest<LokiQuery>) {
  93. const interpolated = this.templateSrv.replace(target.expr);
  94. const { query, regexp } = parseQuery(interpolated);
  95. const start = this.getTime(options.range.from, false);
  96. const end = this.getTime(options.range.to, true);
  97. const refId = target.refId;
  98. return {
  99. ...DEFAULT_QUERY_PARAMS,
  100. query,
  101. regexp,
  102. start,
  103. end,
  104. limit: Math.min(options.maxDataPoints || Infinity, this.maxLines),
  105. refId,
  106. };
  107. }
  108. processError = (err: any, target: any): DataQueryError => {
  109. const error: DataQueryError = {
  110. message: 'Unknown error during query transaction. Please check JS console logs.',
  111. refId: target.refId,
  112. };
  113. if (err.data) {
  114. if (typeof err.data === 'string') {
  115. error.message = err.data;
  116. } else if (err.data.error) {
  117. error.message = safeStringifyValue(err.data.error);
  118. }
  119. } else if (err.message) {
  120. error.message = err.message;
  121. } else if (typeof err === 'string') {
  122. error.message = err;
  123. }
  124. error.status = err.status;
  125. error.statusText = err.statusText;
  126. return error;
  127. };
  128. processResult = (data: LokiLogsStream | LokiResponse, target: any): DataFrame[] => {
  129. const series: DataFrame[] = [];
  130. if (Object.keys(data).length === 0) {
  131. return series;
  132. }
  133. if (!(data as any).streams) {
  134. return [logStreamToDataFrame(data as LokiLogsStream, false, target.refId)];
  135. }
  136. data = data as LokiResponse;
  137. for (const stream of data.streams || []) {
  138. const dataFrame = logStreamToDataFrame(stream);
  139. dataFrame.refId = target.refId;
  140. dataFrame.meta = {
  141. searchWords: getHighlighterExpressionsFromQuery(formatQuery(target.query, target.regexp)),
  142. limit: this.maxLines,
  143. };
  144. series.push(dataFrame);
  145. }
  146. return series;
  147. };
  148. runLiveQueries = (options: DataQueryRequest<LokiQuery>, observer?: DataStreamObserver) => {
  149. const liveTargets = options.targets
  150. .filter(target => target.expr && !target.hide && target.live)
  151. .map(target => this.prepareLiveTarget(target, options));
  152. for (const liveTarget of liveTargets) {
  153. // Reuse an existing stream if one is already running
  154. const stream = this.streams.getStream(liveTarget);
  155. const subscription = stream.subscribe({
  156. next: (data: DataFrame[]) => {
  157. observer({
  158. key: `loki-${liveTarget.refId}`,
  159. request: options,
  160. state: LoadingState.Streaming,
  161. data,
  162. unsubscribe: () => {
  163. subscription.unsubscribe();
  164. },
  165. });
  166. },
  167. error: (err: any) => {
  168. observer({
  169. key: `loki-${liveTarget.refId}`,
  170. request: options,
  171. state: LoadingState.Error,
  172. error: this.processError(err, liveTarget),
  173. unsubscribe: () => {
  174. subscription.unsubscribe();
  175. },
  176. });
  177. },
  178. });
  179. }
  180. };
  181. runQueries = async (options: DataQueryRequest<LokiQuery>): Promise<{ data: DataFrame[] }> => {
  182. const queryTargets = options.targets
  183. .filter(target => target.expr && !target.hide && !target.live)
  184. .map(target => this.prepareQueryTarget(target, options));
  185. if (queryTargets.length === 0) {
  186. return Promise.resolve({ data: [] });
  187. }
  188. const queries = queryTargets.map(target =>
  189. this._request('/api/prom/query', target).catch((err: any) => {
  190. if (err.cancelled) {
  191. return err;
  192. }
  193. const error: DataQueryError = this.processError(err, target);
  194. throw error;
  195. })
  196. );
  197. return Promise.all(queries).then((results: any[]) => {
  198. let series: DataFrame[] = [];
  199. for (let i = 0; i < results.length; i++) {
  200. const result = results[i];
  201. if (result.data) {
  202. series = series.concat(this.processResult(result.data, queryTargets[i]));
  203. }
  204. }
  205. return { data: series };
  206. });
  207. };
  208. async query(options: DataQueryRequest<LokiQuery>, observer?: DataStreamObserver) {
  209. this.runLiveQueries(options, observer);
  210. return this.runQueries(options);
  211. }
  212. async importQueries(queries: LokiQuery[], originMeta: PluginMeta): Promise<LokiQuery[]> {
  213. return this.languageProvider.importQueries(queries, originMeta.id);
  214. }
  215. metadataRequest(url: string, params?: any) {
  216. // HACK to get label values for {job=|}, will be replaced when implementing LokiQueryField
  217. const apiUrl = url.replace('v1', 'prom');
  218. return this._request(apiUrl, params, { silent: true }).then((res: DataQueryResponse) => {
  219. const data: any = { data: { data: res.data.values || [] } };
  220. return data;
  221. });
  222. }
  223. modifyQuery(query: LokiQuery, action: any): LokiQuery {
  224. const parsed = parseQuery(query.expr || '');
  225. let { query: selector } = parsed;
  226. switch (action.type) {
  227. case 'ADD_FILTER': {
  228. selector = addLabelToSelector(selector, action.key, action.value);
  229. break;
  230. }
  231. default:
  232. break;
  233. }
  234. const expression = formatQuery(selector, parsed.regexp);
  235. return { ...query, expr: expression };
  236. }
  237. getHighlighterExpression(query: LokiQuery): string[] {
  238. return getHighlighterExpressionsFromQuery(query.expr);
  239. }
  240. getTime(date: string | DateTime, roundUp: boolean) {
  241. if (_.isString(date)) {
  242. date = dateMath.parse(date, roundUp);
  243. }
  244. return Math.ceil(date.valueOf() * 1e6);
  245. }
  246. prepareLogRowContextQueryTarget = (row: LogRowModel, limit: number, direction: 'BACKWARD' | 'FORWARD') => {
  247. const query = Object.keys(row.labels)
  248. .map(label => {
  249. return `${label}="${row.labels[label]}"`;
  250. })
  251. .join(',');
  252. const contextTimeBuffer = 2 * 60 * 60 * 1000 * 1e6; // 2h buffer
  253. const timeEpochNs = row.timeEpochMs * 1e6;
  254. const commontTargetOptons = {
  255. limit,
  256. query: `{${query}}`,
  257. direction,
  258. };
  259. if (direction === 'BACKWARD') {
  260. return {
  261. ...commontTargetOptons,
  262. start: timeEpochNs - contextTimeBuffer,
  263. end: row.timestamp, // using RFC3339Nano format to avoid precision loss
  264. direction,
  265. };
  266. } else {
  267. return {
  268. ...commontTargetOptons,
  269. start: row.timestamp, // start param in Loki API is inclusive so we'll have to filter out the row that this request is based from
  270. end: timeEpochNs + contextTimeBuffer,
  271. };
  272. }
  273. };
  274. getLogRowContext = async (row: LogRowModel, options?: LokiContextQueryOptions) => {
  275. const target = this.prepareLogRowContextQueryTarget(
  276. row,
  277. (options && options.limit) || 10,
  278. (options && options.direction) || 'BACKWARD'
  279. );
  280. const series: DataFrame[] = [];
  281. try {
  282. const reverse = options && options.direction === 'FORWARD';
  283. const result = await this._request('/api/prom/query', target);
  284. if (result.data) {
  285. for (const stream of result.data.streams || []) {
  286. series.push(logStreamToDataFrame(stream, reverse));
  287. }
  288. }
  289. return {
  290. data: series,
  291. };
  292. } catch (e) {
  293. const error: DataQueryError = {
  294. message: 'Error during context query. Please check JS console logs.',
  295. status: e.status,
  296. statusText: e.statusText,
  297. };
  298. throw error;
  299. }
  300. };
  301. testDatasource() {
  302. // Consider only last 10 minutes otherwise request takes too long
  303. const startMs = Date.now() - 10 * 60 * 1000;
  304. const start = `${startMs}000000`; // API expects nanoseconds
  305. return this._request('/api/prom/label', { start })
  306. .then((res: DataQueryResponse) => {
  307. if (res && res.data && res.data.values && res.data.values.length > 0) {
  308. return { status: 'success', message: 'Data source connected and labels found.' };
  309. }
  310. return {
  311. status: 'error',
  312. message:
  313. 'Data source connected, but no labels received. Verify that Loki and Promtail is configured properly.',
  314. };
  315. })
  316. .catch((err: any) => {
  317. let message = 'Loki: ';
  318. if (err.statusText) {
  319. message += err.statusText;
  320. } else {
  321. message += 'Cannot connect to Loki';
  322. }
  323. if (err.status) {
  324. message += `. ${err.status}`;
  325. }
  326. if (err.data && err.data.message) {
  327. message += `. ${err.data.message}`;
  328. } else if (err.data) {
  329. message += `. ${err.data}`;
  330. }
  331. return { status: 'error', message: message };
  332. });
  333. }
  334. async annotationQuery(options: AnnotationQueryRequest<LokiQuery>): Promise<AnnotationEvent[]> {
  335. if (!options.annotation.expr) {
  336. return [];
  337. }
  338. const query = queryRequestFromAnnotationOptions(options);
  339. const { data } = await this.runQueries(query);
  340. const annotations: AnnotationEvent[] = [];
  341. for (const frame of data) {
  342. const tags = Object.values(frame.labels);
  343. const view = new DataFrameView<{ ts: string; line: string }>(frame);
  344. view.forEachRow(row => {
  345. annotations.push({
  346. time: new Date(row.ts).valueOf(),
  347. text: row.line,
  348. tags,
  349. });
  350. });
  351. }
  352. return annotations;
  353. }
  354. }
  355. function queryRequestFromAnnotationOptions(options: AnnotationQueryRequest<LokiQuery>): DataQueryRequest<LokiQuery> {
  356. const refId = `annotation-${options.annotation.name}`;
  357. const target: LokiQuery = { refId, expr: options.annotation.expr };
  358. return {
  359. requestId: refId,
  360. range: options.range,
  361. targets: [target],
  362. dashboardId: options.dashboard.id,
  363. scopedVars: null,
  364. startTime: Date.now(),
  365. // This should mean the default defined on datasource is used.
  366. maxDataPoints: 0,
  367. // Dummy values, are required in type but not used here.
  368. timezone: 'utc',
  369. panelId: 0,
  370. interval: '',
  371. intervalMs: 0,
  372. };
  373. }
  374. export default LokiDatasource;