runQueriesBatchEpic.ts 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. import { Epic } from 'redux-observable';
  2. import { Observable, Subject } from 'rxjs';
  3. import { mergeMap, catchError, takeUntil, filter } from 'rxjs/operators';
  4. import _, { isString } from 'lodash';
  5. import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker';
  6. import { DataStreamState, DataQueryResponse, DataQueryResponseData } from '@grafana/ui';
  7. import { LoadingState, DataFrame, AbsoluteTimeRange } from '@grafana/data';
  8. import { dateMath } from '@grafana/data';
  9. import { ActionOf } from 'app/core/redux/actionCreatorFactory';
  10. import { StoreState } from 'app/types/store';
  11. import { buildQueryTransaction, updateHistory } from 'app/core/utils/explore';
  12. import {
  13. clearQueriesAction,
  14. historyUpdatedAction,
  15. resetExploreAction,
  16. updateDatasourceInstanceAction,
  17. changeRefreshIntervalAction,
  18. processQueryErrorsAction,
  19. processQueryResultsAction,
  20. runQueriesBatchAction,
  21. RunQueriesBatchPayload,
  22. queryStartAction,
  23. limitMessageRatePayloadAction,
  24. stateSaveAction,
  25. changeRangeAction,
  26. } from '../actionTypes';
  27. import { ExploreId, ExploreItemState } from 'app/types';
  /**
   * Emits every action in `actions`, in array order, on `outerObservable`.
   *
   * @param outerObservable - Sink whose `next` is called once per action.
   * @param actions - Actions to forward; an empty array emits nothing.
   */
  const publishActions = (outerObservable: Subject<any>, actions: Array<ActionOf<any>>) => {
    for (let index = 0; index < actions.length; index++) {
      outerObservable.next(actions[index]);
    }
  };
  /** Input for `processResponse`: everything needed to turn a query result into actions. */
  interface ProcessResponseConfig {
    /** Explore pane the result belongs to. */
    exploreId: ExploreId;
    /** Datasource id, used for the history update and the results action. */
    datasourceId: string;
    /** State of the Explore pane; its `queries` and `history` are read. */
    exploreItemState: ExploreItemState;
    /** Loading state forwarded unchanged to the results action. */
    loadingState: LoadingState;
    /** Timestamp (ms, as returned by `Date.now()`) taken when the run started; used to compute latency. */
    now: number;
    /** Result series forwarded to the results action, if any. */
    series?: DataQueryResponseData[];
    /** Delta frames forwarded to the results action, if any. */
    delta?: DataFrame[];
  }
  /**
   * Builds the actions that follow a query result: a history update, the
   * processed results, and a state save — in that order.
   *
   * @param config - Result data plus the Explore context it belongs to.
   * @returns The three actions to publish.
   */
  const processResponse = (config: ProcessResponseConfig) => {
    const exploreId = config.exploreId;
    const datasourceId = config.datasourceId;
    const loadingState = config.loadingState;
    const series = config.series;
    const delta = config.delta;
    const queries = config.exploreItemState.queries;
    const history = config.exploreItemState.history;

    // Time elapsed since the run was started.
    const latency = Date.now() - config.now;

    // Side-effect: updateHistory also saves the history in localstorage.
    const nextHistory = updateHistory(history, datasourceId, queries);

    const historyAction = historyUpdatedAction({ exploreId, history: nextHistory });
    const resultsAction = processQueryResultsAction({ exploreId, latency, datasourceId, loadingState, series, delta });
    const saveAction = stateSaveAction();

    return [historyAction, resultsAction, saveAction];
  };
  /** Input for `processError`. */
  interface ProcessErrorConfig {
    /** Explore pane the failed query belongs to. */
    exploreId: ExploreId;
    /** The error as received; passed on as the `response` of the error action. */
    error: any;
    /** Datasource id the query was run against. */
    datasourceId: string;
  }
  /**
   * Wraps a query failure in a single-element list holding a
   * `processQueryErrorsAction`.
   *
   * @param config - The error and the Explore/datasource it relates to.
   * @returns An array with exactly one action.
   */
  const processError = (config: ProcessErrorConfig) => [
    processQueryErrorsAction({
      exploreId: config.exploreId,
      response: config.error,
      datasourceId: config.datasourceId,
    }),
  ];
  /**
   * Epic that runs a batch of queries for every `runQueriesBatchAction`.
   *
   * For each such action an observable is created that:
   *  - emits `queryStartAction`,
   *  - subscribes to `getQueryResponse(...)` and publishes the resulting
   *    response or error actions,
   *  - subscribes to the events the datasource pushes through the `observer`
   *    callback and publishes error, streaming and partial-result actions.
   *
   * The run is torn down (via `takeUntil`) when Explore is reset, or when, for
   * the same `exploreId`, a new batch is started, the datasource instance is
   * updated, the queries are cleared, or the refresh interval changes to a
   * non-live value.
   */
  export const runQueriesBatchEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState> = (
    action$,
    state$,
    { getQueryResponse }
  ) => {
    return action$.ofType(runQueriesBatchAction.type).pipe(
      mergeMap((action: ActionOf<RunQueriesBatchPayload>) => {
        const { exploreId, queryOptions } = action.payload;
        const exploreItemState = state$.value.explore[exploreId];
        const { datasourceInstance, queries, queryIntervals, range, scanning } = exploreItemState;

        // One observable per run-queries action. It owns two subscriptions:
        //  - 'querySubscription': the call to the datasource's query method
        //  - 'streamSubscription': events from the query method's observer callback
        const run$: Observable<ActionOf<any>> = Observable.create((outerObservable: Subject<any>) => {
          const datasourceId = datasourceInstance.meta.id;
          const transaction = buildQueryTransaction(queries, queryOptions, range, queryIntervals, scanning);
          outerObservable.next(queryStartAction({ exploreId }));
          const startedAt = Date.now();

          // Shared builder for the actions of a partial or final result.
          const toResponseActions = (loadingState: LoadingState, series: DataQueryResponseData[], delta: DataFrame[]) =>
            processResponse({
              exploreId,
              exploreItemState,
              datasourceId,
              now: startedAt,
              loadingState,
              series,
              delta,
            });

          let datasourceUnsubscribe: Function = null;
          const streamHandler = new Subject<DataStreamState>();

          // Callback handed to the datasource; remembers its unsubscribe hook and relays the event.
          const observer = (event: DataStreamState) => {
            datasourceUnsubscribe = event.unsubscribe;
            if (streamHandler.closed) {
              // there might be a race condition when unsubscribing
              return;
            }
            streamHandler.next(event);
          };

          // Translates one datasource stream event into actions on the outer observable.
          const handleStreamEvent = (event: DataStreamState) => {
            const { state, error, data, delta } = event;
            const carriesNothing = !data && !delta && !error;
            if (carriesNothing) {
              return;
            }

            switch (state) {
              case LoadingState.Error: {
                publishActions(outerObservable, processError({ exploreId, datasourceId, error }));
                break;
              }
              case LoadingState.Streaming: {
                if (event.request && event.request.range) {
                  let nextRange = event.request.range;
                  let absoluteRange: AbsoluteTimeRange = {
                    from: nextRange.from.valueOf(),
                    to: nextRange.to.valueOf(),
                  };
                  // A string `raw.from` is re-parsed with dateMath and the absolute range recomputed.
                  if (isString(nextRange.raw.from)) {
                    nextRange = {
                      from: dateMath.parse(nextRange.raw.from, false),
                      to: dateMath.parse(nextRange.raw.to, true),
                      raw: nextRange.raw,
                    };
                    absoluteRange = {
                      from: nextRange.from.valueOf(),
                      to: nextRange.to.valueOf(),
                    };
                  }
                  outerObservable.next(changeRangeAction({ exploreId, range: nextRange, absoluteRange }));
                }
                outerObservable.next(limitMessageRatePayloadAction({ exploreId, series: delta, datasourceId }));
                break;
              }
              case LoadingState.Done:
              case LoadingState.Loading: {
                publishActions(outerObservable, toResponseActions(state, null, delta));
                break;
              }
            }
          };

          // observer subscription, handles datasourceInstance.query observer events and pushes them forward
          const streamSubscription = streamHandler.subscribe({ next: handleStreamEvent });

          // query subscription, handles the datasourceInstance.query response and pushes it forward
          const response$ = getQueryResponse(datasourceInstance, transaction.options, observer).pipe(
            mergeMap((response: DataQueryResponse) => {
              const series = response && response.data ? response.data : [];
              return toResponseActions(LoadingState.Done, series, null);
            }),
            catchError(error => processError({ exploreId, datasourceId, error }))
          );
          const querySubscription = response$.subscribe({
            next: (emitted: ActionOf<any>) => outerObservable.next(emitted),
          });

          // Teardown: runs when any of the takeUntil actions below happen.
          return () => {
            if (datasourceUnsubscribe) {
              datasourceUnsubscribe();
            }
            querySubscription.unsubscribe();
            streamSubscription.unsubscribe();
            streamHandler.unsubscribe();
            outerObservable.unsubscribe();
          };
        });

        // Actions that end this run.
        const cancel$ = action$
          .ofType(
            runQueriesBatchAction.type,
            resetExploreAction.type,
            updateDatasourceInstanceAction.type,
            changeRefreshIntervalAction.type,
            clearQueriesAction.type
          )
          .pipe(
            filter(candidate => {
              if (candidate.type === resetExploreAction.type) {
                return true; // stops all subscriptions if user navigates away
              }
              const sameExplore = candidate.payload.exploreId === exploreId;
              if (sameExplore && candidate.type === changeRefreshIntervalAction.type) {
                // stops subscriptions if user changes refresh interval away from 'Live'
                return !isLive(candidate.payload.refreshInterval);
              }
              // New batch, datasource change or cleared queries: stop only for this exploreId.
              return sameExplore;
            })
          );

        return run$.pipe(takeUntil(cancel$));
      })
    );
  };