datasource.ts 11 KB


  1. // Libraries
  2. import _ from 'lodash';
  3. import { Subscription, of } from 'rxjs';
  4. import { webSocket } from 'rxjs/webSocket';
  5. import { catchError, map } from 'rxjs/operators';
  6. // Services & Utils
  7. import * as dateMath from '@grafana/ui/src/utils/datemath';
  8. import { addLabelToSelector } from 'app/plugins/datasource/prometheus/add_label_to_query';
  9. import LanguageProvider from './language_provider';
  10. import { logStreamToDataFrame } from './result_transformer';
  11. import { formatQuery, parseQuery, getHighlighterExpressionsFromQuery } from './query_utils';
  12. // Types
  13. import {
  14. PluginMeta,
  15. DataQueryRequest,
  16. DataFrame,
  17. DataSourceApi,
  18. DataSourceInstanceSettings,
  19. DataQueryError,
  20. LogRowModel,
  21. DataStreamObserver,
  22. LoadingState,
  23. DataStreamState,
  24. DataQueryResponse,
  25. DateTime,
  26. } from '@grafana/ui';
  27. import { LokiQuery, LokiOptions } from './types';
  28. import { BackendSrv } from 'app/core/services/backend_srv';
  29. import { TemplateSrv } from 'app/features/templating/template_srv';
  30. import { safeStringifyValue, convertToWebSocketUrl } from 'app/core/utils/explore';
  /** Default cap on the number of log lines requested per query when the data source settings give none. */
  export const DEFAULT_MAX_LINES = 1000;

  /**
   * Baseline query-string parameters for a Loki range query.
   * They are spread first into each query target, so per-target values override them.
   */
  const DEFAULT_QUERY_PARAMS = {
    direction: 'BACKWARD',
    limit: DEFAULT_MAX_LINES,
    regexp: '',
    query: '',
  };
  /**
   * Serializes a flat object into a URL query string (`k1=v1&k2=v2`) without a leading `?`.
   * Both keys and values are passed through `encodeURIComponent`.
   *
   * @param data Flat map of parameter names to values. `null`/`undefined` is treated as "no parameters".
   * @returns The encoded query string, or `''` when there is nothing to serialize.
   */
  function serializeParams(data: any): string {
    // Object.keys throws on null/undefined, so a missing params object maps to an empty query string.
    if (data === null || data === undefined) {
      return '';
    }
    return Object.keys(data)
      .map(key => `${encodeURIComponent(key)}=${encodeURIComponent(data[key])}`)
      .join('&');
  }
  /** Options accepted by `LokiDatasource.getLogRowContext`. */
  interface LokiContextQueryOptions {
    /** Query direction sent to Loki; 'BACKWARD' is used when this is not set. */
    direction?: 'BACKWARD' | 'FORWARD';
    /** Maximum number of lines to request; 10 is used when this is not set. */
    limit?: number;
  }
  /**
   * Data source for Loki.
   *
   * Range queries go through `backendSrv.datasourceRequest` against `/api/prom/query`;
   * live targets are streamed from `/api/prom/tail` over a websocket and pushed to the
   * supplied stream observer.
   */
  export class LokiDatasource extends DataSourceApi<LokiQuery, LokiOptions> {
    /** Live-tail subscriptions, keyed by the refId of the query that opened them. */
    private subscriptions: { [key: string]: Subscription } = {};
    languageProvider: LanguageProvider;
    /** Line limit sent with range queries; taken from `jsonData.maxLines` or DEFAULT_MAX_LINES. */
    maxLines: number;

    /** @ngInject */
    constructor(
      private instanceSettings: DataSourceInstanceSettings<LokiOptions>,
      private backendSrv: BackendSrv,
      private templateSrv: TemplateSrv
    ) {
      super(instanceSettings);
      this.languageProvider = new LanguageProvider(this);
      const settingsData = instanceSettings.jsonData || {};
      // parseInt yields NaN for a missing/invalid setting, which falls through to the default.
      this.maxLines = parseInt(settingsData.maxLines, 10) || DEFAULT_MAX_LINES;
    }

    /**
     * Issues a request against the configured Loki URL.
     *
     * @param apiUrl Path appended to the data source base URL.
     * @param data Optional flat object serialized into the query string.
     * @param options Optional extra request options, merged into the request object (`url` always wins).
     * @returns Whatever `backendSrv.datasourceRequest` returns for the request.
     */
    _request(apiUrl: string, data?: any, options?: any) {
      const baseUrl = this.instanceSettings.url;
      const params = data ? serializeParams(data) : '';
      const url = `${baseUrl}${apiUrl}?${params}`;
      return this.backendSrv.datasourceRequest({ ...options, url });
    }

    /**
     * Builds the websocket target for a live (tail) query.
     *
     * @param target Query whose `expr` is interpolated and parsed into selector + regexp.
     * @param options The originating request (currently unused here).
     * @returns The parsed query parts, the websocket URL of the tail endpoint and the target refId.
     */
    prepareLiveTarget(target: LokiQuery, options: DataQueryRequest<LokiQuery>) {
      const interpolated = this.templateSrv.replace(target.expr);
      const { query, regexp } = parseQuery(interpolated);
      const params = serializeParams({ query, regexp });
      const url = convertToWebSocketUrl(`${this.instanceSettings.url}/api/prom/tail?${params}`);
      return {
        query,
        regexp,
        url,
        refId: target.refId,
      };
    }

    /**
     * Builds the query-string parameters for a range query.
     *
     * @param target Query whose `expr` is interpolated and parsed into selector + regexp.
     * @param options The originating request; its `range` supplies start and end.
     * @returns DEFAULT_QUERY_PARAMS overridden with the parsed query, time range, `maxLines` and refId.
     */
    prepareQueryTarget(target: LokiQuery, options: DataQueryRequest<LokiQuery>) {
      const interpolated = this.templateSrv.replace(target.expr);
      const { query, regexp } = parseQuery(interpolated);
      return {
        ...DEFAULT_QUERY_PARAMS,
        query,
        regexp,
        start: this.getTime(options.range.from, false),
        end: this.getTime(options.range.to, true),
        limit: this.maxLines,
        refId: target.refId,
      };
    }

    /**
     * Stops the live stream registered for `refId`, if any, and forgets it.
     * The entry is removed even when the subscription has already closed on its own,
     * so finished streams do not accumulate in the map.
     */
    unsubscribe = (refId: string) => {
      const subscription = this.subscriptions[refId];
      if (!subscription) {
        return;
      }
      if (!subscription.closed) {
        subscription.unsubscribe();
      }
      delete this.subscriptions[refId];
    };

    /**
     * Normalizes a failed request into a DataQueryError for the given target.
     *
     * The message is taken, in order of preference, from `err.data` (string or `.error`),
     * `err.message`, or `err` itself when it is a string; otherwise a generic message is kept.
     */
    processError = (err: any, target: any): DataQueryError => {
      const error: DataQueryError = {
        message: 'Unknown error during query transaction. Please check JS console logs.',
        refId: target.refId,
      };

      if (err.data) {
        if (typeof err.data === 'string') {
          error.message = err.data;
        } else if (err.data.error) {
          error.message = safeStringifyValue(err.data.error);
        }
      } else if (err.message) {
        error.message = err.message;
      } else if (typeof err === 'string') {
        error.message = err;
      }

      error.status = err.status;
      error.statusText = err.statusText;
      return error;
    };

    /**
     * Converts a Loki response payload into data frames tagged with the target's refId.
     *
     * An empty payload yields no frames. A payload without `streams` is converted as a
     * single stream. Otherwise every stream becomes one frame carrying highlight words
     * and the line limit in its `meta`.
     */
    processResult = (data: any, target: any): DataFrame[] => {
      if (Object.keys(data).length === 0) {
        return [];
      }

      if (!data.streams) {
        return [{ ...logStreamToDataFrame(data), refId: target.refId }];
      }

      const series: DataFrame[] = [];
      for (const stream of data.streams) {
        const dataFrame = logStreamToDataFrame(stream);
        dataFrame.refId = target.refId;
        dataFrame.meta = {
          searchWords: getHighlighterExpressionsFromQuery(formatQuery(target.query, target.regexp)),
          limit: this.maxLines,
        };
        series.push(dataFrame);
      }
      return series;
    };

    /**
     * Opens one websocket stream per visible live target and forwards each update to `observer`.
     *
     * @param options Request whose live, non-hidden targets with an expression are streamed.
     * @param observer Receiver of stream states. Without one nothing is opened, since there
     *   would be nobody to deliver the data to.
     */
    runLiveQueries = (options: DataQueryRequest<LokiQuery>, observer?: DataStreamObserver) => {
      if (!observer) {
        return;
      }

      const liveTargets = options.targets
        .filter(target => target.expr && !target.hide && target.live)
        .map(target => this.prepareLiveTarget(target, options));

      for (const liveTarget of liveTargets) {
        const key = `loki-${liveTarget.refId}`;
        const unsubscribe = () => this.unsubscribe(liveTarget.refId);

        // Close a stream left over from an earlier run of the same query; overwriting the
        // map entry without this would orphan the old websocket with no way to stop it.
        this.unsubscribe(liveTarget.refId);

        this.subscriptions[liveTarget.refId] = webSocket(liveTarget.url)
          .pipe(
            map((results: any[]) => {
              const state: DataStreamState = {
                key,
                request: options,
                state: LoadingState.Streaming,
                delta: this.processResult(results, liveTarget),
                unsubscribe,
              };
              return state;
            }),
            // A socket failure is surfaced as a final Error state rather than an rxjs error.
            catchError(err => {
              const state: DataStreamState = {
                key,
                request: options,
                state: LoadingState.Error,
                error: this.processError(err, liveTarget),
                unsubscribe,
              };
              return of(state);
            })
          )
          .subscribe({
            next: state => observer(state),
          });
      }
    };

    /**
     * Runs all visible non-live targets as range queries in parallel.
     *
     * @returns `{ data }` with the frames of every successful response, in target order.
     * @throws DataQueryError for the first request that fails (cancelled requests are skipped instead).
     */
    runQueries = async (options: DataQueryRequest<LokiQuery>) => {
      const series: DataFrame[] = [];
      const queryTargets = options.targets
        .filter(target => target.expr && !target.hide && !target.live)
        .map(target => this.prepareQueryTarget(target, options));

      if (queryTargets.length === 0) {
        return { data: series };
      }

      const results: any[] = await Promise.all(
        queryTargets.map(target =>
          this._request('/api/prom/query', target).catch((err: any) => {
            if (err.cancelled) {
              // A cancelled request resolves to the error object, which is skipped below.
              return err;
            }
            const error: DataQueryError = this.processError(err, target);
            throw error;
          })
        )
      );

      results.forEach((result, index) => {
        if (result && result.data) {
          series.push(...this.processResult(result.data, queryTargets[index]));
        }
      });
      return { data: series };
    };

    /** Starts live streams for live targets and resolves with the result of the range queries. */
    async query(options: DataQueryRequest<LokiQuery>, observer?: DataStreamObserver) {
      this.runLiveQueries(options, observer);
      return this.runQueries(options);
    }

    /** Delegates query import from another data source to the language provider. */
    async importQueries(queries: LokiQuery[], originMeta: PluginMeta): Promise<LokiQuery[]> {
      return this.languageProvider.importQueries(queries, originMeta.id);
    }

    /**
     * Fetches label metadata and wraps `res.data.values` as `{ data: { data: [...] } }`.
     *
     * @param url Metadata URL; the first occurrence of `v1` is rewritten to `prom`.
     */
    metadataRequest(url: string) {
      // HACK to get label values for {job=|}, will be replaced when implementing LokiQueryField
      const apiUrl = url.replace('v1', 'prom');
      // `silent` is a request option (third argument); as the second argument it would be
      // serialized into the URL as `?silent=true` instead.
      return this._request(apiUrl, undefined, { silent: true }).then((res: DataQueryResponse) => {
        const data: any = { data: { data: res.data.values || [] } };
        return data;
      });
    }

    /**
     * Applies an action to a query and returns a modified copy.
     * Only `ADD_FILTER` changes the selector; any other action just re-formats the expression.
     */
    modifyQuery(query: LokiQuery, action: any): LokiQuery {
      const parsed = parseQuery(query.expr || '');
      let selector = parsed.query;
      switch (action.type) {
        case 'ADD_FILTER': {
          selector = addLabelToSelector(selector, action.key, action.value);
          break;
        }
        default:
          break;
      }
      return { ...query, expr: formatQuery(selector, parsed.regexp) };
    }

    /** Returns the highlight expressions derived from the query's expression. */
    getHighlighterExpression(query: LokiQuery): string[] {
      return getHighlighterExpressionsFromQuery(query.expr);
    }

    /**
     * Converts a date (or a date-math string) into the numeric timestamp sent to Loki.
     *
     * @param date A DateTime, or a string resolved through `dateMath.parse`.
     * @param roundUp Forwarded to `dateMath.parse` for string input.
     * @returns `valueOf()` scaled by 1e6 and rounded up (presumably ms -> ns; confirm against DateTime).
     */
    getTime(date: string | DateTime, roundUp: boolean) {
      if (_.isString(date)) {
        date = dateMath.parse(date, roundUp);
      }
      return Math.ceil(date.valueOf() * 1e6);
    }

    /**
     * Builds the query parameters used to fetch the lines surrounding a log row.
     *
     * The selector matches all labels of the row exactly. The time window reaches two
     * hours before the row (BACKWARD) or two hours after it (FORWARD).
     */
    prepareLogRowContextQueryTarget = (row: LogRowModel, limit: number, direction: 'BACKWARD' | 'FORWARD') => {
      const matchers = Object.keys(row.labels)
        .map(label => `${label}="${row.labels[label]}"`)
        .join(',');
      const contextTimeBuffer = 2 * 60 * 60 * 1000 * 1e6; // 2h buffer
      const timeEpochNs = row.timeEpochMs * 1e6;
      const commonTargetOptions = {
        limit,
        query: `{${matchers}}`,
        direction,
      };

      if (direction === 'BACKWARD') {
        return {
          ...commonTargetOptions,
          start: timeEpochNs - contextTimeBuffer,
          end: row.timestamp,
        };
      }
      return {
        ...commonTargetOptions,
        // start param in Loki API is inclusive so we'll have to filter out the row that this request is based from
        start: row.timestamp,
        end: timeEpochNs + contextTimeBuffer,
      };
    };

    /**
     * Fetches the log lines around `row`.
     *
     * @param row Row whose labels and timestamp define the context query.
     * @param options Optional limit (default 10) and direction (default 'BACKWARD').
     * @returns `{ data }` with one frame per returned stream; for FORWARD queries the rows
     *   of the first frame are reversed in place.
     * @throws DataQueryError with a generic message plus the status of the failed request.
     */
    getLogRowContext = async (row: LogRowModel, options?: LokiContextQueryOptions) => {
      const target = this.prepareLogRowContextQueryTarget(
        row,
        (options && options.limit) || 10,
        (options && options.direction) || 'BACKWARD'
      );
      const series: DataFrame[] = [];

      try {
        const result = await this._request('/api/prom/query', target);
        if (result.data) {
          for (const stream of result.data.streams || []) {
            series.push(logStreamToDataFrame(stream));
          }
        }

        const isForward = options && options.direction === 'FORWARD';
        if (isForward && series[0] && series[0].rows) {
          series[0].rows.reverse();
        }

        return { data: series };
      } catch (e) {
        const error: DataQueryError = {
          message: 'Error during context query. Please check JS console logs.',
          status: e.status,
          statusText: e.statusText,
        };
        throw error;
      }
    };

    /**
     * Checks connectivity by requesting the label list.
     *
     * @returns A `{ status, message }` object: success when at least one label value comes
     *   back, otherwise an error describing either the empty result or the failed request.
     */
    testDatasource() {
      return this._request('/api/prom/label')
        .then((res: DataQueryResponse) => {
          if (res && res.data && res.data.values && res.data.values.length > 0) {
            return { status: 'success', message: 'Data source connected and labels found.' };
          }
          return {
            status: 'error',
            message:
              'Data source connected, but no labels received. Verify that Loki and Promtail is configured properly.',
          };
        })
        .catch((err: any) => {
          let message = 'Loki: ';
          message += err.statusText ? err.statusText : 'Cannot connect to Loki';

          if (err.status) {
            message += `. ${err.status}`;
          }

          if (err.data && err.data.message) {
            message += `. ${err.data.message}`;
          } else if (err.data) {
            message += `. ${err.data}`;
          }
          return { status: 'error', message: message };
        });
    }
  }

  export default LokiDatasource;