PanelQueryRunner.ts 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. // Libraries
  2. import cloneDeep from 'lodash/cloneDeep';
  3. import throttle from 'lodash/throttle';
  4. import { Subject, Unsubscribable, PartialObserver } from 'rxjs';
  5. // Services & Utils
  6. import { getDatasourceSrv } from 'app/features/plugins/datasource_srv';
  7. import kbn from 'app/core/utils/kbn';
  8. import templateSrv from 'app/features/templating/template_srv';
  9. import { PanelQueryState } from './PanelQueryState';
  10. // Types
  11. import { PanelData, DataQuery, TimeRange, ScopedVars, DataQueryRequest, DataSourceApi } from '@grafana/ui';
  /**
   * Everything PanelQueryRunner.run() needs in order to build and issue a
   * DataQueryRequest for one panel.
   */
  export interface QueryRunnerOptions<TQuery extends DataQuery = DataQuery> {
    /** Either a datasource instance, or a name that is resolved through the datasource service. */
    datasource: string | DataSourceApi<TQuery>;

    /** Panel queries. Queries flagged `hide` are dropped and the rest are deep cloned before use. */
    queries: TQuery[];

    /** Copied onto the request as `panelId`. */
    panelId: number;

    /** Copied onto the request as `dashboardId`. */
    dashboardId?: number;

    /** Copied onto the request as `timezone`. */
    timezone?: string;

    /** Sent as the request `range`; its `raw` part is also exposed as the deprecated `rangeRaw`. */
    timeRange: TimeRange;

    /** String description of the time range, for display. */
    timeInfo?: string;

    /** Used to calculate the query interval and as the fallback when `maxDataPoints` is not set. */
    widthPixels: number;

    /** Upper bound on data points; a falsy value falls back to `widthPixels`. */
    maxDataPoints: number | undefined | null;

    /**
     * Lower limit for the calculated interval. Template variables are
     * interpolated; when falsy the datasource's own `interval` is used.
     */
    minInterval: string | undefined | null;

    /** Variables scoped to this request; an empty object is used when omitted. */
    scopedVars?: ScopedVars;

    /** Copied onto the request as `cacheTimeout`. */
    cacheTimeout?: string;

    /**
     * Milliseconds to wait before a "still loading" notification is sent for
     * a slow query. A falsy value (unset or 0) falls back to 500.
     */
    delayStateNotification?: number;
  }
  /**
   * Data format a subscriber asks PanelQueryRunner.subscribe() to deliver.
   */
  export enum PanelQueryRunnerFormat {
    /** Series data only (the default used by subscribe). */
    series = 'series',

    /** Legacy data only. */
    legacy = 'legacy',

    /** Series and legacy data together. */
    both = 'both',
  }
  // Module-wide sequence used to label requests; the first id handed out is "Q100".
  let counter = 100;

  /**
   * Produces the next request id: the letter "Q" followed by the current
   * sequence number, which is then advanced by one.
   */
  function getNextRequestId(): string {
    const requestId = `Q${counter}`;
    counter += 1;
    return requestId;
  }
  /**
   * Builds and executes the data request for a single panel and publishes the
   * resulting PanelData to every subscriber.
   *
   * The actual execution, cancellation and error bookkeeping is delegated to
   * PanelQueryState; this class owns the rxjs Subject used for broadcasting.
   */
  export class PanelQueryRunner {
    private subject?: Subject<PanelData>;
    private state = new PanelQueryState();

    constructor() {
      // Let the query state trigger the (throttled) broadcast on streaming updates
      this.state.onStreamingDataUpdated = this.onStreamingDataUpdated;
    }

    /**
     * Listen for updates to the PanelData. If a query has already run for this panel,
     * the results will be immediately passed to the observer.
     *
     * @param observer receives PanelData updates; `next` is optional on a PartialObserver
     * @param format which data format(s) the state should produce for subscribers
     * @returns handle used to stop listening
     */
    subscribe(observer: PartialObserver<PanelData>, format = PanelQueryRunnerFormat.series): Unsubscribable {
      const subject = this.getSubject();

      switch (format) {
        case PanelQueryRunnerFormat.legacy:
          this.state.sendLegacy = true;
          break;
        case PanelQueryRunnerFormat.both:
          this.state.sendSeries = true;
          this.state.sendLegacy = true;
          break;
        default:
          this.state.sendSeries = true;
      }

      // Send the last result. An observer that only handles error/complete has
      // no `next`, so it must be checked before being called.
      if (this.state.isStarted() && observer.next) {
        observer.next(this.state.getDataAfterCheckingFormats());
      }

      return subject.subscribe(observer);
    }

    /**
     * Builds a DataQueryRequest from the options, executes it and broadcasts the result.
     *
     * If an identical query is still running, its promise is returned instead of
     * issuing a new one; a different running query is cancelled first. Errors
     * raised while resolving the datasource or executing the query are stored on
     * the state via `setError`, broadcast, and returned as the resolved value.
     *
     * @param options describes the datasource, queries and time range to run
     * @returns the PanelData that was broadcast to subscribers
     */
    async run(options: QueryRunnerOptions): Promise<PanelData> {
      // Held in a local so the callbacks below never touch a possibly-undefined field
      const subject = this.getSubject();
      const { state } = this;

      const {
        queries,
        timezone,
        datasource,
        panelId,
        dashboardId,
        timeRange,
        timeInfo,
        cacheTimeout,
        widthPixels,
        maxDataPoints,
        scopedVars,
        minInterval,
        delayStateNotification,
      } = options;

      // filter out hidden queries & deep clone them
      const clonedAndFilteredQueries = cloneDeep(queries.filter(q => !q.hide));

      const request: DataQueryRequest = {
        requestId: getNextRequestId(),
        timezone,
        panelId,
        dashboardId,
        range: timeRange,
        timeInfo,
        interval: '',
        intervalMs: 0,
        targets: clonedAndFilteredQueries,
        maxDataPoints: maxDataPoints || widthPixels,
        scopedVars: scopedVars || {},
        cacheTimeout,
        startTime: Date.now(),
      };

      // Add deprecated property
      (request as any).rangeRaw = timeRange.raw;

      let loadingStateTimeoutId = 0;

      try {
        const ds = await getDataSource(datasource, request.scopedVars);

        // Attach the datasource name to each query that does not name one itself.
        // The targets are already private clones, so updating them in place is safe.
        for (const query of request.targets) {
          if (!query.datasource) {
            query.datasource = ds.name;
          }
        }

        const lowerIntervalLimit = minInterval ? templateSrv.replace(minInterval, request.scopedVars) : ds.interval;
        const norm = kbn.calculateInterval(timeRange, widthPixels, lowerIntervalLimit);

        // make shallow copy of scoped vars,
        // and add built in variables interval and interval_ms
        request.scopedVars = Object.assign({}, request.scopedVars, {
          __interval: { text: norm.interval, value: norm.interval },
          __interval_ms: { text: norm.intervalMs.toString(), value: norm.intervalMs },
        });
        request.interval = norm.interval;
        request.intervalMs = norm.intervalMs;

        // Check if we can reuse the already issued query
        const active = state.getActiveRunner();
        if (active) {
          if (state.isSameQuery(ds, request)) {
            // Maybe cancel if it has run too long?
            console.log('Trying to execute query while last one has yet to complete, returning same promise');
            return active;
          }
          state.cancel('Query Changed while running');
        }

        // Send a loading status event on slower queries.
        // A falsy delay (unset or 0) falls back to 500ms.
        loadingStateTimeoutId = window.setTimeout(() => {
          if (state.getActiveRunner()) {
            subject.next(state.validateStreamsAndGetPanelData());
          }
        }, delayStateNotification || 500);

        const data = await state.execute(ds, request);

        // Broadcast results
        subject.next(data);
        return data;
      } catch (err) {
        const data = state.setError(err);
        subject.next(data);
        return data;
      } finally {
        // Clear the delayed loading state timeout on every exit path
        clearTimeout(loadingStateTimeoutId);
      }
    }

    /**
     * Called after every streaming event. This should be throttled so we
     * avoid accidentally overwhelming the browser
     */
    onStreamingDataUpdated = throttle(
      () => {
        // Nothing to notify when no subject has been created yet
        if (this.subject) {
          this.subject.next(this.state.validateStreamsAndGetPanelData());
        }
      },
      50,
      { trailing: true, leading: true }
    );

    /**
     * Called when the panel is closed
     */
    destroy() {
      // Tell anyone listening that we are done
      if (this.subject) {
        this.subject.complete();
      }

      // Will cancel and disconnect any open requests
      this.state.cancel('destroy');

      // Drop a trailing throttled notification that would otherwise still
      // fire after the runner has been torn down
      this.onStreamingDataUpdated.cancel();
    }

    /**
     * Returns the broadcast subject, creating it on first use so that nothing
     * is allocated until someone subscribes or runs a query.
     */
    private getSubject(): Subject<PanelData> {
      if (!this.subject) {
        this.subject = new Subject();
      }
      return this.subject;
    }
  }
  /**
   * Resolves the datasource that a request should be executed against.
   *
   * @param datasource a ready-to-use datasource instance, a datasource name, or null
   * @param scopedVars variables handed to the datasource service when looking up by name
   * @returns the instance itself when one was passed in, otherwise whatever the
   *   datasource service returns for the given name (null is forwarded as-is)
   */
  async function getDataSource(
    datasource: string | DataSourceApi | null,
    scopedVars: ScopedVars
  ): Promise<DataSourceApi> {
    if (isDataSourceInstance(datasource)) {
      return datasource;
    }
    // Not an instance: treat the value as a name and let the service resolve it
    return getDatasourceSrv().get(datasource as string, scopedVars);
  }

  /** True when the value is an object exposing a callable `query`, i.e. a usable datasource instance. */
  function isDataSourceInstance(candidate: string | DataSourceApi | null): candidate is DataSourceApi {
    return typeof candidate === 'object' && candidate !== null && typeof candidate.query === 'function';
  }