processQueryResultsEpic.ts 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. import _ from 'lodash';
  2. import { Epic } from 'redux-observable';
  3. import { mergeMap } from 'rxjs/operators';
  4. import { NEVER } from 'rxjs';
  5. import { LoadingState } from '@grafana/ui';
  6. import { ActionOf } from 'app/core/redux/actionCreatorFactory';
  7. import { StoreState } from 'app/types/store';
  8. import { getRefIds } from 'app/core/utils/explore';
  9. import {
  10. processQueryResultsAction,
  11. ProcessQueryResultsPayload,
  12. querySuccessAction,
  13. scanRangeAction,
  14. resetQueryErrorAction,
  15. scanStopAction,
  16. } from '../actionTypes';
  17. import { ResultProcessor } from '../../utils/ResultProcessor';
  18. export const processQueryResultsEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState> = (action$, state$) => {
  19. return action$.ofType(processQueryResultsAction.type).pipe(
  20. mergeMap((action: ActionOf<ProcessQueryResultsPayload>) => {
  21. const { exploreId, datasourceId, latency, loadingState, series, delta } = action.payload;
  22. const { datasourceInstance, scanning, scanner, eventBridge } = state$.value.explore[exploreId];
  23. // If datasource already changed, results do not matter
  24. if (datasourceInstance.meta.id !== datasourceId) {
  25. return NEVER;
  26. }
  27. const result = series || delta || [];
  28. const replacePreviousResults = loadingState === LoadingState.Done && series && !delta ? true : false;
  29. const resultProcessor = new ResultProcessor(state$.value.explore[exploreId], replacePreviousResults, result);
  30. const graphResult = resultProcessor.getGraphResult();
  31. const tableResult = resultProcessor.getTableResult();
  32. const logsResult = resultProcessor.getLogsResult();
  33. const refIds = getRefIds(result);
  34. const actions: Array<ActionOf<any>> = [];
  35. // For Angular editors
  36. eventBridge.emit('data-received', resultProcessor.getRawData());
  37. // Clears any previous errors that now have a successful query, important so Angular editors are updated correctly
  38. actions.push(
  39. resetQueryErrorAction({
  40. exploreId,
  41. refIds,
  42. })
  43. );
  44. actions.push(
  45. querySuccessAction({
  46. exploreId,
  47. latency,
  48. loadingState,
  49. graphResult,
  50. tableResult,
  51. logsResult,
  52. })
  53. );
  54. // Keep scanning for results if this was the last scanning transaction
  55. if (scanning) {
  56. if (_.size(result) === 0) {
  57. const range = scanner();
  58. actions.push(scanRangeAction({ exploreId, range }));
  59. } else {
  60. // We can stop scanning if we have a result
  61. actions.push(scanStopAction({ exploreId }));
  62. }
  63. }
  64. return actions;
  65. })
  66. );
  67. };