PanelQueryRunner.ts 7.1 KB


  1. // Libraries
  2. import cloneDeep from 'lodash/cloneDeep';
  3. import throttle from 'lodash/throttle';
  4. import { Subject, Unsubscribable, PartialObserver } from 'rxjs';
  5. // Services & Utils
  6. import { getDatasourceSrv } from 'app/features/plugins/datasource_srv';
  7. import kbn from 'app/core/utils/kbn';
  8. import templateSrv from 'app/features/templating/template_srv';
  9. import { PanelQueryState } from './PanelQueryState';
  10. import { isSharedDashboardQuery, SharedQueryRunner } from 'app/plugins/datasource/dashboard/SharedQueryRunner';
  11. // Types
  12. import { PanelData, DataQuery, ScopedVars, DataQueryRequest, DataSourceApi, DataSourceJsonData } from '@grafana/ui';
  13. import { TimeRange } from '@grafana/data';
  /**
   * Options accepted by `PanelQueryRunner.run()`.
   */
  export interface QueryRunnerOptions<
    TQuery extends DataQuery = DataQuery,
    TOptions extends DataSourceJsonData = DataSourceJsonData
  > {
    /** Either a datasource name to look up, or an already-resolved datasource instance. */
    datasource: string | DataSourceApi<TQuery, TOptions>;
    /** Queries to execute; they are deep-cloned into the request, so the originals are not mutated. */
    queries: TQuery[];
    panelId: number;
    dashboardId?: number;
    timezone?: string;
    timeRange: TimeRange;
    /** String description of time range for display */
    timeInfo?: string;
    /** Panel width in pixels; used for the interval calculation and as the fallback for `maxDataPoints`. */
    widthPixels: number;
    /** Falls back to `widthPixels` when not set (or zero). */
    maxDataPoints: number | undefined | null;
    /** Lower limit for the calculated interval (template variables are interpolated); the datasource interval is used when not set. */
    minInterval: string | undefined | null;
    scopedVars?: ScopedVars;
    cacheTimeout?: string;
    /**
     * Delay in milliseconds before a loading state is broadcast for a query that is still running.
     * `run()` falls back to 500ms when this is not set (or is zero).
     */
    delayStateNotification?: number;
  }
  /**
   * Data formats a subscriber can ask a PanelQueryRunner to deliver.
   */
  export enum PanelQueryRunnerFormat {
    /** Deliver results as frames */
    frames = 'frames',
    /** Deliver results in the legacy format */
    legacy = 'legacy',
    /** Deliver results in both formats */
    both = 'both',
  }
  // Module-wide sequence number behind the request ids; the first id handed out is "Q100".
  let counter = 100;

  /**
   * Produces the next request id: the letter "Q" followed by the current
   * sequence number, which is then advanced by one.
   */
  function getNextRequestId() {
    const requestId = `Q${counter}`;
    counter += 1;
    return requestId;
  }
  /**
   * Executes the queries of a single panel and broadcasts every resulting
   * PanelData to its subscribers.
   */
  export class PanelQueryRunner {
    // Always created up front, so it never needs a null check
    private readonly subject = new Subject<PanelData>();

    private state = new PanelQueryState();

    // Listen to another panel for changes (null while this panel runs its own queries)
    private sharedQueryRunner: SharedQueryRunner | null = null;

    constructor(private panelId: number) {
      this.state.onStreamingDataUpdated = this.onStreamingDataUpdated;
    }

    getPanelId() {
      return this.panelId;
    }

    /**
     * Listen for updates to the PanelData. If a query has already run for this panel,
     * the results will be immediately passed to the observer
     *
     * @param observer receives the PanelData updates
     * @param format which data format(s) the observer needs; requested formats are
     *   only ever switched on here, never off
     * @returns a handle that removes the observer when unsubscribed
     */
    subscribe(observer: PartialObserver<PanelData>, format = PanelQueryRunnerFormat.frames): Unsubscribable {
      if (format === PanelQueryRunnerFormat.legacy) {
        this.state.sendLegacy = true;
      } else if (format === PanelQueryRunnerFormat.both) {
        this.state.sendFrames = true;
        this.state.sendLegacy = true;
      } else {
        this.state.sendFrames = true;
      }

      // Send the last result. A PartialObserver may be error-only or complete-only,
      // in which case it has no `next` to call.
      if (this.state.isStarted() && observer.next) {
        observer.next(this.state.getDataAfterCheckingFormats());
      }

      return this.subject.subscribe(observer);
    }

    /**
     * Subscribe one runner to another
     *
     * The given runner's subject is subscribed to this runner, requesting the
     * format(s) that the given runner has itself been asked for.
     */
    chain(runner: PanelQueryRunner): Unsubscribable {
      const { sendLegacy, sendFrames } = runner.state;

      // Only request both formats when both are actually wanted; a runner that
      // wants legacy data alone (or nothing yet) is subscribed as legacy.
      let format = PanelQueryRunnerFormat.legacy;
      if (sendFrames) {
        format = sendLegacy ? PanelQueryRunnerFormat.both : PanelQueryRunnerFormat.frames;
      }

      return this.subscribe(runner.subject, format);
    }

    getCurrentData(): PanelData {
      return this.state.validateStreamsAndGetPanelData();
    }

    /**
     * Builds a request from the options, executes it and broadcasts the result.
     *
     * Errors are not thrown to the caller: they are turned into an error
     * PanelData by the query state, broadcast, and returned.
     *
     * @param options what to query and how; see QueryRunnerOptions
     * @returns the PanelData that was broadcast, or the promise of the already
     *   running query when the same query is requested again
     */
    async run(options: QueryRunnerOptions): Promise<PanelData> {
      const { state } = this;
      const {
        queries,
        timezone,
        datasource,
        panelId,
        dashboardId,
        timeRange,
        timeInfo,
        cacheTimeout,
        widthPixels,
        maxDataPoints,
        scopedVars,
        minInterval,
        delayStateNotification,
      } = options;

      // Support shared queries
      if (isSharedDashboardQuery(datasource)) {
        if (!this.sharedQueryRunner) {
          this.sharedQueryRunner = new SharedQueryRunner(this);
        }
        return this.sharedQueryRunner.process(options);
      } else if (this.sharedQueryRunner) {
        // No longer a shared query: stop listening to the other panel
        this.sharedQueryRunner.disconnect();
        this.sharedQueryRunner = null;
      }

      const request: DataQueryRequest = {
        requestId: getNextRequestId(),
        timezone,
        panelId,
        dashboardId,
        range: timeRange,
        timeInfo,
        interval: '',
        intervalMs: 0,
        targets: cloneDeep(queries),
        maxDataPoints: maxDataPoints || widthPixels,
        scopedVars: scopedVars || {},
        cacheTimeout,
        startTime: Date.now(),
      };

      // Add deprecated property
      (request as any).rangeRaw = timeRange.raw;

      let loadingStateTimeoutId = 0;

      try {
        const ds = await getDataSource(datasource, request.scopedVars);

        // Drop hidden queries unless the datasource asks to receive them
        if (ds.meta && !ds.meta.hiddenQueries) {
          request.targets = request.targets.filter(q => !q.hide);
        }

        // Attach the datasource name to each query
        request.targets = request.targets.map(query => {
          if (!query.datasource) {
            query.datasource = ds.name;
          }
          return query;
        });

        const lowerIntervalLimit = minInterval ? templateSrv.replace(minInterval, request.scopedVars) : ds.interval;
        const norm = kbn.calculateInterval(timeRange, widthPixels, lowerIntervalLimit);

        // make shallow copy of scoped vars,
        // and add built in variables interval and interval_ms
        request.scopedVars = Object.assign({}, request.scopedVars, {
          __interval: { text: norm.interval, value: norm.interval },
          __interval_ms: { text: norm.intervalMs.toString(), value: norm.intervalMs },
        });
        request.interval = norm.interval;
        request.intervalMs = norm.intervalMs;

        // Check if we can reuse the already issued query
        const active = state.getActiveRunner();
        if (active) {
          if (state.isSameQuery(ds, request)) {
            // Maybe cancel if it has run too long?
            console.log('Trying to execute query while last one has yet to complete, returning same promise');
            return active;
          } else {
            state.cancel('Query Changed while running');
          }
        }

        // Send a loading status event on slower queries
        loadingStateTimeoutId = window.setTimeout(() => {
          if (state.getActiveRunner()) {
            this.subject.next(this.state.validateStreamsAndGetPanelData());
          }
        }, delayStateNotification || 500);

        const data = await state.execute(ds, request);

        // Clear the delayed loading state timeout
        clearTimeout(loadingStateTimeoutId);

        // Broadcast results
        this.subject.next(data);
        return data;
      } catch (err) {
        clearTimeout(loadingStateTimeoutId);

        // Broadcast the failure as data instead of rejecting
        const data = state.setError(err);
        this.subject.next(data);
        return data;
      }
    }

    /**
     * Called after every streaming event. This should be throttled so we
     * avoid accidentally overwhelming the browser
     */
    onStreamingDataUpdated = throttle(
      () => {
        this.subject.next(this.state.validateStreamsAndGetPanelData());
      },
      50,
      { trailing: true, leading: true }
    );

    /**
     * Called when the panel is closed
     */
    destroy() {
      // Tell anyone listening that we are done
      this.subject.complete();

      // Stop listening to another panel, if we were
      if (this.sharedQueryRunner) {
        this.sharedQueryRunner.disconnect();
        this.sharedQueryRunner = null;
      }

      // Will cancel and disconnect any open requests
      this.state.cancel('destroy');

      // Drop a pending trailing streaming update so nothing runs after destroy
      this.onStreamingDataUpdated.cancel();
    }
  }
  /**
   * Resolves the datasource to run queries against.
   *
   * A value that carries a `query` member is treated as an already-resolved
   * datasource instance and returned untouched; anything else (including
   * null) is handed to the datasource service together with the scoped vars.
   */
  async function getDataSource(
    datasource: string | DataSourceApi | null,
    scopedVars: ScopedVars
  ): Promise<DataSourceApi> {
    const isInstance = Boolean(datasource) && Boolean((datasource as any).query);

    if (!isInstance) {
      const resolved = await getDatasourceSrv().get(datasource as string, scopedVars);
      return resolved;
    }

    return datasource as DataSourceApi;
  }