runQueriesBatchEpic.ts 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. import { Epic } from 'redux-observable';
  2. import { Observable, Subject } from 'rxjs';
  3. import { mergeMap, catchError, takeUntil, filter } from 'rxjs/operators';
  4. import _, { isString } from 'lodash';
  5. import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker';
  6. import { DataStreamState, LoadingState, DataQueryResponse, SeriesData, DataQueryResponseData } from '@grafana/ui';
  7. import * as dateMath from '@grafana/ui/src/utils/datemath';
  8. import { ActionOf } from 'app/core/redux/actionCreatorFactory';
  9. import { StoreState } from 'app/types/store';
  10. import { buildQueryTransaction, updateHistory } from 'app/core/utils/explore';
  11. import {
  12. clearQueriesAction,
  13. historyUpdatedAction,
  14. resetExploreAction,
  15. updateDatasourceInstanceAction,
  16. changeRefreshIntervalAction,
  17. processQueryErrorsAction,
  18. processQueryResultsAction,
  19. runQueriesBatchAction,
  20. RunQueriesBatchPayload,
  21. queryStartAction,
  22. limitMessageRatePayloadAction,
  23. stateSaveAction,
  24. changeRangeAction,
  25. } from '../actionTypes';
  26. import { ExploreId, ExploreItemState } from 'app/types';
  /**
   * Forwards each action in `actions` to `outerObservable`, preserving order.
   *
   * @param outerObservable - Sink whose `next` is called once per action.
   * @param actions - Actions to emit; an empty array emits nothing.
   */
  const publishActions = (outerObservable: Subject<any>, actions: Array<ActionOf<any>>) => {
    for (let index = 0; index < actions.length; index++) {
      outerObservable.next(actions[index]);
    }
  };
  /** Everything `processResponse` needs to build the actions for one query result. */
  interface ProcessResponseConfig {
    exploreId: ExploreId;
    exploreItemState: ExploreItemState;
    datasourceId: string;
    /** Millisecond timestamp to measure latency from. */
    now: number;
    loadingState: LoadingState;
    series?: DataQueryResponseData[];
    delta?: SeriesData[];
  }

  /**
   * Builds the actions to dispatch after a query produced data.
   *
   * @param config - Identifiers, state snapshot, start timestamp and result data.
   * @returns In order: the history-updated action, the process-results action
   *   (carrying the measured latency) and the state-save action.
   */
  const processResponse = (config: ProcessResponseConfig) => {
    const { exploreId, datasourceId, loadingState, series, delta } = config;
    const itemState = config.exploreItemState;

    // Latency is taken before the history update so that work is not counted.
    const latency = Date.now() - config.now;

    // Side-effect: Saving history in localstorage
    const history = updateHistory(itemState.history, datasourceId, itemState.queries);

    const historyUpdated = historyUpdatedAction({ exploreId, history });
    const resultsProcessed = processQueryResultsAction({
      exploreId,
      latency,
      datasourceId,
      loadingState,
      series,
      delta,
    });

    return [historyUpdated, resultsProcessed, stateSaveAction()];
  };
  /** Everything `processError` needs to build the actions for a failed query. */
  interface ProcessErrorConfig {
    exploreId: ExploreId;
    datasourceId: string;
    error: any;
  }

  /**
   * Builds the actions to dispatch after a query failed.
   *
   * @param config - Explore pane id, datasource id and the error that occurred.
   * @returns A single-element array holding the process-query-errors action,
   *   with the error passed along as its `response`.
   */
  const processError = (config: ProcessErrorConfig) => {
    const errorsAction = processQueryErrorsAction({
      exploreId: config.exploreId,
      response: config.error,
      datasourceId: config.datasourceId,
    });
    return [errorsAction];
  };
  /**
   * Epic that runs the queries of an Explore pane for every `runQueriesBatchAction`.
   *
   * For each incoming action it creates one observable that:
   *  - emits `queryStartAction` immediately,
   *  - forwards the result (or error) of `getQueryResponse` as actions,
   *  - forwards events the datasource pushes through the `observer` callback
   *    (errors, streaming deltas, loading/done updates) as actions.
   *
   * Nothing in here calls `complete()` on the per-action observable, so a run stays
   * subscribed until one of the stop actions listed in `takeUntil` arrives; the
   * teardown function then releases the datasource and both inner subscriptions.
   *
   * @param action$ - Stream of dispatched actions.
   * @param state$ - State observable; the Explore item state is read once per action.
   * @param dependencies - Injected dependencies; only `getQueryResponse` is used.
   * @returns Stream of actions to dispatch.
   */
  export const runQueriesBatchEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState> = (
    action$,
    state$,
    { getQueryResponse }
  ) => {
    return action$.ofType(runQueriesBatchAction.type).pipe(
      mergeMap((action: ActionOf<RunQueriesBatchPayload>) => {
        const { exploreId, queryOptions } = action.payload;
        // Snapshot of the pane's state at the moment the run was requested.
        const exploreItemState = state$.value.explore[exploreId];
        const { datasourceInstance, queries, queryIntervals, range, scanning } = exploreItemState;

        // Create an observable per run queries action
        // Within the observable create two subscriptions
        // First subscription: 'querySubscription' subscribes to the call to query method on datasourceinstance
        // Second subscription: 'streamSubscription' subscribes to events from the query methods observer callback
        const observable: Observable<ActionOf<any>> = Observable.create((outerObservable: Subject<any>) => {
          const datasourceId = datasourceInstance.meta.id;
          const transaction = buildQueryTransaction(queries, queryOptions, range, queryIntervals, scanning);
          outerObservable.next(queryStartAction({ exploreId }));

          const now = Date.now();
          // Latest unsubscribe handle handed over by the datasource; null until the first stream event.
          let datasourceUnsubscribe: DataStreamState['unsubscribe'] | null = null;
          const streamHandler = new Subject<DataStreamState>();

          const observer = (event: DataStreamState) => {
            datasourceUnsubscribe = event.unsubscribe;
            if (!streamHandler.closed) {
              // there might be a race condition when unsubscribing
              streamHandler.next(event);
            }
          };

          // observer subscription, handles datasourceInstance.query observer events and pushes that forward
          const streamSubscription = streamHandler.subscribe({
            next: event => {
              const { state, error, series, delta } = event;
              // Events that carry neither data nor an error are ignored.
              if (!series && !delta && !error) {
                return;
              }

              switch (state) {
                case LoadingState.Error: {
                  publishActions(outerObservable, processError({ exploreId, datasourceId, error }));
                  break;
                }

                case LoadingState.Streaming: {
                  if (event.request && event.request.range) {
                    let newRange = event.request.range;
                    // A string `raw.from` is re-parsed so `from`/`to` are recomputed for this event.
                    if (isString(newRange.raw.from)) {
                      newRange = {
                        from: dateMath.parse(newRange.raw.from, false),
                        to: dateMath.parse(newRange.raw.to, true),
                        raw: newRange.raw,
                      };
                    }
                    outerObservable.next(changeRangeAction({ exploreId, range: newRange }));
                  }

                  outerObservable.next(
                    limitMessageRatePayloadAction({
                      exploreId,
                      series: delta,
                      datasourceId,
                    })
                  );
                  break;
                }

                case LoadingState.Done:
                case LoadingState.Loading: {
                  const actions = processResponse({
                    exploreId,
                    exploreItemState,
                    datasourceId,
                    now,
                    loadingState: state,
                    series: null,
                    delta,
                  });
                  publishActions(outerObservable, actions);
                  break;
                }

                default:
                  // Any other state produces no actions.
                  break;
              }
            },
          });

          // query subscription, handles datasourceInstance.query response and pushes that forward
          const querySubscription = getQueryResponse(datasourceInstance, transaction.options, observer)
            .pipe(
              mergeMap((response: DataQueryResponse) => {
                // The returned array is flattened by mergeMap into individual actions.
                return processResponse({
                  exploreId,
                  exploreItemState,
                  datasourceId,
                  now,
                  loadingState: LoadingState.Done,
                  series: response && response.data ? response.data : [],
                  delta: null,
                });
              }),
              catchError(error => {
                // A failed query is turned into error actions instead of erroring the stream.
                return processError({ exploreId, datasourceId, error });
              })
            )
            .subscribe({ next: (nextAction: ActionOf<any>) => outerObservable.next(nextAction) });

          // this unsubscribe method will be called when any of the takeUntil actions below happen
          const unsubscribe = () => {
            if (datasourceUnsubscribe) {
              datasourceUnsubscribe();
            }
            querySubscription.unsubscribe();
            streamSubscription.unsubscribe();
            streamHandler.unsubscribe();
            outerObservable.unsubscribe();
          };

          return unsubscribe;
        });

        return observable.pipe(
          takeUntil(
            action$
              .ofType(
                runQueriesBatchAction.type,
                resetExploreAction.type,
                updateDatasourceInstanceAction.type,
                changeRefreshIntervalAction.type,
                clearQueriesAction.type
              )
              .pipe(
                filter(stopAction => {
                  if (stopAction.type === resetExploreAction.type) {
                    return true; // stops all subscriptions if user navigates away
                  }

                  // Every remaining stop action only applies to the pane it was dispatched for.
                  if (stopAction.payload.exploreId !== exploreId) {
                    return false;
                  }

                  if (stopAction.type === changeRefreshIntervalAction.type) {
                    // stops subscriptions if user changes refresh interval away from 'Live'
                    return !isLive(stopAction.payload.refreshInterval);
                  }

                  // Same pane: a new run, a data source change or clearing all queries stops this run.
                  return true;
                })
              )
          )
        );
      })
    );
  };