Browse Source

Explore: Adds Live option for supported datasources (#17062)

* Wip: Initial commit

* Refactor: Adds support in Loki datasource for streaming

* Refactor: Adds Live option to RefreshInterval

* Refactor: Adds styles to logrows

* Style: Reverses the order of Explore layout on Live

* Refactor: Adds LiveLogs component

* Tests: Adds tests for epics

* Style: Adds animation to Live in RefreshPicker

* Refactor: Adds ElapsedTime and progress line to LiveLogs

* Style: Adds specific colors to each theme

* Refactor: Adds support for Lokis new API

* Fix: Adds null to resulting empty array

* Refactor: Limits the rate of incoming messages from websockets

* Refactor: Throttles messages instead for simplicity

* Refactor: Optimizes row processing performance

* Refactor: Adds stop live button

* Fix: Fixes so that RefreshPicker shows the correct value when called programmatically

* Refactor: Merges with master and removes a console.log

* Refactor: Sorts rows in correct order and fixes minor UI issues

* Refactor: Adds minor improvments to sorting and container size
Hugo Häggmark 6 years ago
parent
commit
db48ec1f08

+ 1 - 0
package.json

@@ -229,6 +229,7 @@
     "react-window": "1.7.1",
     "redux": "4.0.1",
     "redux-logger": "3.0.6",
+    "redux-observable": "1.1.0",
     "redux-thunk": "2.3.0",
     "remarkable": "1.7.1",
     "reselect": "4.0.0",

+ 6 - 0
packages/grafana-ui/src/components/Button/AbstractButton.tsx

@@ -75,6 +75,12 @@ const getButtonStyles = (theme: GrafanaTheme, size: ButtonSize, variant: ButtonV
       iconDistance = theme.spacing.xs;
       height = theme.height.sm;
       break;
+    case ButtonSize.Medium:
+      padding = `${theme.spacing.sm} ${theme.spacing.md}`;
+      fontSize = theme.typography.size.md;
+      iconDistance = theme.spacing.sm;
+      height = theme.height.md;
+      break;
     case ButtonSize.Large:
       padding = `${theme.spacing.md} ${theme.spacing.lg}`;
       fontSize = theme.typography.size.lg;

+ 8 - 1
packages/grafana-ui/src/components/RefreshPicker/RefreshPicker.tsx

@@ -5,7 +5,9 @@ import { Tooltip } from '../Tooltip/Tooltip';
 import { ButtonSelect } from '../Select/ButtonSelect';
 
 export const offOption = { label: 'Off', value: '' };
+export const liveOption = { label: 'Live', value: 'LIVE' };
 export const defaultIntervals = ['5s', '10s', '30s', '1m', '5m', '15m', '30m', '1h', '2h', '1d'];
+export const isLive = (refreshInterval: string): boolean => refreshInterval === liveOption.value;
 
 export interface Props {
   intervals?: string[];
@@ -13,6 +15,7 @@ export interface Props {
   onIntervalChanged: (interval: string) => void;
   value?: string;
   tooltip: string;
+  hasLiveOption?: boolean;
 }
 
 export class RefreshPicker extends PureComponent<Props> {
@@ -36,6 +39,9 @@ export class RefreshPicker extends PureComponent<Props> {
 
   intervalsToOptions = (intervals: string[] = defaultIntervals): Array<SelectOptionItem<string>> => {
     const options = intervals.map(interval => ({ label: interval, value: interval }));
+    if (this.props.hasLiveOption) {
+      options.unshift(liveOption);
+    }
     options.unshift(offOption);
     return options;
   };
@@ -57,6 +63,7 @@ export class RefreshPicker extends PureComponent<Props> {
     const cssClasses = classNames({
       'refresh-picker': true,
       'refresh-picker--off': selectedValue.label === offOption.label,
+      'refresh-picker--live': selectedValue === liveOption,
     });
 
     return (
@@ -68,7 +75,7 @@ export class RefreshPicker extends PureComponent<Props> {
             </button>
           </Tooltip>
           <ButtonSelect
-            className="navbar-button--attached btn--radius-left-0"
+            className="navbar-button--attached btn--radius-left-0$"
             value={selectedValue}
             label={selectedValue.label}
             options={options}

+ 18 - 0
packages/grafana-ui/src/components/RefreshPicker/_RefreshPicker.scss

@@ -30,7 +30,25 @@
     }
   }
 
+  &--live {
+    .select-button-value {
+      animation: liveText 2s infinite;
+    }
+  }
+
   @include media-breakpoint-up(sm) {
     display: block;
   }
 }
+
+@keyframes liveText {
+  0% {
+    color: $orange;
+  }
+  50% {
+    color: $yellow;
+  }
+  100% {
+    color: $orange;
+  }
+}

+ 1 - 1
packages/grafana-ui/src/components/Select/ButtonSelect.tsx

@@ -74,7 +74,7 @@ export class ButtonSelect<T> extends PureComponent<Props<T>> {
         isSearchable={false}
         options={options}
         onChange={this.onChange}
-        defaultValue={value}
+        value={value}
         maxMenuHeight={maxMenuHeight}
         components={combinedComponents}
         className="gf-form-select-box-button-select"

+ 12 - 3
packages/grafana-ui/src/components/SetInterval/SetInterval.tsx

@@ -1,8 +1,10 @@
 import { PureComponent } from 'react';
-import { interval, Subscription, empty, Subject } from 'rxjs';
+import { interval, Subscription, Subject, of, NEVER } from 'rxjs';
 import { tap, switchMap } from 'rxjs/operators';
+import _ from 'lodash';
 
 import { stringToMs } from '../../utils/string';
+import { isLive } from '../RefreshPicker/RefreshPicker';
 
 interface Props {
   func: () => any; // TODO
@@ -24,7 +26,10 @@ export class SetInterval extends PureComponent<Props> {
     this.subscription = this.propsSubject
       .pipe(
         switchMap(props => {
-          return props.loading ? empty() : interval(stringToMs(props.interval));
+          if (isLive(props.interval)) {
+            return of({});
+          }
+          return props.loading ? NEVER : interval(stringToMs(props.interval));
         }),
         tap(() => this.props.func())
       )
@@ -32,7 +37,11 @@ export class SetInterval extends PureComponent<Props> {
     this.propsSubject.next(this.props);
   }
 
-  componentDidUpdate() {
+  componentDidUpdate(prevProps: Props) {
+    if (_.isEqual(prevProps, this.props)) {
+      return;
+    }
+
     this.propsSubject.next(this.props);
   }
 

+ 5 - 0
packages/grafana-ui/src/types/datasource.ts

@@ -84,6 +84,7 @@ export interface DataSourcePluginMeta extends PluginMeta {
   category?: string;
   queryOptions?: PluginMetaQueryOptions;
   sort?: number;
+  supportsStreaming?: boolean;
 }
 
 interface PluginMetaQueryOptions {
@@ -157,6 +158,10 @@ export abstract class DataSourceApi<
    */
   abstract query(options: DataQueryRequest<TQuery>, observer?: DataStreamObserver): Promise<DataQueryResponse>;
 
+  convertToStreamTargets?(options: DataQueryRequest<TQuery>): Array<{ url: string; refId: string }>;
+
+  resultToSeriesData?(data: any, refId: string): SeriesData[];
+
   /**
    * Test & verify datasource settings & connection details
    */

+ 3 - 0
packages/grafana-ui/src/utils/moment_wrapper.ts

@@ -43,6 +43,9 @@ export interface DateTimeLocale {
 
 export interface DateTimeDuration {
   asHours: () => number;
+  hours: () => number;
+  minutes: () => number;
+  seconds: () => number;
 }
 
 export interface DateTime extends Object {

+ 35 - 1
public/app/core/utils/explore.ts

@@ -33,8 +33,9 @@ import {
   QueryOptions,
   ResultGetter,
 } from 'app/types/explore';
-import { LogsDedupStrategy, seriesDataToLogsModel } from 'app/core/logs_model';
+import { LogsDedupStrategy, seriesDataToLogsModel, LogsModel, LogRowModel } from 'app/core/logs_model';
 import { toUtc } from '@grafana/ui/src/utils/moment_wrapper';
+import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker';
 
 export const DEFAULT_RANGE = {
   from: 'now-6h',
@@ -529,3 +530,36 @@ export const getRefIds = (value: any): string[] => {
 
   return _.uniq(_.flatten(refIds));
 };
+
+const sortInAscendingOrder = (a: LogRowModel, b: LogRowModel) => {
+  if (a.timeEpochMs < b.timeEpochMs) {
+    return -1;
+  }
+
+  if (a.timeEpochMs > b.timeEpochMs) {
+    return 1;
+  }
+
+  return 0;
+};
+
+const sortInDescendingOrder = (a: LogRowModel, b: LogRowModel) => {
+  if (a.timeEpochMs > b.timeEpochMs) {
+    return -1;
+  }
+
+  if (a.timeEpochMs < b.timeEpochMs) {
+    return 1;
+  }
+
+  return 0;
+};
+
+export const sortLogsResult = (logsResult: LogsModel, refreshInterval: string) => {
+  const rows = logsResult ? logsResult.rows : [];
+  const live = isLive(refreshInterval);
+  live ? rows.sort(sortInAscendingOrder) : rows.sort(sortInDescendingOrder);
+  const result: LogsModel = logsResult ? { ...logsResult, rows } : { hasUniqueLabels: false, rows };
+
+  return result;
+};

+ 29 - 4
public/app/features/explore/ElapsedTime.tsx

@@ -1,8 +1,20 @@
 import React, { PureComponent } from 'react';
+import { toDuration } from '@grafana/ui/src/utils/moment_wrapper';
 
 const INTERVAL = 150;
 
-export default class ElapsedTime extends PureComponent<any, any> {
+export interface Props {
+  time?: number;
+  renderCount?: number;
+  className?: string;
+  humanize?: boolean;
+}
+
+export interface State {
+  elapsed: number;
+}
+
+export default class ElapsedTime extends PureComponent<Props, State> {
   offset: number;
   timer: number;
 
@@ -21,12 +33,17 @@ export default class ElapsedTime extends PureComponent<any, any> {
     this.setState({ elapsed });
   };
 
-  componentWillReceiveProps(nextProps) {
+  componentWillReceiveProps(nextProps: Props) {
     if (nextProps.time) {
       clearInterval(this.timer);
     } else if (this.props.time) {
       this.start();
     }
+
+    if (nextProps.renderCount) {
+      clearInterval(this.timer);
+      this.start();
+    }
   }
 
   componentDidMount() {
@@ -39,8 +56,16 @@ export default class ElapsedTime extends PureComponent<any, any> {
 
   render() {
     const { elapsed } = this.state;
-    const { className, time } = this.props;
+    const { className, time, humanize } = this.props;
     const value = (time || elapsed) / 1000;
-    return <span className={`elapsed-time ${className}`}>{value.toFixed(1)}s</span>;
+    let displayValue = `${value.toFixed(1)}s`;
+    if (humanize) {
+      const duration = toDuration(elapsed);
+      const hours = duration.hours();
+      const minutes = duration.minutes();
+      const seconds = duration.seconds();
+      displayValue = hours ? `${hours}h ${minutes}m ${seconds}s` : minutes ? ` ${minutes}m ${seconds}s` : `${seconds}s`;
+    }
+    return <span className={`elapsed-time ${className}`}>{displayValue}</span>;
   }
 }

+ 3 - 0
public/app/features/explore/Explore.tsx

@@ -87,6 +87,7 @@ interface ExploreProps {
   initialUI: ExploreUIState;
   queryErrors: DataQueryError[];
   mode: ExploreMode;
+  isLive: boolean;
 }
 
 /**
@@ -315,6 +316,7 @@ function mapStateToProps(state: StoreState, { exploreId }: ExploreProps) {
     update,
     queryErrors,
     mode,
+    isLive,
   } = item;
 
   const { datasource, queries, range: urlRange, ui } = (urlState || {}) as ExploreUrlState;
@@ -340,6 +342,7 @@ function mapStateToProps(state: StoreState, { exploreId }: ExploreProps) {
     initialUI,
     queryErrors,
     mode,
+    isLive,
   };
 }
 

+ 24 - 6
public/app/features/explore/ExploreToolbar.tsx

@@ -39,15 +39,20 @@ const createResponsiveButton = (options: {
   buttonClassName?: string;
   iconClassName?: string;
   iconSide?: IconSide;
+  disabled?: boolean;
 }) => {
   const defaultOptions = {
     iconSide: IconSide.left,
   };
   const props = { ...options, defaultOptions };
-  const { title, onClick, buttonClassName, iconClassName, splitted, iconSide } = props;
+  const { title, onClick, buttonClassName, iconClassName, splitted, iconSide, disabled } = props;
 
   return (
-    <button className={`btn navbar-button ${buttonClassName ? buttonClassName : ''}`} onClick={onClick}>
+    <button
+      className={`btn navbar-button ${buttonClassName ? buttonClassName : ''}`}
+      onClick={onClick}
+      disabled={disabled || false}
+    >
       {iconClassName && iconSide === IconSide.left ? <i className={`${iconClassName}`} /> : null}
       <span className="btn-title">{!splitted ? title : ''}</span>
       {iconClassName && iconSide === IconSide.right ? <i className={`${iconClassName}`} /> : null}
@@ -72,6 +77,8 @@ interface StateProps {
   refreshInterval: string;
   supportedModeOptions: Array<SelectOptionItem<ExploreMode>>;
   selectedModeOption: SelectOptionItem<ExploreMode>;
+  hasLiveOption: boolean;
+  isLive: boolean;
 }
 
 interface DispatchProps {
@@ -134,6 +141,8 @@ export class UnConnectedExploreToolbar extends PureComponent<Props, {}> {
       split,
       supportedModeOptions,
       selectedModeOption,
+      hasLiveOption,
+      isLive,
     } = this.props;
 
     return (
@@ -199,19 +208,23 @@ export class UnConnectedExploreToolbar extends PureComponent<Props, {}> {
                   onClick: split,
                   iconClassName: 'fa fa-fw fa-columns icon-margin-right',
                   iconSide: IconSide.left,
+                  disabled: isLive,
                 })}
               </div>
             ) : null}
             <div className="explore-toolbar-content-item timepicker">
-              <ClickOutsideWrapper onClick={this.onCloseTimePicker}>
-                <TimePicker ref={timepickerRef} range={range} isUtc={timeZone.isUtc} onChangeTime={onChangeTime} />
-              </ClickOutsideWrapper>
+              {!isLive && (
+                <ClickOutsideWrapper onClick={this.onCloseTimePicker}>
+                  <TimePicker ref={timepickerRef} range={range} isUtc={timeZone.isUtc} onChangeTime={onChangeTime} />
+                </ClickOutsideWrapper>
+              )}
 
               <RefreshPicker
                 onIntervalChanged={this.onChangeRefreshInterval}
                 onRefresh={this.onRunQuery}
                 value={refreshInterval}
                 tooltip="Refresh"
+                hasLiveOption={hasLiveOption}
               />
               {refreshInterval && <SetInterval func={this.onRunQuery} interval={refreshInterval} loading={loading} />}
             </div>
@@ -227,7 +240,8 @@ export class UnConnectedExploreToolbar extends PureComponent<Props, {}> {
                 title: 'Run Query',
                 onClick: this.onRunQuery,
                 buttonClassName: 'navbar-button--secondary',
-                iconClassName: loading ? 'fa fa-spinner fa-fw fa-spin run-icon' : 'fa fa-level-down fa-fw run-icon',
+                iconClassName:
+                  loading && !isLive ? 'fa fa-spinner fa-fw fa-spin run-icon' : 'fa fa-level-down fa-fw run-icon',
                 iconSide: IconSide.right,
               })}
             </div>
@@ -252,11 +266,13 @@ const mapStateToProps = (state: StoreState, { exploreId }: OwnProps): StateProps
     tableIsLoading,
     supportedModes,
     mode,
+    isLive,
   } = exploreItem;
   const selectedDatasource = datasourceInstance
     ? exploreDatasources.find(datasource => datasource.name === datasourceInstance.name)
     : undefined;
   const loading = graphIsLoading || logIsLoading || tableIsLoading;
+  const hasLiveOption = datasourceInstance && datasourceInstance.convertToStreamTargets ? true : false;
 
   const supportedModeOptions: Array<SelectOptionItem<ExploreMode>> = [];
   let selectedModeOption = null;
@@ -296,6 +312,8 @@ const mapStateToProps = (state: StoreState, { exploreId }: OwnProps): StateProps
     refreshInterval,
     supportedModeOptions,
     selectedModeOption,
+    hasLiveOption,
+    isLive,
   };
 };
 

+ 118 - 0
public/app/features/explore/LiveLogs.tsx

@@ -0,0 +1,118 @@
+import React, { PureComponent } from 'react';
+import { css, cx } from 'emotion';
+import { Themeable, withTheme, GrafanaTheme, selectThemeVariant, LinkButton } from '@grafana/ui';
+
+import { LogsModel, LogRowModel } from 'app/core/logs_model';
+import ElapsedTime from './ElapsedTime';
+import { ButtonSize, ButtonVariant } from '@grafana/ui/src/components/Button/AbstractButton';
+
+const getStyles = (theme: GrafanaTheme) => ({
+  logsRowsLive: css`
+    label: logs-rows-live;
+    display: flex;
+    flex-flow: column nowrap;
+    height: 65vh;
+    overflow-y: auto;
+    :first-child {
+      margin-top: auto !important;
+    }
+  `,
+  logsRowFresh: css`
+    label: logs-row-fresh;
+    color: ${theme.colors.text};
+    background-color: ${selectThemeVariant({ light: theme.colors.gray6, dark: theme.colors.gray1 }, theme.type)};
+  `,
+  logsRowOld: css`
+    label: logs-row-old;
+    opacity: 0.8;
+  `,
+  logsRowsIndicator: css`
+    font-size: ${theme.typography.size.md};
+    padding: ${theme.spacing.sm} 0;
+    display: flex;
+    align-items: center;
+  `,
+});
+
+export interface Props extends Themeable {
+  logsResult?: LogsModel;
+  stopLive: () => void;
+}
+
+export interface State {
+  renderCount: number;
+}
+
+class LiveLogs extends PureComponent<Props, State> {
+  private liveEndDiv: HTMLDivElement = null;
+
+  constructor(props: Props) {
+    super(props);
+    this.state = { renderCount: 0 };
+  }
+
+  componentDidUpdate(prevProps: Props) {
+    const prevRows: LogRowModel[] = prevProps.logsResult ? prevProps.logsResult.rows : [];
+    const rows: LogRowModel[] = this.props.logsResult ? this.props.logsResult.rows : [];
+
+    if (prevRows !== rows) {
+      this.setState({
+        renderCount: this.state.renderCount + 1,
+      });
+    }
+
+    if (this.liveEndDiv) {
+      this.liveEndDiv.scrollIntoView(false);
+    }
+  }
+
+  render() {
+    const { theme } = this.props;
+    const { renderCount } = this.state;
+    const styles = getStyles(theme);
+    const rowsToRender: LogRowModel[] = this.props.logsResult ? this.props.logsResult.rows : [];
+
+    return (
+      <>
+        <div className={cx(['logs-rows', styles.logsRowsLive])}>
+          {rowsToRender.map((row: any, index) => {
+            return (
+              <div
+                className={row.fresh ? cx(['logs-row', styles.logsRowFresh]) : cx(['logs-row', styles.logsRowOld])}
+                key={`${row.timeEpochMs}-${index}`}
+              >
+                <div className="logs-row__localtime" title={`${row.timestamp} (${row.timeFromNow})`}>
+                  {row.timeLocal}
+                </div>
+                <div className="logs-row__message">{row.entry}</div>
+              </div>
+            );
+          })}
+          <div
+            ref={element => {
+              this.liveEndDiv = element;
+              if (this.liveEndDiv) {
+                this.liveEndDiv.scrollIntoView(false);
+              }
+            }}
+          />
+        </div>
+        <div className={cx([styles.logsRowsIndicator])}>
+          <span>
+            Last line received: <ElapsedTime renderCount={renderCount} humanize={true} /> ago
+          </span>
+          <LinkButton
+            onClick={this.props.stopLive}
+            size={ButtonSize.Medium}
+            variant={ButtonVariant.Transparent}
+            style={{ color: theme.colors.orange }}
+          >
+            Stop Live
+          </LinkButton>
+        </div>
+      </>
+    );
+  }
+}
+
+export const LiveLogsWithTheme = withTheme(LiveLogs);

+ 0 - 1
public/app/features/explore/LogRow.tsx

@@ -228,7 +228,6 @@ export class LogRow extends PureComponent<Props, State> {
           const styles = this.state.showContext
             ? cx(logRowStyles, getLogRowWithContextStyles(theme, this.state).row)
             : logRowStyles;
-          console.log(styles);
           return (
             <div className={`logs-row ${this.props.className}`}>
               {showDuplicates && (

+ 31 - 3
public/app/features/explore/LogsContainer.tsx

@@ -19,9 +19,11 @@ import { StoreState } from 'app/types';
 import { changeDedupStrategy, changeTime } from './state/actions';
 import Logs from './Logs';
 import Panel from './Panel';
-import { toggleLogLevelAction } from 'app/features/explore/state/actionTypes';
+import { toggleLogLevelAction, changeRefreshIntervalAction } from 'app/features/explore/state/actionTypes';
 import { deduplicatedLogsSelector, exploreItemUIStateSelector } from 'app/features/explore/state/selectors';
 import { getTimeZone } from '../profile/state/selectors';
+import { LiveLogsWithTheme } from './LiveLogs';
+import { offOption } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker';
 
 interface LogsContainerProps {
   datasourceInstance: DataSourceApi | null;
@@ -43,6 +45,8 @@ interface LogsContainerProps {
   hiddenLogLevels: Set<LogLevel>;
   width: number;
   changeTime: typeof changeTime;
+  isLive: boolean;
+  stopLive: typeof changeRefreshIntervalAction;
 }
 
 export class LogsContainer extends PureComponent<LogsContainerProps> {
@@ -56,6 +60,11 @@ export class LogsContainer extends PureComponent<LogsContainerProps> {
     changeTime(exploreId, range);
   };
 
+  onStopLive = () => {
+    const { exploreId } = this.props;
+    this.props.stopLive({ exploreId, refreshInterval: offOption.value });
+  };
+
   handleDedupStrategyChange = (dedupStrategy: LogsDedupStrategy) => {
     this.props.changeDedupStrategy(this.props.exploreId, dedupStrategy);
   };
@@ -81,7 +90,6 @@ export class LogsContainer extends PureComponent<LogsContainerProps> {
   render() {
     const {
       exploreId,
-
       loading,
       logsHighlighterExpressions,
       logsResult,
@@ -95,8 +103,17 @@ export class LogsContainer extends PureComponent<LogsContainerProps> {
       scanRange,
       width,
       hiddenLogLevels,
+      isLive,
     } = this.props;
 
+    if (isLive) {
+      return (
+        <Panel label="Logs" loading={false} isOpen>
+          <LiveLogsWithTheme logsResult={logsResult} stopLive={this.onStopLive} />
+        </Panel>
+      );
+    }
+
     return (
       <Panel label="Logs" loading={loading} isOpen>
         <Logs
@@ -128,7 +145,16 @@ export class LogsContainer extends PureComponent<LogsContainerProps> {
 function mapStateToProps(state: StoreState, { exploreId }) {
   const explore = state.explore;
   const item: ExploreItemState = explore[exploreId];
-  const { logsHighlighterExpressions, logsResult, logIsLoading, scanning, scanRange, range, datasourceInstance } = item;
+  const {
+    logsHighlighterExpressions,
+    logsResult,
+    logIsLoading,
+    scanning,
+    scanRange,
+    range,
+    datasourceInstance,
+    isLive,
+  } = item;
   const loading = logIsLoading;
   const { dedupStrategy } = exploreItemUIStateSelector(item);
   const hiddenLogLevels = new Set(item.hiddenLogLevels);
@@ -147,6 +173,7 @@ function mapStateToProps(state: StoreState, { exploreId }) {
     hiddenLogLevels,
     dedupedResult,
     datasourceInstance,
+    isLive,
   };
 }
 
@@ -154,6 +181,7 @@ const mapDispatchToProps = {
   changeDedupStrategy,
   toggleLogLevelAction,
   changeTime,
+  stopLive: changeRefreshIntervalAction,
 };
 
 export default hot(module)(

+ 11 - 0
public/app/features/explore/state/actions.ts

@@ -91,6 +91,7 @@ import { LogsDedupStrategy } from 'app/core/logs_model';
 import { getTimeZone } from 'app/features/profile/state/selectors';
 import { isDateTime } from '@grafana/ui/src/utils/moment_wrapper';
 import { toDataQueryError } from 'app/features/dashboard/state/PanelQueryState';
+import { startSubscriptionsAction, subscriptionDataReceivedAction } from 'app/features/explore/state/epics';
 
 /**
  * Updates UI state and save it to the URL
@@ -583,6 +584,16 @@ function runQueriesForType(
     const { datasourceInstance, eventBridge, queries, queryIntervals, range, scanning, history } = getState().explore[
       exploreId
     ];
+
+    if (resultType === 'Logs' && datasourceInstance.convertToStreamTargets) {
+      dispatch(
+        startSubscriptionsAction({
+          exploreId,
+          dataReceivedActionCreator: subscriptionDataReceivedAction,
+        })
+      );
+    }
+
     const datasourceId = datasourceInstance.meta.id;
     const transaction = buildQueryTransaction(queries, resultType, queryOptions, range, queryIntervals, scanning);
     dispatch(queryStartAction({ exploreId, resultType, rowIndex: 0, transaction }));

+ 550 - 0
public/app/features/explore/state/epics.test.ts

@@ -0,0 +1,550 @@
+import { liveOption } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker';
+import { DataSourceApi, DataQuery } from '@grafana/ui/src/types/datasource';
+
+import { ExploreId, ExploreState } from 'app/types';
+import { actionCreatorFactory } from 'app/core/redux/actionCreatorFactory';
+import {
+  startSubscriptionsEpic,
+  startSubscriptionsAction,
+  SubscriptionDataReceivedPayload,
+  startSubscriptionAction,
+  startSubscriptionEpic,
+  limitMessageRatePayloadAction,
+} from './epics';
+import { makeExploreItemState } from './reducers';
+import { epicTester } from 'test/core/redux/epicTester';
+import {
+  resetExploreAction,
+  updateDatasourceInstanceAction,
+  changeRefreshIntervalAction,
+  clearQueriesAction,
+} from './actionTypes';
+
+const setup = (options: any = {}) => {
+  const url = '/api/datasources/proxy/20/api/prom/tail?query=%7Bfilename%3D%22%2Fvar%2Flog%2Fdocker.log%22%7D';
+  const webSocketUrl = 'ws://localhost' + url;
+  const refId = options.refId || 'A';
+  const exploreId = ExploreId.left;
+  const datasourceInstance: DataSourceApi = options.datasourceInstance || {
+    id: 1337,
+    query: jest.fn(),
+    name: 'test',
+    testDatasource: jest.fn(),
+    convertToStreamTargets: () => [
+      {
+        url,
+        refId,
+      },
+    ],
+    resultToSeriesData: data => [data],
+  };
+  const itemState = makeExploreItemState();
+  const explore: Partial<ExploreState> = {
+    [exploreId]: {
+      ...itemState,
+      datasourceInstance,
+      refreshInterval: options.refreshInterval || liveOption.value,
+      queries: [{} as DataQuery],
+    },
+  };
+  const state: any = {
+    explore,
+  };
+
+  return { url, state, refId, webSocketUrl, exploreId };
+};
+
+const dataReceivedActionCreator = actionCreatorFactory<SubscriptionDataReceivedPayload>('test').create();
+
+describe('startSubscriptionsEpic', () => {
+  describe('when startSubscriptionsAction is dispatched', () => {
+    describe('and datasource supports convertToStreamTargets', () => {
+      describe('and explore is Live', () => {
+        it('then correct actions should be dispatched', () => {
+          const { state, refId, webSocketUrl, exploreId } = setup();
+
+          epicTester(startSubscriptionsEpic, state)
+            .whenActionIsDispatched(startSubscriptionsAction({ exploreId, dataReceivedActionCreator }))
+            .thenResultingActionsEqual(
+              startSubscriptionAction({
+                exploreId,
+                refId,
+                url: webSocketUrl,
+                dataReceivedActionCreator,
+              })
+            );
+        });
+      });
+
+      describe('and explore is not Live', () => {
+        it('then no actions should be dispatched', () => {
+          const { state, exploreId } = setup({ refreshInterval: '10s' });
+
+          epicTester(startSubscriptionsEpic, state)
+            .whenActionIsDispatched(startSubscriptionsAction({ exploreId, dataReceivedActionCreator }))
+            .thenNoActionsWhereDispatched();
+        });
+      });
+    });
+
+    describe('and datasource does not support streaming', () => {
+      it('then no actions should be dispatched', () => {
+        const { state, exploreId } = setup({ datasourceInstance: {} });
+
+        epicTester(startSubscriptionsEpic, state)
+          .whenActionIsDispatched(startSubscriptionsAction({ exploreId, dataReceivedActionCreator }))
+          .thenNoActionsWhereDispatched();
+      });
+    });
+  });
+});
+
+describe('startSubscriptionEpic', () => {
+  describe('when startSubscriptionAction is dispatched', () => {
+    describe('and datasource supports resultToSeriesData', () => {
+      it('then correct actions should be dispatched', () => {
+        const { state, webSocketUrl, refId, exploreId } = setup();
+
+        epicTester(startSubscriptionEpic, state)
+          .whenActionIsDispatched(
+            startSubscriptionAction({ url: webSocketUrl, refId, exploreId, dataReceivedActionCreator })
+          )
+          .thenNoActionsWhereDispatched()
+          .whenWebSocketReceivesData({ data: [1, 2, 3] })
+          .thenResultingActionsEqual(
+            limitMessageRatePayloadAction({
+              exploreId,
+              data: { data: [1, 2, 3] } as any,
+              dataReceivedActionCreator,
+            })
+          )
+          .whenWebSocketReceivesData({ data: [4, 5, 6] })
+          .thenResultingActionsEqual(
+            limitMessageRatePayloadAction({
+              exploreId,
+              data: { data: [1, 2, 3] } as any,
+              dataReceivedActionCreator,
+            }),
+            limitMessageRatePayloadAction({
+              exploreId,
+              data: { data: [4, 5, 6] } as any,
+              dataReceivedActionCreator,
+            })
+          );
+      });
+    });
+
+    describe('and datasource does not support resultToSeriesData', () => {
+      it('then no actions should be dispatched', () => {
+        const { state, webSocketUrl, refId, exploreId } = setup({ datasourceInstance: {} });
+
+        epicTester(startSubscriptionEpic, state)
+          .whenActionIsDispatched(
+            startSubscriptionAction({ url: webSocketUrl, refId, exploreId, dataReceivedActionCreator })
+          )
+          .thenNoActionsWhereDispatched()
+          .whenWebSocketReceivesData({ data: [1, 2, 3] })
+          .thenNoActionsWhereDispatched();
+      });
+    });
+  });
+
+  describe('when an subscription is active', () => {
+    describe('and resetExploreAction is dispatched', () => {
+      it('then subscription should be unsubscribed', () => {
+        const { state, webSocketUrl, refId, exploreId } = setup();
+
+        epicTester(startSubscriptionEpic, state)
+          .whenActionIsDispatched(
+            startSubscriptionAction({ url: webSocketUrl, refId, exploreId, dataReceivedActionCreator })
+          )
+          .thenNoActionsWhereDispatched()
+          .whenWebSocketReceivesData({ data: [1, 2, 3] })
+          .thenResultingActionsEqual(
+            limitMessageRatePayloadAction({
+              exploreId,
+              data: { data: [1, 2, 3] } as any,
+              dataReceivedActionCreator,
+            })
+          )
+          .whenActionIsDispatched(resetExploreAction())
+          .whenWebSocketReceivesData({ data: [4, 5, 6] })
+          .thenResultingActionsEqual(
+            limitMessageRatePayloadAction({
+              exploreId,
+              data: { data: [1, 2, 3] } as any,
+              dataReceivedActionCreator,
+            })
+          );
+      });
+    });
+
+    describe('and updateDatasourceInstanceAction is dispatched', () => {
+      describe('and exploreId matches the websockets', () => {
+        it('then subscription should be unsubscribed', () => {
+          const { state, webSocketUrl, refId, exploreId } = setup();
+
+          epicTester(startSubscriptionEpic, state)
+            .whenActionIsDispatched(
+              startSubscriptionAction({
+                url: webSocketUrl,
+                refId,
+                exploreId,
+                dataReceivedActionCreator,
+              })
+            )
+            .thenNoActionsWhereDispatched()
+            .whenWebSocketReceivesData({ data: [1, 2, 3] })
+            .thenResultingActionsEqual(
+              limitMessageRatePayloadAction({
+                exploreId,
+                data: { data: [1, 2, 3] } as any,
+                dataReceivedActionCreator,
+              })
+            )
+            .whenActionIsDispatched(updateDatasourceInstanceAction({ exploreId, datasourceInstance: null }))
+            .whenWebSocketReceivesData({ data: [4, 5, 6] })
+            .thenResultingActionsEqual(
+              limitMessageRatePayloadAction({
+                exploreId,
+                data: { data: [1, 2, 3] } as any,
+                dataReceivedActionCreator,
+              })
+            );
+        });
+      });
+
+      describe('and exploreId does not match the websockets', () => {
+        it('then subscription should not be unsubscribed', () => {
+          const { state, webSocketUrl, refId, exploreId } = setup();
+
+          epicTester(startSubscriptionEpic, state)
+            .whenActionIsDispatched(
+              startSubscriptionAction({
+                url: webSocketUrl,
+                refId,
+                exploreId,
+                dataReceivedActionCreator,
+              })
+            )
+            .thenNoActionsWhereDispatched()
+            .whenWebSocketReceivesData({ data: [1, 2, 3] })
+            .thenResultingActionsEqual(
+              limitMessageRatePayloadAction({
+                exploreId,
+                data: { data: [1, 2, 3] } as any,
+                dataReceivedActionCreator,
+              })
+            )
+            .whenActionIsDispatched(
+              updateDatasourceInstanceAction({ exploreId: ExploreId.right, datasourceInstance: null })
+            )
+            .whenWebSocketReceivesData({ data: [4, 5, 6] })
+            .thenResultingActionsEqual(
+              limitMessageRatePayloadAction({
+                exploreId,
+                data: { data: [1, 2, 3] } as any,
+                dataReceivedActionCreator,
+              }),
+              limitMessageRatePayloadAction({
+                exploreId,
+                data: { data: [4, 5, 6] } as any,
+                dataReceivedActionCreator,
+              })
+            );
+        });
+      });
+    });
+
+    describe('and changeRefreshIntervalAction is dispatched', () => {
+      describe('and exploreId matches the websockets', () => {
+        describe('and refreshinterval is not "Live"', () => {
+          it('then subscription should be unsubscribed', () => {
+            const { state, webSocketUrl, refId, exploreId } = setup();
+
+            epicTester(startSubscriptionEpic, state)
+              .whenActionIsDispatched(
+                startSubscriptionAction({
+                  url: webSocketUrl,
+                  refId,
+                  exploreId,
+                  dataReceivedActionCreator,
+                })
+              )
+              .thenNoActionsWhereDispatched()
+              .whenWebSocketReceivesData({ data: [1, 2, 3] })
+              .thenResultingActionsEqual(
+                limitMessageRatePayloadAction({
+                  exploreId,
+                  data: { data: [1, 2, 3] } as any,
+                  dataReceivedActionCreator,
+                })
+              )
+              .whenActionIsDispatched(changeRefreshIntervalAction({ exploreId, refreshInterval: '10s' }))
+              .whenWebSocketReceivesData({ data: [4, 5, 6] })
+              .thenResultingActionsEqual(
+                limitMessageRatePayloadAction({
+                  exploreId,
+                  data: { data: [1, 2, 3] } as any,
+                  dataReceivedActionCreator,
+                })
+              );
+          });
+        });
+
+        describe('and refreshinterval is "Live"', () => {
+          it('then subscription should not be unsubscribed', () => {
+            const { state, webSocketUrl, refId, exploreId } = setup();
+
+            epicTester(startSubscriptionEpic, state)
+              .whenActionIsDispatched(
+                startSubscriptionAction({
+                  url: webSocketUrl,
+                  refId,
+                  exploreId,
+                  dataReceivedActionCreator,
+                })
+              )
+              .thenNoActionsWhereDispatched()
+              .whenWebSocketReceivesData({ data: [1, 2, 3] })
+              .thenResultingActionsEqual(
+                limitMessageRatePayloadAction({
+                  exploreId,
+                  data: { data: [1, 2, 3] } as any,
+                  dataReceivedActionCreator,
+                })
+              )
+              .whenActionIsDispatched(changeRefreshIntervalAction({ exploreId, refreshInterval: liveOption.value }))
+              .whenWebSocketReceivesData({ data: [4, 5, 6] })
+              .thenResultingActionsEqual(
+                limitMessageRatePayloadAction({
+                  exploreId,
+                  data: { data: [1, 2, 3] } as any,
+                  dataReceivedActionCreator,
+                }),
+                limitMessageRatePayloadAction({
+                  exploreId,
+                  data: { data: [4, 5, 6] } as any,
+                  dataReceivedActionCreator,
+                })
+              );
+          });
+        });
+      });
+
+      describe('and exploreId does not match the websockets', () => {
+        it('then subscription should not be unsubscribed', () => {
+          const { state, webSocketUrl, refId, exploreId } = setup();
+
+          epicTester(startSubscriptionEpic, state)
+            .whenActionIsDispatched(
+              startSubscriptionAction({
+                url: webSocketUrl,
+                refId,
+                exploreId,
+                dataReceivedActionCreator,
+              })
+            )
+            .thenNoActionsWhereDispatched()
+            .whenWebSocketReceivesData({ data: [1, 2, 3] })
+            .thenResultingActionsEqual(
+              limitMessageRatePayloadAction({
+                exploreId,
+                data: { data: [1, 2, 3] } as any,
+                dataReceivedActionCreator,
+              })
+            )
+            .whenActionIsDispatched(changeRefreshIntervalAction({ exploreId: ExploreId.right, refreshInterval: '10s' }))
+            .whenWebSocketReceivesData({ data: [4, 5, 6] })
+            .thenResultingActionsEqual(
+              limitMessageRatePayloadAction({
+                exploreId,
+                data: { data: [1, 2, 3] } as any,
+                dataReceivedActionCreator,
+              }),
+              limitMessageRatePayloadAction({
+                exploreId,
+                data: { data: [4, 5, 6] } as any,
+                dataReceivedActionCreator,
+              })
+            );
+        });
+      });
+    });
+
+    describe('and clearQueriesAction is dispatched', () => {
+      describe('and exploreId matches the websockets', () => {
+        it('then subscription should be unsubscribed', () => {
+          const { state, webSocketUrl, refId, exploreId } = setup();
+
+          epicTester(startSubscriptionEpic, state)
+            .whenActionIsDispatched(
+              startSubscriptionAction({
+                url: webSocketUrl,
+                refId,
+                exploreId,
+                dataReceivedActionCreator,
+              })
+            )
+            .thenNoActionsWhereDispatched()
+            .whenWebSocketReceivesData({ data: [1, 2, 3] })
+            .thenResultingActionsEqual(
+              limitMessageRatePayloadAction({
+                exploreId,
+                data: { data: [1, 2, 3] } as any,
+                dataReceivedActionCreator,
+              })
+            )
+            .whenActionIsDispatched(clearQueriesAction({ exploreId }))
+            .whenWebSocketReceivesData({ data: [4, 5, 6] })
+            .thenResultingActionsEqual(
+              limitMessageRatePayloadAction({
+                exploreId,
+                data: { data: [1, 2, 3] } as any,
+                dataReceivedActionCreator,
+              })
+            );
+        });
+      });
+
+      describe('and exploreId does not match the websockets', () => {
+        it('then subscription should not be unsubscribed', () => {
+          const { state, webSocketUrl, refId, exploreId } = setup();
+
+          epicTester(startSubscriptionEpic, state)
+            .whenActionIsDispatched(
+              startSubscriptionAction({
+                url: webSocketUrl,
+                refId,
+                exploreId,
+                dataReceivedActionCreator,
+              })
+            )
+            .thenNoActionsWhereDispatched()
+            .whenWebSocketReceivesData({ data: [1, 2, 3] })
+            .thenResultingActionsEqual(
+              limitMessageRatePayloadAction({
+                exploreId,
+                data: { data: [1, 2, 3] } as any,
+                dataReceivedActionCreator,
+              })
+            )
+            .whenActionIsDispatched(clearQueriesAction({ exploreId: ExploreId.right }))
+            .whenWebSocketReceivesData({ data: [4, 5, 6] })
+            .thenResultingActionsEqual(
+              limitMessageRatePayloadAction({
+                exploreId,
+                data: { data: [1, 2, 3] } as any,
+                dataReceivedActionCreator,
+              }),
+              limitMessageRatePayloadAction({
+                exploreId,
+                data: { data: [4, 5, 6] } as any,
+                dataReceivedActionCreator,
+              })
+            );
+        });
+      });
+    });
+
+    describe('and startSubscriptionAction is dispatched', () => {
+      describe('and exploreId and refId matches the websockets', () => {
+        it('then subscription should be unsubscribed', () => {
+          const { state, webSocketUrl, refId, exploreId } = setup();
+
+          epicTester(startSubscriptionEpic, state)
+            .whenActionIsDispatched(
+              startSubscriptionAction({
+                url: webSocketUrl,
+                refId,
+                exploreId,
+                dataReceivedActionCreator,
+              })
+            )
+            .thenNoActionsWhereDispatched()
+            .whenWebSocketReceivesData({ data: [1, 2, 3] })
+            .thenResultingActionsEqual(
+              limitMessageRatePayloadAction({
+                exploreId,
+                data: { data: [1, 2, 3] } as any,
+                dataReceivedActionCreator,
+              })
+            )
+            .whenActionIsDispatched(
+              startSubscriptionAction({
+                url: webSocketUrl,
+                refId,
+                exploreId,
+                dataReceivedActionCreator,
+              })
+            )
+            .whenWebSocketReceivesData({ data: [4, 5, 6] })
+            .thenResultingActionsEqual(
+              limitMessageRatePayloadAction({
+                exploreId,
+                data: { data: [1, 2, 3] } as any,
+                dataReceivedActionCreator,
+              }),
+              limitMessageRatePayloadAction({
+                exploreId,
+                data: { data: [4, 5, 6] } as any,
+                dataReceivedActionCreator,
+              })
+              // This looks like we haven't stopped the subscription but we actually started the same again
+            );
+        });
+
+        describe('and exploreId or refId does not match the websockets', () => {
+          it('then subscription should not be unsubscribed and another websocket is started', () => {
+            const { state, webSocketUrl, refId, exploreId } = setup();
+
+            epicTester(startSubscriptionEpic, state)
+              .whenActionIsDispatched(
+                startSubscriptionAction({
+                  url: webSocketUrl,
+                  refId,
+                  exploreId,
+                  dataReceivedActionCreator,
+                })
+              )
+              .thenNoActionsWhereDispatched()
+              .whenWebSocketReceivesData({ data: [1, 2, 3] })
+              .thenResultingActionsEqual(
+                limitMessageRatePayloadAction({
+                  exploreId,
+                  data: { data: [1, 2, 3] } as any,
+                  dataReceivedActionCreator,
+                })
+              )
+              .whenActionIsDispatched(
+                startSubscriptionAction({
+                  url: webSocketUrl,
+                  refId: 'B',
+                  exploreId,
+                  dataReceivedActionCreator,
+                })
+              )
+              .whenWebSocketReceivesData({ data: [4, 5, 6] })
+              .thenResultingActionsEqual(
+                limitMessageRatePayloadAction({
+                  exploreId,
+                  data: { data: [1, 2, 3] } as any,
+                  dataReceivedActionCreator,
+                }),
+                limitMessageRatePayloadAction({
+                  exploreId,
+                  data: { data: [4, 5, 6] } as any,
+                  dataReceivedActionCreator,
+                }),
+                limitMessageRatePayloadAction({
+                  exploreId,
+                  data: { data: [4, 5, 6] } as any,
+                  dataReceivedActionCreator,
+                })
+              );
+          });
+        });
+      });
+    });
+  });
+});

+ 159 - 0
public/app/features/explore/state/epics.ts

@@ -0,0 +1,159 @@
+import { Epic } from 'redux-observable';
+import { NEVER } from 'rxjs';
+import { takeUntil, mergeMap, tap, filter, map, throttleTime } from 'rxjs/operators';
+
+import { StoreState, ExploreId } from 'app/types';
+import { ActionOf, ActionCreator, actionCreatorFactory } from '../../../core/redux/actionCreatorFactory';
+import { config } from '../../../core/config';
+import {
+  updateDatasourceInstanceAction,
+  resetExploreAction,
+  changeRefreshIntervalAction,
+  clearQueriesAction,
+} from './actionTypes';
+import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker';
+import { SeriesData } from '@grafana/ui/src/types/data';
+import { EpicDependencies } from 'app/store/configureStore';
+
+const convertToWebSocketUrl = (url: string) => {
+  const protocol = window.location.protocol === 'https' ? 'wss://' : 'ws://';
+  let backend = `${protocol}${window.location.host}${config.appSubUrl}`;
+  if (backend.endsWith('/')) {
+    backend = backend.slice(0, backend.length - 1);
+  }
+  return `${backend}${url}`;
+};
+
+export interface StartSubscriptionsPayload {
+  exploreId: ExploreId;
+  dataReceivedActionCreator: ActionCreator<SubscriptionDataReceivedPayload>;
+}
+
+export const startSubscriptionsAction = actionCreatorFactory<StartSubscriptionsPayload>(
+  'explore/START_SUBSCRIPTIONS'
+).create();
+
+export interface StartSubscriptionPayload {
+  url: string;
+  refId: string;
+  exploreId: ExploreId;
+  dataReceivedActionCreator: ActionCreator<SubscriptionDataReceivedPayload>;
+}
+
+export const startSubscriptionAction = actionCreatorFactory<StartSubscriptionPayload>(
+  'explore/START_SUBSCRIPTION'
+).create();
+
+export interface SubscriptionDataReceivedPayload {
+  data: SeriesData;
+  exploreId: ExploreId;
+}
+
+export const subscriptionDataReceivedAction = actionCreatorFactory<SubscriptionDataReceivedPayload>(
+  'explore/SUBSCRIPTION_DATA_RECEIVED'
+).create();
+
+export interface LimitMessageRatePayload {
+  data: SeriesData;
+  exploreId: ExploreId;
+  dataReceivedActionCreator: ActionCreator<SubscriptionDataReceivedPayload>;
+}
+
+export const limitMessageRatePayloadAction = actionCreatorFactory<LimitMessageRatePayload>(
+  'explore/LIMIT_MESSAGE_RATE_PAYLOAD'
+).create();
+
+export const startSubscriptionsEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState> = (action$, state$) => {
+  return action$.ofType(startSubscriptionsAction.type).pipe(
+    mergeMap((action: ActionOf<StartSubscriptionsPayload>) => {
+      const { exploreId, dataReceivedActionCreator } = action.payload;
+      const { datasourceInstance, queries, refreshInterval } = state$.value.explore[exploreId];
+
+      if (!datasourceInstance || !datasourceInstance.convertToStreamTargets) {
+        return NEVER; //do nothing if datasource does not support streaming
+      }
+
+      if (!refreshInterval || !isLive(refreshInterval)) {
+        return NEVER; //do nothing if refresh interval is not 'LIVE'
+      }
+
+      const request: any = { targets: queries };
+      return datasourceInstance.convertToStreamTargets(request).map(target =>
+        startSubscriptionAction({
+          url: convertToWebSocketUrl(target.url),
+          refId: target.refId,
+          exploreId,
+          dataReceivedActionCreator,
+        })
+      );
+    })
+  );
+};
+
+export const startSubscriptionEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState, EpicDependencies> = (
+  action$,
+  state$,
+  { getWebSocket }
+) => {
+  return action$.ofType(startSubscriptionAction.type).pipe(
+    mergeMap((action: ActionOf<StartSubscriptionPayload>) => {
+      const { url, exploreId, refId, dataReceivedActionCreator } = action.payload;
+      return getWebSocket(url).pipe(
+        takeUntil(
+          action$
+            .ofType(
+              startSubscriptionAction.type,
+              resetExploreAction.type,
+              updateDatasourceInstanceAction.type,
+              changeRefreshIntervalAction.type,
+              clearQueriesAction.type
+            )
+            .pipe(
+              filter(action => {
+                if (action.type === resetExploreAction.type) {
+                  return true; // stops all subscriptions if user navigates away
+                }
+
+                if (action.type === updateDatasourceInstanceAction.type && action.payload.exploreId === exploreId) {
+                  return true; // stops subscriptions if user changes data source
+                }
+
+                if (action.type === changeRefreshIntervalAction.type && action.payload.exploreId === exploreId) {
+                  return !isLive(action.payload.refreshInterval); // stops subscriptions if user changes refresh interval away from 'Live'
+                }
+
+                if (action.type === clearQueriesAction.type && action.payload.exploreId === exploreId) {
+                  return true; // stops subscriptions if user clears all queries
+                }
+
+                return action.payload.exploreId === exploreId && action.payload.refId === refId;
+              }),
+              tap(value => console.log('Stopping subscription', value))
+            )
+        ),
+        mergeMap((result: any) => {
+          const { datasourceInstance } = state$.value.explore[exploreId];
+
+          if (!datasourceInstance || !datasourceInstance.resultToSeriesData) {
+            return [null]; //do nothing if datasource does not support streaming
+          }
+
+          return datasourceInstance
+            .resultToSeriesData(result, refId)
+            .map(data => limitMessageRatePayloadAction({ exploreId, data, dataReceivedActionCreator }));
+        }),
+        filter(action => action !== null)
+      );
+    })
+  );
+};
+
+export const limitMessageRateEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState, EpicDependencies> = action$ => {
+  return action$.ofType(limitMessageRatePayloadAction.type).pipe(
+    throttleTime(1),
+    map((action: ActionOf<LimitMessageRatePayload>) => {
+      const { exploreId, data, dataReceivedActionCreator } = action.payload;
+      return dataReceivedActionCreator({ exploreId, data });
+    })
+  );
+};

+ 76 - 5
public/app/features/explore/state/reducers.ts

@@ -7,6 +7,7 @@ import {
   parseUrlState,
   DEFAULT_UI_STATE,
   generateNewKeyAndAddRefIdIfMissing,
+  sortLogsResult,
 } from 'app/core/utils/explore';
 import { ExploreItemState, ExploreState, ExploreId, ExploreUpdateState, ExploreMode } from 'app/types/explore';
 import { DataQuery } from '@grafana/ui/src/types';
@@ -55,6 +56,9 @@ import {
 import { updateLocation } from 'app/core/actions/location';
 import { LocationUpdate } from 'app/types';
 import TableModel from 'app/core/table_model';
+import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker';
+import { subscriptionDataReceivedAction, startSubscriptionAction } from './epics';
+import { LogsModel, seriesDataToLogsModel } from 'app/core/logs_model';
 
 export const DEFAULT_RANGE = {
   from: 'now-6h',
@@ -109,6 +113,7 @@ export const makeExploreItemState = (): ExploreItemState => ({
   latency: 0,
   supportedModes: [],
   mode: null,
+  isLive: false,
 });
 
 /**
@@ -184,9 +189,17 @@ export const itemReducer = reducerFactory<ExploreItemState>({} as ExploreItemSta
     filter: changeRefreshIntervalAction,
     mapper: (state, action): ExploreItemState => {
       const { refreshInterval } = action.payload;
+      const live = isLive(refreshInterval);
+      const logsResult = sortLogsResult(state.logsResult, refreshInterval);
+
       return {
         ...state,
         refreshInterval: refreshInterval,
+        graphIsLoading: live ? true : false,
+        tableIsLoading: live ? true : false,
+        logIsLoading: live ? true : false,
+        isLive: live,
+        logsResult,
       };
     },
   })
@@ -376,22 +389,80 @@ export const itemReducer = reducerFactory<ExploreItemState>({} as ExploreItemSta
   .addMapper({
     filter: querySuccessAction,
     mapper: (state, action): ExploreItemState => {
-      const { queryIntervals } = state;
+      const { queryIntervals, refreshInterval } = state;
       const { result, resultType, latency } = action.payload;
       const results = calculateResultsFromQueryTransactions(result, resultType, queryIntervals.intervalMs);
+      const live = isLive(refreshInterval);
+
+      if (live) {
+        return state;
+      }
+
       return {
         ...state,
         graphResult: resultType === 'Graph' ? results.graphResult : state.graphResult,
         tableResult: resultType === 'Table' ? results.tableResult : state.tableResult,
-        logsResult: resultType === 'Logs' ? results.logsResult : state.logsResult,
+        logsResult:
+          resultType === 'Logs'
+            ? sortLogsResult(results.logsResult, refreshInterval)
+            : sortLogsResult(state.logsResult, refreshInterval),
         latency,
-        graphIsLoading: false,
-        logIsLoading: false,
-        tableIsLoading: false,
+        graphIsLoading: live ? true : false,
+        logIsLoading: live ? true : false,
+        tableIsLoading: live ? true : false,
+        showingStartPage: false,
+        update: makeInitialUpdateState(),
+      };
+    },
+  })
+  .addMapper({
+    filter: startSubscriptionAction,
+    mapper: (state): ExploreItemState => {
+      const logsResult = sortLogsResult(state.logsResult, state.refreshInterval);
+
+      return {
+        ...state,
+        logsResult,
+        graphIsLoading: true,
+        logIsLoading: true,
+        tableIsLoading: true,
+        showingStartPage: false,
         update: makeInitialUpdateState(),
       };
     },
   })
+  .addMapper({
+    filter: subscriptionDataReceivedAction,
+    mapper: (state, action): ExploreItemState => {
+      const { queryIntervals, refreshInterval } = state;
+      const { data } = action.payload;
+      const live = isLive(refreshInterval);
+
+      if (live) {
+        return state;
+      }
+
+      const newResults = seriesDataToLogsModel([data], queryIntervals.intervalMs);
+      const rowsInState = sortLogsResult(state.logsResult, state.refreshInterval).rows;
+
+      const processedRows = [];
+      for (const row of rowsInState) {
+        processedRows.push({ ...row, fresh: false });
+      }
+      for (const row of newResults.rows) {
+        processedRows.push({ ...row, fresh: true });
+      }
+
+      const rows = processedRows.slice(processedRows.length - 1000, 1000);
+
+      const logsResult: LogsModel = state.logsResult ? { ...state.logsResult, rows } : { hasUniqueLabels: false, rows };
+
+      return {
+        ...state,
+        logsResult,
+      };
+    },
+  })
   .addMapper({
     filter: removeQueryRowAction,
     mapper: (state, action): ExploreItemState => {

+ 36 - 0
public/app/plugins/datasource/loki/datasource.ts

@@ -68,6 +68,42 @@ export class LokiDatasource extends DataSourceApi<LokiQuery, LokiOptions> {
     return this.backendSrv.datasourceRequest(req);
   }
 
+  convertToStreamTargets = (options: DataQueryRequest<LokiQuery>): Array<{ url: string; refId: string }> => {
+    return options.targets
+      .filter(target => target.expr && !target.hide)
+      .map(target => {
+        const interpolated = this.templateSrv.replace(target.expr);
+        const { query, regexp } = parseQuery(interpolated);
+        const refId = target.refId;
+        const baseUrl = this.instanceSettings.url;
+        const params = serializeParams({ query, regexp });
+        const url = `${baseUrl}/api/prom/tail?${params}`;
+
+        return {
+          url,
+          refId,
+        };
+      });
+  };
+
+  resultToSeriesData = (data: any, refId: string): SeriesData[] => {
+    const toSeriesData = (stream: any, refId: string) => ({
+      ...logStreamToSeriesData(stream),
+      refId,
+    });
+
+    if (data.streams) {
+      // new Loki API purposed in https://github.com/grafana/loki/pull/590
+      const series: SeriesData[] = [];
+      for (const stream of data.streams || []) {
+        series.push(toSeriesData(stream, refId));
+      }
+      return series;
+    }
+
+    return [toSeriesData(data, refId)];
+  };
+
   prepareQueryTarget(target: LokiQuery, options: DataQueryRequest<LokiQuery>) {
     const interpolated = this.templateSrv.replace(target.expr);
     const { query, regexp } = parseQuery(interpolated);

+ 19 - 2
public/app/store/configureStore.ts

@@ -1,5 +1,6 @@
 import { createStore, applyMiddleware, compose, combineReducers } from 'redux';
 import thunk from 'redux-thunk';
+import { combineEpics, createEpicMiddleware } from 'redux-observable';
 // import { createLogger } from 'redux-logger';
 import sharedReducers from 'app/core/reducers';
 import alertingReducers from 'app/features/alerting/state/reducers';
@@ -14,6 +15,8 @@ import usersReducers from 'app/features/users/state/reducers';
 import userReducers from 'app/features/profile/state/reducers';
 import organizationReducers from 'app/features/org/state/reducers';
 import { setStore } from './store';
+import { startSubscriptionsEpic, startSubscriptionEpic, limitMessageRateEpic } from 'app/features/explore/state/epics';
+import { WebSocketSubject, webSocket } from 'rxjs/webSocket';
 
 const rootReducers = {
   ...sharedReducers,
@@ -34,6 +37,18 @@ export function addRootReducer(reducers) {
   Object.assign(rootReducers, ...reducers);
 }
 
+export const rootEpic: any = combineEpics(startSubscriptionsEpic, startSubscriptionEpic, limitMessageRateEpic);
+
+export interface EpicDependencies {
+  getWebSocket: <T>(urlConfigOrSource: string) => WebSocketSubject<T>;
+}
+
+const dependencies: EpicDependencies = {
+  getWebSocket: webSocket,
+};
+
+const epicMiddleware = createEpicMiddleware({ dependencies });
+
 export function configureStore() {
   const composeEnhancers = (window as any).__REDUX_DEVTOOLS_EXTENSION_COMPOSE__ || compose;
 
@@ -41,8 +56,10 @@ export function configureStore() {
 
   if (process.env.NODE_ENV !== 'production') {
     // DEV builds we had the logger middleware
-    setStore(createStore(rootReducer, {}, composeEnhancers(applyMiddleware(thunk))));
+    setStore(createStore(rootReducer, {}, composeEnhancers(applyMiddleware(thunk, epicMiddleware))));
   } else {
-    setStore(createStore(rootReducer, {}, composeEnhancers(applyMiddleware(thunk))));
+    setStore(createStore(rootReducer, {}, composeEnhancers(applyMiddleware(thunk, epicMiddleware))));
   }
+
+  epicMiddleware.run(rootEpic);
 }

+ 3 - 0
public/app/types/explore.ts

@@ -259,9 +259,12 @@ export interface ExploreItemState {
   update: ExploreUpdateState;
 
   queryErrors: DataQueryError[];
+
   latency: number;
   supportedModes: ExploreMode[];
   mode: ExploreMode;
+
+  isLive: boolean;
 }
 
 export interface ExploreUpdateState {

+ 13 - 0
public/sass/pages/_explore.scss

@@ -138,7 +138,13 @@
 }
 
 .explore {
+  display: flex;
   flex: 1 1 auto;
+  flex-direction: column;
+}
+
+.explore.explore-live {
+  flex-direction: column-reverse;
 }
 
 .explore + .explore {
@@ -146,9 +152,16 @@
 }
 
 .explore-container {
+  display: flex;
+  flex: 1 1 auto;
+  flex-direction: column;
   padding: $dashboard-padding;
 }
 
+.explore-container.explore-live {
+  flex-direction: column-reverse;
+}
+
 .explore-wrapper {
   display: flex;
 

+ 60 - 0
public/test/core/redux/epicTester.ts

@@ -0,0 +1,60 @@
+import { Epic, ActionsObservable, StateObservable } from 'redux-observable';
+import { Subject } from 'rxjs';
+import { WebSocketSubject } from 'rxjs/webSocket';
+
+import { ActionOf } from 'app/core/redux/actionCreatorFactory';
+import { StoreState } from 'app/types/store';
+import { EpicDependencies } from 'app/store/configureStore';
+
+export const epicTester = (
+  epic: Epic<ActionOf<any>, ActionOf<any>, StoreState, EpicDependencies>,
+  state?: StoreState
+) => {
+  const resultingActions: Array<ActionOf<any>> = [];
+  const action$ = new Subject<ActionOf<any>>();
+  const state$ = new Subject<StoreState>();
+  const actionObservable$ = new ActionsObservable(action$);
+  const stateObservable$ = new StateObservable(state$, state || ({} as StoreState));
+  const websockets$: Array<Subject<any>> = [];
+  const dependencies: EpicDependencies = {
+    getWebSocket: () => {
+      const webSocket$ = new Subject<any>();
+      websockets$.push(webSocket$);
+      return webSocket$ as WebSocketSubject<any>;
+    },
+  };
+  epic(actionObservable$, stateObservable$, dependencies).subscribe({ next: action => resultingActions.push(action) });
+
+  const whenActionIsDispatched = (action: ActionOf<any>) => {
+    action$.next(action);
+
+    return instance;
+  };
+
+  const whenWebSocketReceivesData = (data: any) => {
+    websockets$.forEach(websocket$ => websocket$.next(data));
+
+    return instance;
+  };
+
+  const thenResultingActionsEqual = (...actions: Array<ActionOf<any>>) => {
+    expect(resultingActions).toEqual(actions);
+
+    return instance;
+  };
+
+  const thenNoActionsWhereDispatched = () => {
+    expect(resultingActions).toEqual([]);
+
+    return instance;
+  };
+
+  const instance = {
+    whenActionIsDispatched,
+    whenWebSocketReceivesData,
+    thenResultingActionsEqual,
+    thenNoActionsWhereDispatched,
+  };
+
+  return instance;
+};

+ 5 - 0
yarn.lock

@@ -14735,6 +14735,11 @@ redux-mock-store@1.5.3:
   dependencies:
     lodash.isplainobject "^4.0.6"
 
+redux-observable@1.1.0:
+  version "1.1.0"
+  resolved "https://registry.yarnpkg.com/redux-observable/-/redux-observable-1.1.0.tgz#323a8fe53e89fdb519be2807b55f08e21c13e6f1"
+  integrity sha512-G0nxgmTZwTK3Z3KoQIL8VQu9n0YCUwEP3wc3zxKQ8zAZm+iYkoZvBqAnBJfLi4EsD1E64KR4s4jFH/dFXpV9Og==
+
 redux-thunk@2.3.0:
   version "2.3.0"
   resolved "https://registry.yarnpkg.com/redux-thunk/-/redux-thunk-2.3.0.tgz#51c2c19a185ed5187aaa9a2d08b666d0d6467622"