// runQueriesBatchEpic.ts (8.5 KB)
  1. import { Epic } from 'redux-observable';
  2. import { Observable, Subject } from 'rxjs';
  3. import { mergeMap, catchError, takeUntil, filter } from 'rxjs/operators';
  4. import _, { isString } from 'lodash';
  5. import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker';
  6. import {
  7. DataStreamState,
  8. LoadingState,
  9. DataQueryResponse,
  10. DataFrame,
  11. DataQueryResponseData,
  12. AbsoluteTimeRange,
  13. } from '@grafana/ui';
  14. import * as dateMath from '@grafana/ui/src/utils/datemath';
  15. import { ActionOf } from 'app/core/redux/actionCreatorFactory';
  16. import { StoreState } from 'app/types/store';
  17. import { buildQueryTransaction, updateHistory } from 'app/core/utils/explore';
  18. import {
  19. clearQueriesAction,
  20. historyUpdatedAction,
  21. resetExploreAction,
  22. updateDatasourceInstanceAction,
  23. changeRefreshIntervalAction,
  24. processQueryErrorsAction,
  25. processQueryResultsAction,
  26. runQueriesBatchAction,
  27. RunQueriesBatchPayload,
  28. queryStartAction,
  29. limitMessageRatePayloadAction,
  30. stateSaveAction,
  31. changeRangeAction,
  32. } from '../actionTypes';
  33. import { ExploreId, ExploreItemState } from 'app/types';
  34. const publishActions = (outerObservable: Subject<any>, actions: Array<ActionOf<any>>) => {
  35. for (const action of actions) {
  36. outerObservable.next(action);
  37. }
  38. };
  39. interface ProcessResponseConfig {
  40. exploreId: ExploreId;
  41. exploreItemState: ExploreItemState;
  42. datasourceId: string;
  43. now: number;
  44. loadingState: LoadingState;
  45. series?: DataQueryResponseData[];
  46. delta?: DataFrame[];
  47. }
  48. const processResponse = (config: ProcessResponseConfig) => {
  49. const { exploreId, exploreItemState, datasourceId, now, loadingState, series, delta } = config;
  50. const { queries, history } = exploreItemState;
  51. const latency = Date.now() - now;
  52. // Side-effect: Saving history in localstorage
  53. const nextHistory = updateHistory(history, datasourceId, queries);
  54. return [
  55. historyUpdatedAction({ exploreId, history: nextHistory }),
  56. processQueryResultsAction({ exploreId, latency, datasourceId, loadingState, series, delta }),
  57. stateSaveAction(),
  58. ];
  59. };
  60. interface ProcessErrorConfig {
  61. exploreId: ExploreId;
  62. datasourceId: string;
  63. error: any;
  64. }
  65. const processError = (config: ProcessErrorConfig) => {
  66. const { exploreId, datasourceId, error } = config;
  67. return [processQueryErrorsAction({ exploreId, response: error, datasourceId })];
  68. };
  69. export const runQueriesBatchEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState> = (
  70. action$,
  71. state$,
  72. { getQueryResponse }
  73. ) => {
  74. return action$.ofType(runQueriesBatchAction.type).pipe(
  75. mergeMap((action: ActionOf<RunQueriesBatchPayload>) => {
  76. const { exploreId, queryOptions } = action.payload;
  77. const exploreItemState = state$.value.explore[exploreId];
  78. const { datasourceInstance, queries, queryIntervals, range, scanning } = exploreItemState;
  79. // Create an observable per run queries action
  80. // Within the observable create two subscriptions
  81. // First subscription: 'querySubscription' subscribes to the call to query method on datasourceinstance
  82. // Second subscription: 'streamSubscription' subscribes to events from the query methods observer callback
  83. const observable: Observable<ActionOf<any>> = Observable.create((outerObservable: Subject<any>) => {
  84. const datasourceId = datasourceInstance.meta.id;
  85. const transaction = buildQueryTransaction(queries, queryOptions, range, queryIntervals, scanning);
  86. outerObservable.next(queryStartAction({ exploreId }));
  87. const now = Date.now();
  88. let datasourceUnsubscribe: Function = null;
  89. const streamHandler = new Subject<DataStreamState>();
  90. const observer = (event: DataStreamState) => {
  91. datasourceUnsubscribe = event.unsubscribe;
  92. if (!streamHandler.closed) {
  93. // their might be a race condition when unsubscribing
  94. streamHandler.next(event);
  95. }
  96. };
  97. // observer subscription, handles datasourceInstance.query observer events and pushes that forward
  98. const streamSubscription = streamHandler.subscribe({
  99. next: event => {
  100. const { state, error, series, delta } = event;
  101. if (!series && !delta && !error) {
  102. return;
  103. }
  104. if (state === LoadingState.Error) {
  105. const actions = processError({ exploreId, datasourceId, error });
  106. publishActions(outerObservable, actions);
  107. }
  108. if (state === LoadingState.Streaming) {
  109. if (event.request && event.request.range) {
  110. let newRange = event.request.range;
  111. let absoluteRange: AbsoluteTimeRange = {
  112. from: newRange.from.valueOf(),
  113. to: newRange.to.valueOf(),
  114. };
  115. if (isString(newRange.raw.from)) {
  116. newRange = {
  117. from: dateMath.parse(newRange.raw.from, false),
  118. to: dateMath.parse(newRange.raw.to, true),
  119. raw: newRange.raw,
  120. };
  121. absoluteRange = {
  122. from: newRange.from.valueOf(),
  123. to: newRange.to.valueOf(),
  124. };
  125. }
  126. outerObservable.next(changeRangeAction({ exploreId, range: newRange, absoluteRange }));
  127. }
  128. outerObservable.next(
  129. limitMessageRatePayloadAction({
  130. exploreId,
  131. series: delta,
  132. datasourceId,
  133. })
  134. );
  135. }
  136. if (state === LoadingState.Done || state === LoadingState.Loading) {
  137. const actions = processResponse({
  138. exploreId,
  139. exploreItemState,
  140. datasourceId,
  141. now,
  142. loadingState: state,
  143. series: null,
  144. delta,
  145. });
  146. publishActions(outerObservable, actions);
  147. }
  148. },
  149. });
  150. // query subscription, handles datasourceInstance.query response and pushes that forward
  151. const querySubscription = getQueryResponse(datasourceInstance, transaction.options, observer)
  152. .pipe(
  153. mergeMap((response: DataQueryResponse) => {
  154. return processResponse({
  155. exploreId,
  156. exploreItemState,
  157. datasourceId,
  158. now,
  159. loadingState: LoadingState.Done,
  160. series: response && response.data ? response.data : [],
  161. delta: null,
  162. });
  163. }),
  164. catchError(error => {
  165. return processError({ exploreId, datasourceId, error });
  166. })
  167. )
  168. .subscribe({ next: (action: ActionOf<any>) => outerObservable.next(action) });
  169. // this unsubscribe method will be called when any of the takeUntil actions below happen
  170. const unsubscribe = () => {
  171. if (datasourceUnsubscribe) {
  172. datasourceUnsubscribe();
  173. }
  174. querySubscription.unsubscribe();
  175. streamSubscription.unsubscribe();
  176. streamHandler.unsubscribe();
  177. outerObservable.unsubscribe();
  178. };
  179. return unsubscribe;
  180. });
  181. return observable.pipe(
  182. takeUntil(
  183. action$
  184. .ofType(
  185. runQueriesBatchAction.type,
  186. resetExploreAction.type,
  187. updateDatasourceInstanceAction.type,
  188. changeRefreshIntervalAction.type,
  189. clearQueriesAction.type
  190. )
  191. .pipe(
  192. filter(action => {
  193. if (action.type === resetExploreAction.type) {
  194. return true; // stops all subscriptions if user navigates away
  195. }
  196. if (action.type === updateDatasourceInstanceAction.type && action.payload.exploreId === exploreId) {
  197. return true; // stops subscriptions if user changes data source
  198. }
  199. if (action.type === changeRefreshIntervalAction.type && action.payload.exploreId === exploreId) {
  200. return !isLive(action.payload.refreshInterval); // stops subscriptions if user changes refresh interval away from 'Live'
  201. }
  202. if (action.type === clearQueriesAction.type && action.payload.exploreId === exploreId) {
  203. return true; // stops subscriptions if user clears all queries
  204. }
  205. return action.payload.exploreId === exploreId;
  206. })
  207. )
  208. )
  209. );
  210. })
  211. );
  212. };