epics.ts 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. import { Epic } from 'redux-observable';
  2. import { NEVER } from 'rxjs';
  3. import { takeUntil, mergeMap, tap, filter, map, throttleTime } from 'rxjs/operators';
  4. import { StoreState, ExploreId } from 'app/types';
  5. import { ActionOf, ActionCreator, actionCreatorFactory } from '../../../core/redux/actionCreatorFactory';
  6. import { config } from '../../../core/config';
  7. import {
  8. updateDatasourceInstanceAction,
  9. resetExploreAction,
  10. changeRefreshIntervalAction,
  11. clearQueriesAction,
  12. } from './actionTypes';
  13. import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker';
  14. import { SeriesData } from '@grafana/ui/src/types/data';
  15. import { EpicDependencies } from 'app/store/configureStore';
  /**
   * Turns a backend path into an absolute WebSocket URL for the current page.
   *
   * The scheme is `wss://` when the page was loaded over `https:` and `ws://`
   * otherwise; host and `config.appSubUrl` are prepended to `url`.
   *
   * @param url - Path appended verbatim after the backend base.
   * @returns The backend base (without one trailing slash) followed by `url`.
   */
  const convertToWebSocketUrl = (url: string) => {
    const { protocol, host } = window.location;
    const scheme = protocol === 'https:' ? 'wss://' : 'ws://';
    const origin = `${scheme}${host}${config.appSubUrl}`;
    // Strip a single trailing slash from the base before appending `url`.
    const base = origin.endsWith('/') ? origin.slice(0, -1) : origin;
    return `${base}${url}`;
  };
  /** Payload of the `explore/START_SUBSCRIPTIONS` action. */
  export interface StartSubscriptionsPayload {
    /** Key of the Explore state slice whose datasource, queries and refresh interval are read. */
    exploreId: ExploreId;
    /** Creator used later to build the action that delivers received data. */
    dataReceivedActionCreator: ActionCreator<SubscriptionDataReceivedPayload>;
  }

  /** Action creator for `explore/START_SUBSCRIPTIONS`; handled by `startSubscriptionsEpic`. */
  export const startSubscriptionsAction = actionCreatorFactory<StartSubscriptionsPayload>(
    'explore/START_SUBSCRIPTIONS'
  ).create();
  /** Payload of the `explore/START_SUBSCRIPTION` action (one per stream target). */
  export interface StartSubscriptionPayload {
    /** Key of the Explore state slice this subscription belongs to. */
    exploreId: ExploreId;
    /** Passed to `resultToSeriesData`; also used to match a later start action for the same pane. */
    refId: string;
    /** URL handed to `getWebSocket`. */
    url: string;
    /** Creator used later to build the action that delivers received data. */
    dataReceivedActionCreator: ActionCreator<SubscriptionDataReceivedPayload>;
  }

  /** Action creator for `explore/START_SUBSCRIPTION`; handled by `startSubscriptionEpic`. */
  export const startSubscriptionAction = actionCreatorFactory<StartSubscriptionPayload>(
    'explore/START_SUBSCRIPTION'
  ).create();
  /** Payload passed to a `dataReceivedActionCreator` once streamed data is available. */
  export interface SubscriptionDataReceivedPayload {
    /** Key of the Explore state slice the data is meant for. */
    exploreId: ExploreId;
    /** The received series data. */
    data: SeriesData;
  }

  /** Action creator for `explore/SUBSCRIPTION_DATA_RECEIVED`. */
  export const subscriptionDataReceivedAction = actionCreatorFactory<SubscriptionDataReceivedPayload>(
    'explore/SUBSCRIPTION_DATA_RECEIVED'
  ).create();
  /**
   * Payload of the `explore/LIMIT_MESSAGE_RATE_PAYLOAD` action: the received
   * data (`data`, `exploreId`) plus the creator that will wrap it.
   */
  export interface LimitMessageRatePayload extends SubscriptionDataReceivedPayload {
    /** Creator invoked with `{ exploreId, data }` by `limitMessageRateEpic`. */
    dataReceivedActionCreator: ActionCreator<SubscriptionDataReceivedPayload>;
  }

  /** Action creator for `explore/LIMIT_MESSAGE_RATE_PAYLOAD`; handled by `limitMessageRateEpic`. */
  export const limitMessageRatePayloadAction = actionCreatorFactory<LimitMessageRatePayload>(
    'explore/LIMIT_MESSAGE_RATE_PAYLOAD'
  ).create();
  /**
   * Expands each `explore/START_SUBSCRIPTIONS` action into one
   * `explore/START_SUBSCRIPTION` action per stream target of the pane's datasource.
   *
   * Nothing is emitted for an action when the pane's datasource is missing or has
   * no `convertToStreamTargets`, or when the pane's refresh interval is unset or
   * not live according to `isLive`.
   */
  export const startSubscriptionsEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState> = (action$, state$) =>
    action$.ofType(startSubscriptionsAction.type).pipe(
      mergeMap((action: ActionOf<StartSubscriptionsPayload>) => {
        const { exploreId, dataReceivedActionCreator } = action.payload;
        const {
          datasourceInstance: datasource,
          queries: targets,
          refreshInterval: interval,
        } = state$.value.explore[exploreId];

        // The datasource must expose convertToStreamTargets to support streaming.
        if (!(datasource && datasource.convertToStreamTargets)) {
          return NEVER;
        }

        // Subscriptions are only started while the refresh interval is 'LIVE'.
        const live = interval && isLive(interval);
        if (!live) {
          return NEVER;
        }

        const request: any = { targets };
        const streamTargets = datasource.convertToStreamTargets(request);
        return streamTargets.map(target =>
          startSubscriptionAction({
            url: convertToWebSocketUrl(target.url),
            refId: target.refId,
            exploreId,
            dataReceivedActionCreator,
          })
        );
      })
    );
  /**
   * Opens a WebSocket (via the injected `getWebSocket`) for every
   * `explore/START_SUBSCRIPTION` action and converts each received result into
   * `explore/LIMIT_MESSAGE_RATE_PAYLOAD` actions.
   *
   * A subscription is torn down when one of these actions is seen:
   * - any reset-explore action;
   * - a datasource update or clear-queries action for the same pane;
   * - a refresh-interval change for the same pane to a non-live interval;
   * - another start-subscription action for the same pane and the same `refId`.
   */
  export const startSubscriptionEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState, EpicDependencies> = (
    action$,
    state$,
    { getWebSocket }
  ) => {
    return action$.ofType(startSubscriptionAction.type).pipe(
      mergeMap((action: ActionOf<StartSubscriptionPayload>) => {
        const { url, exploreId, refId, dataReceivedActionCreator } = action.payload;

        // Decides whether an incoming action ends this particular subscription.
        const endsThisSubscription = (candidate: ActionOf<any>): boolean => {
          // Evaluated lazily: the reset case must not touch the payload at all.
          const isSamePane = () => candidate.payload.exploreId === exploreId;

          switch (candidate.type) {
            case resetExploreAction.type:
              return true; // stops all subscriptions if user navigates away
            case updateDatasourceInstanceAction.type:
              return isSamePane(); // stops subscriptions if user changes data source
            case changeRefreshIntervalAction.type:
              // stops subscriptions if user changes refresh interval away from 'Live'
              return isSamePane() && !isLive(candidate.payload.refreshInterval);
            case clearQueriesAction.type:
              return isSamePane(); // stops subscriptions if user clears all queries
            default:
              // a new subscription for the same pane and refId replaces this one
              return isSamePane() && candidate.payload.refId === refId;
          }
        };

        // Converts one socket message into zero or more rate-limit actions.
        const toRateLimitActions = (result: any) => {
          const { datasourceInstance } = state$.value.explore[exploreId];
          if (!datasourceInstance || !datasourceInstance.resultToSeriesData) {
            return [null]; // sentinel dropped by the filter below: datasource does not support streaming
          }

          const series = datasourceInstance.resultToSeriesData(result, refId);
          return series.map(data => limitMessageRatePayloadAction({ exploreId, data, dataReceivedActionCreator }));
        };

        const socket$ = getWebSocket(url);
        const stop$ = action$
          .ofType(
            startSubscriptionAction.type,
            resetExploreAction.type,
            updateDatasourceInstanceAction.type,
            changeRefreshIntervalAction.type,
            clearQueriesAction.type
          )
          .pipe(
            filter(endsThisSubscription),
            tap(value => console.log('Stopping subscription', value))
          );

        return socket$.pipe(
          takeUntil(stop$),
          mergeMap(toRateLimitActions),
          filter(emitted => emitted !== null)
        );
      })
    );
  };
  /**
   * Throttles `explore/LIMIT_MESSAGE_RATE_PAYLOAD` actions with `throttleTime(1)`
   * and re-emits each surviving one through its own `dataReceivedActionCreator`.
   *
   * The throttle is applied to the combined stream of these actions; there is no
   * grouping by pane or by `refId` here.
   */
  export const limitMessageRateEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState, EpicDependencies> = action$ => {
    const throttled$ = action$.ofType(limitMessageRatePayloadAction.type).pipe(throttleTime(1));

    return throttled$.pipe(
      map((action: ActionOf<LimitMessageRatePayload>) => {
        const { payload } = action;
        const createReceived = payload.dataReceivedActionCreator;
        // Only exploreId and data are forwarded; the creator itself is dropped.
        return createReceived({ exploreId: payload.exploreId, data: payload.data });
      })
    );
  };