Browse Source

Loki: support loki with streaming in dashboards (#18709)

Move some of the buffering with live streaming inside of the datasource, sending full frames instead of deltas and allow Loki in dashboards.
Ryan McKinley 6 years ago
parent
commit
991f77cee1

+ 1 - 1
public/app/core/utils/explore.ts

@@ -492,7 +492,7 @@ export enum SortOrder {
 export const refreshIntervalToSortOrder = (refreshInterval: string) =>
   isLive(refreshInterval) ? SortOrder.Ascending : SortOrder.Descending;
 
-export const sortLogsResult = (logsResult: LogsModel, sortOrder: SortOrder) => {
+export const sortLogsResult = (logsResult: LogsModel, sortOrder: SortOrder): LogsModel => {
   const rows = logsResult ? logsResult.rows : [];
   sortOrder === SortOrder.Ascending ? rows.sort(sortInAscendingOrder) : rows.sort(sortInDescendingOrder);
   const result: LogsModel = logsResult ? { ...logsResult, rows } : { hasUniqueLabels: false, rows };

+ 1 - 1
public/app/features/dashboard/panel_editor/QueryOptions.tsx

@@ -71,7 +71,7 @@ export class QueryOptions extends PureComponent<Props, State> {
       tooltipInfo: (
         <>
           The maximum data points the query should return. For graphs this is automatically set to one data point per
-          pixel.
+          pixel. For some data sources this can also be capped in the datasource settings page.
         </>
       ),
     },

+ 1 - 1
public/app/features/dashboard/state/PanelQueryRunner.ts

@@ -218,7 +218,7 @@ export class PanelQueryRunner {
   }
 
   /**
-   * Called after every streaming event.  This should be throttled so we
+   * Called after every streaming event. This should be throttled so we
    * avoid accidentally overwhelming the browser
    */
   onStreamingDataUpdated = throttle(

+ 1 - 1
public/app/features/dashboard/state/PanelQueryState.ts

@@ -212,7 +212,7 @@ export class PanelQueryState {
 
     this.streams = [];
 
-    // Move the series from streams to the resposne
+    // Move the series from streams to the response
     if (keepSeries) {
       const { response } = this;
       this.response = {

+ 20 - 4
public/app/features/explore/LiveLogs.tsx

@@ -1,7 +1,8 @@
 import React, { PureComponent } from 'react';
 import { css, cx } from 'emotion';
-import { Themeable, withTheme, GrafanaTheme, selectThemeVariant, getLogRowStyles } from '@grafana/ui';
+import { last } from 'lodash';
 
+import { Themeable, withTheme, GrafanaTheme, selectThemeVariant, getLogRowStyles } from '@grafana/ui';
 import { LogsModel, LogRowModel, TimeZone } from '@grafana/data';
 
 import ElapsedTime from './ElapsedTime';
@@ -48,6 +49,7 @@ export interface Props extends Themeable {
 
 interface State {
   logsResultToRender?: LogsModel;
+  lastTimestamp: number;
 }
 
 class LiveLogs extends PureComponent<Props, State> {
@@ -59,6 +61,7 @@ class LiveLogs extends PureComponent<Props, State> {
     super(props);
     this.state = {
       logsResultToRender: props.logsResult,
+      lastTimestamp: 0,
     };
   }
 
@@ -81,13 +84,17 @@ class LiveLogs extends PureComponent<Props, State> {
     }
   }
 
-  static getDerivedStateFromProps(nextProps: Props) {
+  static getDerivedStateFromProps(nextProps: Props, state: State) {
     if (!nextProps.isPaused) {
       return {
         // We update what we show only if not paused. We keep any background subscriptions running and keep updating
         // our state, but we do not show the updates, this allows us start again showing correct result after resuming
         // without creating a gap in the log results.
         logsResultToRender: nextProps.logsResult,
+        lastTimestamp:
+          state.logsResultToRender && last(state.logsResultToRender.rows)
+            ? last(state.logsResultToRender.rows).timeEpochMs
+            : 0,
       };
     } else {
       return null;
@@ -119,6 +126,15 @@ class LiveLogs extends PureComponent<Props, State> {
     return rowsToRender;
   };
 
+  /**
+   * Check if a row is fresh so we can apply special styling. This is a bit naive and does not take into account rows
+   * which arrive out of order. Because loki datasource sends full data instead of deltas we need to compare the
+   * data and this is easier than doing some intersection of some uuid of each row (which we do not have now anyway)
+   */
+  isFresh = (row: LogRowModel): boolean => {
+    return row.timeEpochMs > this.state.lastTimestamp;
+  };
+
   render() {
     const { theme, timeZone, onPause, onResume, isPaused } = this.props;
     const styles = getStyles(theme);
@@ -132,10 +148,10 @@ class LiveLogs extends PureComponent<Props, State> {
           className={cx(['logs-rows', styles.logsRowsLive])}
           ref={this.scrollContainerRef}
         >
-          {this.rowsToRender().map((row: any, index) => {
+          {this.rowsToRender().map((row: LogRowModel, index) => {
             return (
               <div
-                className={row.fresh ? cx([logsRow, styles.logsRowFresh]) : cx([logsRow, styles.logsRowOld])}
+                className={cx(logsRow, this.isFresh(row) ? styles.logsRowFresh : styles.logsRowOld)}
                 key={`${row.timeEpochMs}-${index}`}
               >
                 {showUtc && (

+ 8 - 1
public/app/features/explore/state/actions.ts

@@ -432,6 +432,7 @@ export function runQueries(exploreId: ExploreId): ThunkResult<void> {
       range,
       scanning,
       history,
+      mode,
     } = exploreItemState;
 
     if (datasourceError) {
@@ -454,7 +455,13 @@ export function runQueries(exploreId: ExploreId): ThunkResult<void> {
     queryState.sendFrames = true;
     queryState.sendLegacy = true;
 
-    const queryOptions = { interval, maxDataPoints: containerWidth, live };
+    const queryOptions = {
+      interval,
+      // This is used as the buffer size for logs streaming.
+      // TODO: not sure if this makes sense for normal query when using both graph and table
+      maxDataPoints: mode === ExploreMode.Logs ? 1000 : containerWidth,
+      live,
+    };
     const datasourceId = datasourceInstance.meta.id;
     const transaction = buildQueryTransaction(queries, queryOptions, range, queryIntervals, scanning);
 

+ 28 - 22
public/app/features/explore/state/reducers.ts

@@ -12,7 +12,7 @@ import {
 } from 'app/core/utils/explore';
 import { ExploreItemState, ExploreState, ExploreId, ExploreUpdateState, ExploreMode } from 'app/types/explore';
 import { LoadingState } from '@grafana/data';
-import { DataQuery, PanelData } from '@grafana/ui';
+import { DataQuery, DataSourceApi, PanelData } from '@grafana/ui';
 import {
   HigherOrderAction,
   ActionTypes,
@@ -264,25 +264,7 @@ export const itemReducer = reducerFactory<ExploreItemState>({} as ExploreItemSta
     filter: updateDatasourceInstanceAction,
     mapper: (state, action): ExploreItemState => {
       const { datasourceInstance } = action.payload;
-      // Capabilities
-      const supportsGraph = datasourceInstance.meta.metrics;
-      const supportsLogs = datasourceInstance.meta.logs;
-
-      let mode = state.mode || ExploreMode.Metrics;
-      const supportedModes: ExploreMode[] = [];
-
-      if (supportsGraph) {
-        supportedModes.push(ExploreMode.Metrics);
-      }
-
-      if (supportsLogs) {
-        supportedModes.push(ExploreMode.Logs);
-      }
-
-      if (supportedModes.length === 1) {
-        mode = supportedModes[0];
-      }
-
+      const [supportedModes, mode] = getModesForDatasource(datasourceInstance, state.mode);
       // Custom components
       const StartPage = datasourceInstance.components.ExploreStartPage;
       stopQueryState(state.queryState, 'Datasource changed');
@@ -586,7 +568,6 @@ export const processQueryResponse = (
 ): ExploreItemState => {
   const { response } = action.payload;
   const { request, state: loadingState, series, legacy, error } = response;
-  const replacePreviousResults = action.type === queryEndedAction.type;
 
   if (error) {
     if (error.cancelled) {
@@ -615,7 +596,7 @@ export const processQueryResponse = (
   }
 
   const latency = request.endTime - request.startTime;
-  const processor = new ResultProcessor(state, replacePreviousResults, series);
+  const processor = new ResultProcessor(state, series);
 
   // For Angular editors
   state.eventBridge.emit('data-received', legacy);
@@ -674,6 +655,31 @@ export const updateChildRefreshState = (
   };
 };
 
+const getModesForDatasource = (dataSource: DataSourceApi, currentMode: ExploreMode): [ExploreMode[], ExploreMode] => {
+  // Temporary hack here. We want Loki to work in dashboards, for which it needs to have metrics = true,
+  // for Explore, which is weird.
+  // TODO: need to figure out a better way to handle this situation
+  const supportsGraph = dataSource.meta.name === 'Loki' ? false : dataSource.meta.metrics;
+  const supportsLogs = dataSource.meta.logs;
+
+  let mode = currentMode || ExploreMode.Metrics;
+  const supportedModes: ExploreMode[] = [];
+
+  if (supportsGraph) {
+    supportedModes.push(ExploreMode.Metrics);
+  }
+
+  if (supportsLogs) {
+    supportedModes.push(ExploreMode.Logs);
+  }
+
+  if (supportedModes.length === 1) {
+    mode = supportedModes[0];
+  }
+
+  return [supportedModes, mode];
+};
+
 /**
  * Global Explore reducer that handles multiple Explore areas (left and right).
  * Actions that have an `exploreId` get routed to the ExploreItemReducer.

+ 2 - 163
public/app/features/explore/utils/ResultProcessor.test.ts

@@ -16,7 +16,7 @@ jest.mock('@grafana/data/src/utils/moment_wrapper', () => ({
 import { ResultProcessor } from './ResultProcessor';
 import { ExploreItemState, ExploreMode } from 'app/types/explore';
 import TableModel from 'app/core/table_model';
-import { TimeSeries, LogRowModel, LogsMetaItem, GraphSeriesXY, toDataFrame, FieldType } from '@grafana/data';
+import { TimeSeries, LogRowModel, toDataFrame, FieldType } from '@grafana/data';
 
 const testContext = (options: any = {}) => {
   const timeSeries = toDataFrame({
@@ -40,7 +40,6 @@ const testContext = (options: any = {}) => {
 
   const defaultOptions = {
     mode: ExploreMode.Metrics,
-    replacePreviousResults: true,
     dataFrames: [timeSeries, table],
     graphResult: [] as TimeSeries[],
     tableResult: new TableModel(),
@@ -57,11 +56,7 @@ const testContext = (options: any = {}) => {
     queryIntervals: { intervalMs: 10 },
   } as any) as ExploreItemState;
 
-  const resultProcessor = new ResultProcessor(
-    state,
-    combinedOptions.replacePreviousResults,
-    combinedOptions.dataFrames
-  );
+  const resultProcessor = new ResultProcessor(state, combinedOptions.dataFrames);
 
   return {
     dataFrames: combinedOptions.dataFrames,
@@ -206,160 +201,4 @@ describe('ResultProcessor', () => {
       });
     });
   });
-
-  describe('constructed with result that is a DataQueryResponse and merging with previous results', () => {
-    describe('when calling getLogsResult', () => {
-      it('then it should return correct logs result', () => {
-        const { resultProcessor } = testContext({
-          mode: ExploreMode.Logs,
-          replacePreviousResults: false,
-          logsResult: {
-            hasUniqueLabels: false,
-            meta: [],
-            rows: [
-              {
-                entry: 'This is a previous message 1',
-                fresh: true,
-                hasAnsi: false,
-                labels: { cluster: 'some-cluster' },
-                logLevel: 'unknown',
-                raw: 'This is a previous message 1',
-                searchWords: [] as string[],
-                timeEpochMs: 1558038519831,
-                timeFromNow: 'fromNow() jest mocked',
-                timeLocal: 'format() jest mocked',
-                timeUtc: 'format() jest mocked',
-                timestamp: 1558038519831,
-                uniqueLabels: {},
-              },
-              {
-                entry: 'This is a previous message 2',
-                fresh: true,
-                hasAnsi: false,
-                labels: { cluster: 'some-cluster' },
-                logLevel: 'unknown',
-                raw: 'This is a previous message 2',
-                searchWords: [] as string[],
-                timeEpochMs: 1558038518831,
-                timeFromNow: 'fromNow() jest mocked',
-                timeLocal: 'format() jest mocked',
-                timeUtc: 'format() jest mocked',
-                timestamp: 1558038518831,
-                uniqueLabels: {},
-              },
-            ],
-            series: [
-              {
-                label: 'A-series',
-                color: '#7EB26D',
-                data: [[1558038518831, 37.91264531864214], [1558038519831, 38.35179822906545]],
-                info: undefined,
-                isVisible: true,
-                yAxis: {
-                  index: 1,
-                },
-              },
-            ],
-          },
-        });
-
-        const theResult = resultProcessor.getLogsResult();
-        const expected = {
-          hasUniqueLabels: false,
-          meta: [] as LogsMetaItem[],
-          rows: [
-            {
-              entry: 'This is a previous message 1',
-              fresh: false,
-              hasAnsi: false,
-              labels: { cluster: 'some-cluster' },
-              logLevel: 'unknown',
-              raw: 'This is a previous message 1',
-              searchWords: [] as string[],
-              timeEpochMs: 1558038519831,
-              timeFromNow: 'fromNow() jest mocked',
-              timeLocal: 'format() jest mocked',
-              timeUtc: 'format() jest mocked',
-              timestamp: 1558038519831,
-              uniqueLabels: {},
-            },
-            {
-              entry: 'This is a previous message 2',
-              fresh: false,
-              hasAnsi: false,
-              labels: { cluster: 'some-cluster' },
-              logLevel: 'unknown',
-              raw: 'This is a previous message 2',
-              searchWords: [] as string[],
-              timeEpochMs: 1558038518831,
-              timeFromNow: 'fromNow() jest mocked',
-              timeLocal: 'format() jest mocked',
-              timeUtc: 'format() jest mocked',
-              timestamp: 1558038518831,
-              uniqueLabels: {},
-            },
-            {
-              entry: 'third',
-              fresh: true,
-              hasAnsi: false,
-              labels: undefined,
-              logLevel: 'unknown',
-              raw: 'third',
-              searchWords: [] as string[],
-              timeEpochMs: 300,
-              timeFromNow: 'fromNow() jest mocked',
-              timeLocal: 'format() jest mocked',
-              timeUtc: 'format() jest mocked',
-              timestamp: 300,
-              uniqueLabels: {},
-            },
-            {
-              entry: 'second message',
-              fresh: true,
-              hasAnsi: false,
-              labels: undefined,
-              logLevel: 'unknown',
-              raw: 'second message',
-              searchWords: [] as string[],
-              timeEpochMs: 200,
-              timeFromNow: 'fromNow() jest mocked',
-              timeLocal: 'format() jest mocked',
-              timeUtc: 'format() jest mocked',
-              timestamp: 200,
-              uniqueLabels: {},
-            },
-            {
-              entry: 'this is a message',
-              fresh: true,
-              hasAnsi: false,
-              labels: undefined,
-              logLevel: 'unknown',
-              raw: 'this is a message',
-              searchWords: [] as string[],
-              timeEpochMs: 100,
-              timeFromNow: 'fromNow() jest mocked',
-              timeLocal: 'format() jest mocked',
-              timeUtc: 'format() jest mocked',
-              timestamp: 100,
-              uniqueLabels: {},
-            },
-          ],
-          series: [
-            {
-              label: 'A-series',
-              color: '#7EB26D',
-              data: [[100, 4], [200, 5], [300, 6]],
-              info: undefined,
-              isVisible: true,
-              yAxis: {
-                index: 1,
-              },
-            } as GraphSeriesXY,
-          ],
-        };
-
-        expect(theResult).toEqual(expected);
-      });
-    });
-  });
 });

+ 3 - 29
public/app/features/explore/utils/ResultProcessor.ts

@@ -7,11 +7,7 @@ import { dataFrameToLogsModel } from 'app/core/logs_model';
 import { getGraphSeriesModel } from 'app/plugins/panel/graph2/getGraphSeriesModel';
 
 export class ResultProcessor {
-  constructor(
-    private state: ExploreItemState,
-    private replacePreviousResults: boolean,
-    private dataFrames: DataFrame[]
-  ) {}
+  constructor(private state: ExploreItemState, private dataFrames: DataFrame[]) {}
 
   getGraphResult(): GraphSeriesXY[] {
     if (this.state.mode !== ExploreMode.Metrics) {
@@ -79,30 +75,8 @@ export class ResultProcessor {
     const sortOrder = refreshIntervalToSortOrder(this.state.refreshInterval);
     const sortedNewResults = sortLogsResult(newResults, sortOrder);
 
-    if (this.replacePreviousResults) {
-      const slice = 1000;
-      const rows = sortedNewResults.rows.slice(0, slice);
-      const series = sortedNewResults.series;
-
-      return { ...sortedNewResults, rows, series };
-    }
-
-    const prevLogsResult: LogsModel = this.state.logsResult || { hasUniqueLabels: false, rows: [] };
-    const sortedLogResult = sortLogsResult(prevLogsResult, sortOrder);
-    const rowsInState = sortedLogResult.rows;
-
-    const processedRows = [];
-    for (const row of rowsInState) {
-      processedRows.push({ ...row, fresh: false });
-    }
-    for (const row of sortedNewResults.rows) {
-      processedRows.push({ ...row, fresh: true });
-    }
-
-    const slice = -1000;
-    const rows = processedRows.slice(slice);
-    const series = sortedNewResults.series.slice(slice);
-
+    const rows = sortedNewResults.rows;
+    const series = sortedNewResults.series;
     return { ...sortedNewResults, rows, series };
   }
 }

+ 49 - 73
public/app/plugins/datasource/loki/components/LokiQueryEditor.tsx

@@ -1,90 +1,66 @@
 // Libraries
-import React, { PureComponent } from 'react';
+import React, { memo } from 'react';
 
 // Types
-import { QueryEditorProps } from '@grafana/ui';
+import { AbsoluteTimeRange } from '@grafana/data';
+import { QueryEditorProps, Switch, DataSourceStatus } from '@grafana/ui';
 import { LokiDatasource } from '../datasource';
 import { LokiQuery } from '../types';
-// import { LokiQueryField } from './LokiQueryField';
+import { LokiQueryField } from './LokiQueryField';
+import { useLokiSyntax } from './useLokiSyntax';
 
 type Props = QueryEditorProps<LokiDatasource, LokiQuery>;
 
-// interface State {
-//   query: LokiQuery;
-// }
+export const LokiQueryEditor = memo(function LokiQueryEditor(props: Props) {
+  const { query, panelData, datasource, onChange, onRunQuery } = props;
 
-export class LokiQueryEditor extends PureComponent<Props> {
-  // state: State = {
-  //   query: this.props.query,
-  // };
-  //
-  // onRunQuery = () => {
-  //   const { query } = this.state;
-  //
-  //   this.props.onChange(query);
-  //   this.props.onRunQuery();
-  // };
-  //
-  // onFieldChange = (query: LokiQuery, override?) => {
-  //   this.setState({
-  //     query: {
-  //       ...this.state.query,
-  //       expr: query.expr,
-  //     },
-  //   });
-  // };
-  //
-  // onFormatChanged = (option: SelectableValue) => {
-  //   this.props.onChange({
-  //     ...this.state.query,
-  //     resultFormat: option.value,
-  //   });
-  // };
+  let absolute: AbsoluteTimeRange;
+  if (panelData && panelData.request) {
+    const { range } = panelData.request;
+    absolute = {
+      from: range.from.valueOf(),
+      to: range.to.valueOf(),
+    };
+  } else {
+    absolute = {
+      from: Date.now() - 10000,
+      to: Date.now(),
+    };
+  }
 
-  render() {
-    // const { query } = this.state;
-    // const { datasource } = this.props;
-    // const formatOptions: SelectableValue[] = [
-    //   { label: 'Time Series', value: 'time_series' },
-    //   { label: 'Table', value: 'table' },
-    // ];
-    //
-    // query.resultFormat = query.resultFormat || 'time_series';
-    // const currentFormat = formatOptions.find(item => item.value === query.resultFormat);
+  const { isSyntaxReady, setActiveOption, refreshLabels, ...syntaxProps } = useLokiSyntax(
+    datasource.languageProvider,
+    // TODO maybe use real status
+    DataSourceStatus.Connected,
+    absolute
+  );
 
-    return (
-      <div>
+  return (
+    <div>
+      <LokiQueryField
+        datasource={datasource}
+        datasourceStatus={DataSourceStatus.Connected}
+        query={query}
+        onChange={onChange}
+        onRunQuery={onRunQuery}
+        history={[]}
+        panelData={panelData}
+        onLoadOptions={setActiveOption}
+        onLabelsRefresh={refreshLabels}
+        syntaxLoaded={isSyntaxReady}
+        absoluteRange={absolute}
+        {...syntaxProps}
+      />
+      <div className="gf-form-inline">
         <div className="gf-form">
-          <div className="gf-form-label">
-            Loki is currently not supported as dashboard data source. We are working on it!
-          </div>
+          <Switch label="Live" checked={!!query.live} onChange={() => onChange({ ...query, live: !query.live })} />
         </div>
-        {/*
-        <LokiQueryField
-          datasource={datasource}
-          query={query}
-          onQueryChange={this.onFieldChange}
-          onExecuteQuery={this.onRunQuery}
-          history={[]}
-        />
-        <div className="gf-form-inline">
-          <div className="gf-form">
-            <div className="gf-form-label">Format as</div>
-            <Select
-              isSearchable={false}
-              options={formatOptions}
-              onChange={this.onFormatChanged}
-              value={currentFormat}
-            />
-          </div>
-          <div className="gf-form gf-form--grow">
-            <div className="gf-form-label gf-form-label--grow" />
-          </div>
+        <div className="gf-form gf-form--grow">
+          <div className="gf-form-label gf-form-label--grow" />
         </div>
-        */}
       </div>
-    );
-  }
-}
+    </div>
+  );
+});
 
 export default LokiQueryEditor;

+ 1 - 1
public/app/plugins/datasource/loki/components/LokiQueryField.tsx

@@ -2,7 +2,7 @@ import React, { FunctionComponent } from 'react';
 import { LokiQueryFieldForm, LokiQueryFieldFormProps } from './LokiQueryFieldForm';
 import { useLokiSyntax } from './useLokiSyntax';
 
-const LokiQueryField: FunctionComponent<LokiQueryFieldFormProps> = ({
+export const LokiQueryField: FunctionComponent<LokiQueryFieldFormProps> = ({
   datasource,
   datasourceStatus,
   ...otherProps

+ 58 - 16
public/app/plugins/datasource/loki/datasource.test.ts

@@ -31,28 +31,34 @@ describe('LokiDatasource', () => {
       replace: (a: string) => a,
     } as unknown) as TemplateSrv;
 
-    test('should use default max lines when no limit given', () => {
-      const ds = new LokiDatasource(instanceSettings, backendSrv, templateSrvMock);
-      backendSrvMock.datasourceRequest = jest.fn(() => Promise.resolve(testResp));
-      const options = getQueryOptions<LokiQuery>({ targets: [{ expr: 'foo', refId: 'B' }] });
-
-      ds.query(options);
+    const testLimit = makeLimitTest(instanceSettings, backendSrvMock, backendSrv, templateSrvMock, testResp);
 
-      expect(backendSrvMock.datasourceRequest.mock.calls.length).toBe(1);
-      expect(backendSrvMock.datasourceRequest.mock.calls[0][0].url).toContain('limit=1000');
+    test('should use default max lines when no limit given', () => {
+      testLimit({
+        expectedLimit: 1000,
+      });
     });
 
     test('should use custom max lines if limit is set', () => {
-      const customData = { ...(instanceSettings.jsonData || {}), maxLines: 20 };
-      const customSettings = { ...instanceSettings, jsonData: customData };
-      const ds = new LokiDatasource(customSettings, backendSrv, templateSrvMock);
-      backendSrvMock.datasourceRequest = jest.fn(() => Promise.resolve(testResp));
+      testLimit({
+        maxLines: 20,
+        expectedLimit: 20,
+      });
+    });
 
-      const options = getQueryOptions<LokiQuery>({ targets: [{ expr: 'foo', refId: 'B' }] });
-      ds.query(options);
+    test('should use custom maxDataPoints if set in request', () => {
+      testLimit({
+        maxDataPoints: 500,
+        expectedLimit: 500,
+      });
+    });
 
-      expect(backendSrvMock.datasourceRequest.mock.calls.length).toBe(1);
-      expect(backendSrvMock.datasourceRequest.mock.calls[0][0].url).toContain('limit=20');
+    test('should use datasource maxLimit if maxDataPoints is higher', () => {
+      testLimit({
+        maxLines: 20,
+        maxDataPoints: 500,
+        expectedLimit: 20,
+      });
     });
 
     test('should return series data', async done => {
@@ -166,3 +172,39 @@ describe('LokiDatasource', () => {
     });
   });
 });
+
+type LimitTestArgs = {
+  maxDataPoints?: number;
+  maxLines?: number;
+  expectedLimit: number;
+};
+function makeLimitTest(
+  instanceSettings: any,
+  backendSrvMock: any,
+  backendSrv: any,
+  templateSrvMock: any,
+  testResp: any
+) {
+  return ({ maxDataPoints, maxLines, expectedLimit }: LimitTestArgs) => {
+    let settings = instanceSettings;
+    if (Number.isFinite(maxLines)) {
+      const customData = { ...(instanceSettings.jsonData || {}), maxLines: 20 };
+      settings = { ...instanceSettings, jsonData: customData };
+    }
+    const ds = new LokiDatasource(settings, backendSrv, templateSrvMock);
+    backendSrvMock.datasourceRequest = jest.fn(() => Promise.resolve(testResp));
+
+    const options = getQueryOptions<LokiQuery>({ targets: [{ expr: 'foo', refId: 'B' }] });
+    if (Number.isFinite(maxDataPoints)) {
+      options.maxDataPoints = maxDataPoints;
+    } else {
+      // By default is 500
+      delete options.maxDataPoints;
+    }
+
+    ds.query(options);
+
+    expect(backendSrvMock.datasourceRequest.mock.calls.length).toBe(1);
+    expect(backendSrvMock.datasourceRequest.mock.calls[0][0].url).toContain(`limit=${expectedLimit}`);
+  };
+}

+ 39 - 57
public/app/plugins/datasource/loki/datasource.ts

@@ -1,8 +1,5 @@
 // Libraries
 import _ from 'lodash';
-import { Subscription, of } from 'rxjs';
-import { webSocket } from 'rxjs/webSocket';
-import { catchError, map } from 'rxjs/operators';
 // Services & Utils
 import { dateMath, DataFrame, LogRowModel, LoadingState, DateTime } from '@grafana/data';
 import { addLabelToSelector } from 'app/plugins/datasource/prometheus/add_label_to_query';
@@ -17,13 +14,14 @@ import {
   DataQueryError,
   DataQueryRequest,
   DataStreamObserver,
-  DataStreamState,
   DataQueryResponse,
 } from '@grafana/ui';
-import { LokiQuery, LokiOptions } from './types';
+
+import { LokiQuery, LokiOptions, LokiLogsStream, LokiResponse } from './types';
 import { BackendSrv } from 'app/core/services/backend_srv';
 import { TemplateSrv } from 'app/features/templating/template_srv';
 import { safeStringifyValue, convertToWebSocketUrl } from 'app/core/utils/explore';
+import { LiveTarget, LiveStreams } from './live_streams';
 
 export const DEFAULT_MAX_LINES = 1000;
 
@@ -49,7 +47,7 @@ interface LokiContextQueryOptions {
 }
 
 export class LokiDatasource extends DataSourceApi<LokiQuery, LokiOptions> {
-  private subscriptions: { [key: string]: Subscription } = null;
+  private streams = new LiveStreams();
   languageProvider: LanguageProvider;
   maxLines: number;
 
@@ -63,7 +61,6 @@ export class LokiDatasource extends DataSourceApi<LokiQuery, LokiOptions> {
     this.languageProvider = new LanguageProvider(this);
     const settingsData = instanceSettings.jsonData || {};
     this.maxLines = parseInt(settingsData.maxLines, 10) || DEFAULT_MAX_LINES;
-    this.subscriptions = {};
   }
 
   _request(apiUrl: string, data?: any, options?: any) {
@@ -78,18 +75,20 @@ export class LokiDatasource extends DataSourceApi<LokiQuery, LokiOptions> {
     return this.backendSrv.datasourceRequest(req);
   }
 
-  prepareLiveTarget(target: LokiQuery, options: DataQueryRequest<LokiQuery>) {
+  prepareLiveTarget(target: LokiQuery, options: DataQueryRequest<LokiQuery>): LiveTarget {
     const interpolated = this.templateSrv.replace(target.expr);
     const { query, regexp } = parseQuery(interpolated);
     const refId = target.refId;
     const baseUrl = this.instanceSettings.url;
     const params = serializeParams({ query, regexp });
     const url = convertToWebSocketUrl(`${baseUrl}/api/prom/tail?${params}`);
+
     return {
       query,
       regexp,
       url,
       refId,
+      size: Math.min(options.maxDataPoints || Infinity, this.maxLines),
     };
   }
 
@@ -105,19 +104,11 @@ export class LokiDatasource extends DataSourceApi<LokiQuery, LokiOptions> {
       regexp,
       start,
       end,
-      limit: this.maxLines,
+      limit: Math.min(options.maxDataPoints || Infinity, this.maxLines),
       refId,
     };
   }
 
-  unsubscribe = (refId: string) => {
-    const subscription = this.subscriptions[refId];
-    if (subscription && !subscription.closed) {
-      subscription.unsubscribe();
-      delete this.subscriptions[refId];
-    }
-  };
-
   processError = (err: any, target: any): DataQueryError => {
     const error: DataQueryError = {
       message: 'Unknown error during query transaction. Please check JS console logs.',
@@ -142,17 +133,18 @@ export class LokiDatasource extends DataSourceApi<LokiQuery, LokiOptions> {
     return error;
   };
 
-  processResult = (data: any, target: any): DataFrame[] => {
+  processResult = (data: LokiLogsStream | LokiResponse, target: any): DataFrame[] => {
     const series: DataFrame[] = [];
 
     if (Object.keys(data).length === 0) {
       return series;
     }
 
-    if (!data.streams) {
-      return [logStreamToDataFrame(data, target.refId)];
+    if (!(data as any).streams) {
+      return [logStreamToDataFrame(data as LokiLogsStream, false, target.refId)];
     }
 
+    data = data as LokiResponse;
     for (const stream of data.streams || []) {
       const dataFrame = logStreamToDataFrame(stream);
       dataFrame.refId = target.refId;
@@ -172,38 +164,32 @@ export class LokiDatasource extends DataSourceApi<LokiQuery, LokiOptions> {
       .map(target => this.prepareLiveTarget(target, options));
 
     for (const liveTarget of liveTargets) {
-      const subscription = webSocket(liveTarget.url)
-        .pipe(
-          map((results: any[]) => {
-            const data = this.processResult(results, liveTarget);
-            const state: DataStreamState = {
-              key: `loki-${liveTarget.refId}`,
-              request: options,
-              state: LoadingState.Streaming,
-              data,
-              unsubscribe: () => this.unsubscribe(liveTarget.refId),
-            };
-
-            return state;
-          }),
-          catchError(err => {
-            const error = this.processError(err, liveTarget);
-            const state: DataStreamState = {
-              key: `loki-${liveTarget.refId}`,
-              request: options,
-              state: LoadingState.Error,
-              error,
-              unsubscribe: () => this.unsubscribe(liveTarget.refId),
-            };
-
-            return of(state);
-          })
-        )
-        .subscribe({
-          next: state => observer(state),
-        });
-
-      this.subscriptions[liveTarget.refId] = subscription;
+      // Reuse an existing stream if one is already running
+      const stream = this.streams.getStream(liveTarget);
+      const subscription = stream.subscribe({
+        next: (data: DataFrame[]) => {
+          observer({
+            key: `loki-${liveTarget.refId}`,
+            request: options,
+            state: LoadingState.Streaming,
+            data,
+            unsubscribe: () => {
+              subscription.unsubscribe();
+            },
+          });
+        },
+        error: (err: any) => {
+          observer({
+            key: `loki-${liveTarget.refId}`,
+            request: options,
+            state: LoadingState.Error,
+            error: this.processError(err, liveTarget),
+            unsubscribe: () => {
+              subscription.unsubscribe();
+            },
+          });
+        },
+      });
     }
   };
 
@@ -330,11 +316,7 @@ export class LokiDatasource extends DataSourceApi<LokiQuery, LokiOptions> {
       const result = await this._request('/api/prom/query', target);
       if (result.data) {
         for (const stream of result.data.streams || []) {
-          const dataFrame = logStreamToDataFrame(stream);
-          if (reverse) {
-            dataFrame.reverse();
-          }
-          series.push(dataFrame);
+          series.push(logStreamToDataFrame(stream, reverse));
         }
       }
 

+ 207 - 0
public/app/plugins/datasource/loki/live_streams.test.ts

@@ -0,0 +1,207 @@
+import { Subject, Observable } from 'rxjs';
+import * as rxJsWebSocket from 'rxjs/webSocket';
+import { LiveStreams } from './live_streams';
+import { DataFrameView, Labels, formatLabels, DataFrame } from '@grafana/data';
+import { noop } from 'lodash';
+
+let fakeSocket: Subject<any>;
+jest.mock('rxjs/webSocket', () => {
+  return {
+    __esModule: true,
+    webSocket: () => fakeSocket,
+  };
+});
+
+describe('Live Stream Tests', () => {
+  afterAll(() => {
+    jest.restoreAllMocks();
+  });
+
+  const msg0: any = {
+    streams: [
+      {
+        labels: '{filename="/var/log/sntpc.log", job="varlogs"}',
+        entries: [
+          {
+            ts: '2019-08-28T20:50:40.118944705Z',
+            line: 'Kittens',
+          },
+        ],
+      },
+    ],
+    dropped_entries: null,
+  };
+
+  it('reads the values into the buffer', done => {
+    fakeSocket = new Subject<any>();
+    const labels: Labels = { job: 'varlogs' };
+    const target = makeTarget('fake', labels);
+    const stream = new LiveStreams().getStream(target);
+    expect.assertions(5);
+
+    const tests = [
+      (val: DataFrame[]) => expect(val).toEqual([]),
+      (val: DataFrame[]) => {
+        expect(val[0].length).toEqual(7);
+        expect(val[0].labels).toEqual(labels);
+      },
+      (val: DataFrame[]) => {
+        expect(val[0].length).toEqual(8);
+        const view = new DataFrameView(val[0]);
+        const last = { ...view.get(view.length - 1) };
+        expect(last).toEqual({
+          ts: '2019-08-28T20:50:40.118944705Z',
+          line: 'Kittens',
+          labels: { filename: '/var/log/sntpc.log' },
+        });
+      },
+    ];
+    stream.subscribe({
+      next: val => {
+        const test = tests.shift();
+        test(val);
+      },
+      complete: () => done(),
+    });
+
+    // Send it the initial list of things
+    fakeSocket.next(initialRawResponse);
+    // Send it a single update
+    fakeSocket.next(msg0);
+    fakeSocket.complete();
+  });
+
+  it('returns the same subscription if the url matches existing one', () => {
+    fakeSocket = new Subject<any>();
+    const liveStreams = new LiveStreams();
+    const stream1 = liveStreams.getStream(makeTarget('url_to_match'));
+    const stream2 = liveStreams.getStream(makeTarget('url_to_match'));
+    expect(stream1).toBe(stream2);
+  });
+
+  it('returns new subscription when the previous unsubscribed', () => {
+    fakeSocket = new Subject<any>();
+    const liveStreams = new LiveStreams();
+    const stream1 = liveStreams.getStream(makeTarget('url_to_match'));
+    const subscription = stream1.subscribe({
+      next: noop,
+    });
+    subscription.unsubscribe();
+
+    const stream2 = liveStreams.getStream(makeTarget('url_to_match'));
+    expect(stream1).not.toBe(stream2);
+  });
+
+  it('returns new subscription when the previous is unsubscribed and correctly unsubscribes from source', () => {
+    let unsubscribed = false;
+    fakeSocket = new Observable(() => {
+      return () => (unsubscribed = true);
+    }) as any;
+    const spy = spyOn(rxJsWebSocket, 'webSocket');
+    spy.and.returnValue(fakeSocket);
+
+    const liveStreams = new LiveStreams();
+    const stream1 = liveStreams.getStream(makeTarget('url_to_match'));
+    const subscription = stream1.subscribe({
+      next: noop,
+    });
+    subscription.unsubscribe();
+    expect(unsubscribed).toBe(true);
+  });
+});
+
+/**
+ * Creates a target (the query to run). The url is what is used as the cache key.
+ */
+function makeTarget(url: string, labels?: Labels) {
+  labels = labels || { job: 'varlogs' };
+  return {
+    url,
+    size: 10,
+    query: formatLabels(labels),
+    refId: 'A',
+    regexp: '',
+  };
+}
+
+//----------------------------------------------------------------
+// Added this at the end so the top is more readable
+//----------------------------------------------------------------
+
+const initialRawResponse: any = {
+  streams: [
+    {
+      labels: '{filename="/var/log/docker.log", job="varlogs"}',
+      entries: [
+        {
+          ts: '2019-08-28T20:43:38.215447855Z',
+          line:
+            '2019-08-28T20:43:38Z docker time="2019-08-28T20:43:38.147149490Z" ' +
+            'level=debug msg="[resolver] received AAAA record \\"::1\\" for \\"localhost.\\" from udp:192.168.65.1"',
+        },
+      ],
+    },
+    {
+      labels: '{filename="/var/log/docker.log", job="varlogs"}',
+      entries: [
+        {
+          ts: '2019-08-28T20:43:38.215450388Z',
+          line:
+            '2019-08-28T20:43:38Z docker time="2019-08-28T20:43:38.147224630Z" ' +
+            'level=debug msg="[resolver] received AAAA record \\"fe80::1\\" for \\"localhost.\\" from udp:192.168.65.1"',
+        },
+      ],
+    },
+    {
+      labels: '{filename="/var/log/sntpc.log", job="varlogs"}',
+      entries: [
+        {
+          ts: '2019-08-28T20:43:40.452525099Z',
+          line: '2019-08-28T20:43:40Z sntpc sntpc[1]: offset=-0.022171, delay=0.000463',
+        },
+      ],
+    },
+    {
+      labels: '{filename="/var/log/sntpc.log", job="varlogs"}',
+      entries: [
+        {
+          ts: '2019-08-28T20:44:10.297164454Z',
+          line: '2019-08-28T20:44:10Z sntpc sntpc[1]: offset=-0.022327, delay=0.000527',
+        },
+      ],
+    },
+    {
+      labels: '{filename="/var/log/lifecycle-server.log", job="varlogs"}',
+      entries: [
+        {
+          ts: '2019-08-28T20:44:38.152248647Z',
+          line:
+            '2019-08-28T20:44:38Z lifecycle-server time="2019-08-28T20:44:38.095444834Z" ' +
+            'level=debug msg="Name To resolve: localhost."',
+        },
+      ],
+    },
+    {
+      labels: '{filename="/var/log/lifecycle-server.log", job="varlogs"}',
+      entries: [
+        {
+          ts: '2019-08-28T20:44:38.15225554Z',
+          line:
+            '2019-08-28T20:44:38Z lifecycle-server time="2019-08-28T20:44:38.095896074Z" ' +
+            'level=debug msg="[resolver] query localhost. (A) from 172.22.0.4:53748, forwarding to udp:192.168.65.1"',
+        },
+      ],
+    },
+    {
+      labels: '{filename="/var/log/docker.log", job="varlogs"}',
+      entries: [
+        {
+          ts: '2019-08-28T20:44:38.152271475Z',
+          line:
+            '2019-08-28T20:44:38Z docker time="2019-08-28T20:44:38.095444834Z" level=debug msg="Name To resolve: localhost."',
+        },
+      ],
+    },
+  ],
+  dropped_entries: null,
+};

+ 51 - 0
public/app/plugins/datasource/loki/live_streams.ts

@@ -0,0 +1,51 @@
+import { DataFrame, FieldType, parseLabels, KeyValue, CircularDataFrame } from '@grafana/data';
+import { Observable, BehaviorSubject } from 'rxjs';
+import { webSocket } from 'rxjs/webSocket';
+import { LokiResponse } from './types';
+import { finalize, map, multicast, refCount } from 'rxjs/operators';
+import { appendResponseToBufferedData } from './result_transformer';
+
+/**
+ * Maps directly to a query in the UI (refId is key)
+ */
+export interface LiveTarget {
+  query: string;
+  regexp: string;
+  url: string;
+  refId: string;
+  size: number;
+}
+
+/**
+ * Cache of websocket streams that can be returned as an observable. If a stream already exists for a
+ * particular target it is returned, and on subscription it emits the latest data frame.
+ */
+export class LiveStreams {
+  private streams: KeyValue<Observable<DataFrame[]>> = {};
+
+  getStream(target: LiveTarget): Observable<DataFrame[]> {
+    let stream = this.streams[target.url];
+    if (!stream) {
+      const data = new CircularDataFrame({ capacity: target.size });
+      data.labels = parseLabels(target.query);
+      data.addField({ name: 'ts', type: FieldType.time, config: { title: 'Time' } });
+      data.addField({ name: 'line', type: FieldType.string });
+      data.addField({ name: 'labels', type: FieldType.other });
+
+      const subject = new BehaviorSubject<DataFrame[]>([]);
+      stream = webSocket(target.url).pipe(
+        finalize(() => {
+          delete this.streams[target.url];
+        }),
+        map((response: LokiResponse) => {
+          appendResponseToBufferedData(response, data);
+          return [data];
+        }),
+        multicast(subject),
+        refCount()
+      );
+      this.streams[target.url] = stream;
+    }
+    return stream;
+  }
+}

+ 6 - 3
public/app/plugins/datasource/loki/plugin.json

@@ -4,12 +4,16 @@
   "id": "loki",
   "category": "logging",
 
-  "metrics": false,
+  "metrics": true,
   "alerting": false,
   "annotations": false,
   "logs": true,
   "streaming": true,
 
+  "queryOptions": {
+    "maxDataPoints": true
+  },
+
   "info": {
     "description": "Like Prometheus but for logs. OSS logging solution from Grafana Labs",
     "author": {
@@ -29,7 +33,6 @@
         "name": "GitHub Project",
         "url": "https://github.com/grafana/loki"
       }
-    ],
-    "version": "5.3.0"
+    ]
   }
 }

+ 53 - 11
public/app/plugins/datasource/loki/result_transformer.ts

@@ -1,25 +1,67 @@
-import { LokiLogsStream } from './types';
-import { parseLabels, FieldType, Labels, MutableDataFrame } from '@grafana/data';
+import { LokiLogsStream, LokiResponse } from './types';
+import {
+  parseLabels,
+  FieldType,
+  Labels,
+  DataFrame,
+  ArrayVector,
+  MutableDataFrame,
+  findUniqueLabels,
+} from '@grafana/data';
 
-export function logStreamToDataFrame(stream: LokiLogsStream, refId?: string): MutableDataFrame {
+/**
+ * Transforms a LokiLogsStream structure into a DataFrame. Used when doing standard queries.
+ */
+export function logStreamToDataFrame(stream: LokiLogsStream, reverse?: boolean, refId?: string): DataFrame {
   let labels: Labels = stream.parsedLabels;
   if (!labels && stream.labels) {
     labels = parseLabels(stream.labels);
   }
-  const time: string[] = [];
-  const lines: string[] = [];
+  const times = new ArrayVector<string>([]);
+  const lines = new ArrayVector<string>([]);
 
   for (const entry of stream.entries) {
-    time.push(entry.ts || entry.timestamp);
-    lines.push(entry.line);
+    times.add(entry.ts || entry.timestamp);
+    lines.add(entry.line);
   }
 
-  return new MutableDataFrame({
+  if (reverse) {
+    times.buffer = times.buffer.reverse();
+    lines.buffer = lines.buffer.reverse();
+  }
+
+  return {
     refId,
     labels,
     fields: [
-      { name: 'ts', type: FieldType.time, values: time }, // Time
-      { name: 'line', type: FieldType.string, values: lines }, // Line
+      { name: 'ts', type: FieldType.time, config: { title: 'Time' }, values: times }, // Time
+      { name: 'line', type: FieldType.string, config: {}, values: lines }, // Line
     ],
-  });
+    length: times.length,
+  };
+}
+
+/**
+ * Transforms LokiResponse data and appends it to a MutableDataFrame. Used for streaming, where the data frame can be
+ * a CircularDataFrame acting as a fixed-size rolling buffer.
+ * TODO: Probably could be unified with the logStreamToDataFrame function.
+ */
+export function appendResponseToBufferedData(response: LokiResponse, data: MutableDataFrame) {
+  // Should we do anything with: response.dropped_entries?
+
+  const streams: LokiLogsStream[] = response.streams;
+  if (streams && streams.length) {
+    for (const stream of streams) {
+      // Find unique labels
+      const labels = parseLabels(stream.labels);
+      const unique = findUniqueLabels(labels, data.labels);
+
+      // Add each line
+      for (const entry of stream.entries) {
+        data.values.ts.add(entry.ts || entry.timestamp);
+        data.values.line.add(entry.line);
+        data.values.labels.add(unique);
+      }
+    }
+  }
 }

+ 4 - 0
public/app/plugins/datasource/loki/types.ts

@@ -12,6 +12,10 @@ export interface LokiOptions extends DataSourceJsonData {
   maxLines?: string;
 }
 
+export interface LokiResponse {
+  streams: LokiLogsStream[];
+}
+
 export interface LokiLogsStream {
   labels: string;
   entries: LokiLogsStreamEntry[];