processQueryResultsEpic.ts 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. import _ from 'lodash';
  2. import { Epic } from 'redux-observable';
  3. import { mergeMap } from 'rxjs/operators';
  4. import { NEVER } from 'rxjs';
  5. import { LoadingState } from '@grafana/data';
  6. import { ActionOf } from 'app/core/redux/actionCreatorFactory';
  7. import { StoreState } from 'app/types/store';
  8. import { getRefIds } from 'app/core/utils/explore';
  9. import {
  10. processQueryResultsAction,
  11. ProcessQueryResultsPayload,
  12. querySuccessAction,
  13. resetQueryErrorAction,
  14. scanStopAction,
  15. updateTimeRangeAction,
  16. runQueriesAction,
  17. } from '../actionTypes';
  18. import { ResultProcessor } from '../../utils/ResultProcessor';
  19. export const processQueryResultsEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState> = (
  20. action$,
  21. state$,
  22. { getTimeZone, getShiftedTimeRange }
  23. ) => {
  24. return action$.ofType(processQueryResultsAction.type).pipe(
  25. mergeMap((action: ActionOf<ProcessQueryResultsPayload>) => {
  26. const { exploreId, datasourceId, latency, loadingState, series, delta } = action.payload;
  27. const { datasourceInstance, scanning, eventBridge } = state$.value.explore[exploreId];
  28. // If datasource already changed, results do not matter
  29. if (datasourceInstance.meta.id !== datasourceId) {
  30. return NEVER;
  31. }
  32. const result = series || delta || [];
  33. const replacePreviousResults = loadingState === LoadingState.Done && series && !delta ? true : false;
  34. const resultProcessor = new ResultProcessor(state$.value.explore[exploreId], replacePreviousResults, result);
  35. const graphResult = resultProcessor.getGraphResult();
  36. const tableResult = resultProcessor.getTableResult();
  37. const logsResult = resultProcessor.getLogsResult();
  38. const refIds = getRefIds(result);
  39. const actions: Array<ActionOf<any>> = [];
  40. // For Angular editors
  41. eventBridge.emit('data-received', resultProcessor.getRawData());
  42. // Clears any previous errors that now have a successful query, important so Angular editors are updated correctly
  43. actions.push(
  44. resetQueryErrorAction({
  45. exploreId,
  46. refIds,
  47. })
  48. );
  49. actions.push(
  50. querySuccessAction({
  51. exploreId,
  52. latency,
  53. loadingState,
  54. graphResult,
  55. tableResult,
  56. logsResult,
  57. })
  58. );
  59. // Keep scanning for results if this was the last scanning transaction
  60. if (scanning) {
  61. if (_.size(result) === 0) {
  62. const range = getShiftedTimeRange(-1, state$.value.explore[exploreId].range, getTimeZone(state$.value.user));
  63. actions.push(updateTimeRangeAction({ exploreId, absoluteRange: range }));
  64. actions.push(runQueriesAction({ exploreId }));
  65. } else {
  66. // We can stop scanning if we have a result
  67. actions.push(scanStopAction({ exploreId }));
  68. }
  69. }
  70. return actions;
  71. })
  72. );
  73. };