PanelQueryRunner.ts 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. // Libraries
  2. import cloneDeep from 'lodash/cloneDeep';
  3. import throttle from 'lodash/throttle';
  4. import { Subject, Unsubscribable, PartialObserver } from 'rxjs';
  5. // Services & Utils
  6. import { getDatasourceSrv } from 'app/features/plugins/datasource_srv';
  7. import kbn from 'app/core/utils/kbn';
  8. import templateSrv from 'app/features/templating/template_srv';
  9. import { PanelQueryState } from './PanelQueryState';
  10. // Types
  11. import {
  12. PanelData,
  13. DataQuery,
  14. TimeRange,
  15. ScopedVars,
  16. DataQueryRequest,
  17. DataSourceApi,
  18. DataSourceJsonData,
  19. } from '@grafana/ui';
  /**
   * Everything `PanelQueryRunner.run` needs to build and issue one query
   * request for a panel.
   */
  export interface QueryRunnerOptions<
    TQuery extends DataQuery = DataQuery,
    TOptions extends DataSourceJsonData = DataSourceJsonData
  > {
    /** Name of the datasource to look up, or an already resolved datasource instance. */
    datasource: string | DataSourceApi<TQuery, TOptions>;
    /** Queries to send; entries with `hide` set are dropped before the request is built. */
    queries: TQuery[];
    panelId: number;
    dashboardId?: number;
    timezone?: string;
    timeRange: TimeRange;
    /** String description of the time range, for display. */
    timeInfo?: string;
    /** Used to calculate the interval, and as `maxDataPoints` when that is not set. */
    widthPixels: number;
    maxDataPoints: number | undefined | null;
    /**
     * Lower limit for the calculated interval (template variables are replaced).
     * When empty, the datasource's own `interval` is used instead.
     */
    minInterval: string | undefined | null;
    scopedVars?: ScopedVars;
    cacheTimeout?: string;
    /**
     * Milliseconds to wait before the loading state is broadcast for a query
     * that is still running. `run` falls back to 500 when this is unset or 0.
     */
    delayStateNotification?: number;
  }
  /**
   * Result format a subscriber requests from `PanelQueryRunner.subscribe`.
   */
  export enum PanelQueryRunnerFormat {
    /** Series format only (the default for `subscribe`). */
    series = 'series',
    /** Legacy format only. */
    legacy = 'legacy',
    /** Series and legacy formats together. */
    both = 'both',
  }
  // Sequence number behind the generated request ids; the first id is 'Q100'.
  let counter = 100;

  /**
   * Returns the next request id: the letter 'Q' followed by the current
   * counter value. The counter is incremented on every call, so ids do not
   * repeat within one module instance.
   */
  function getNextRequestId() {
    return 'Q' + counter++;
  }
  /**
   * Runs the queries of a single panel and broadcasts the resulting PanelData
   * to everyone subscribed through `subscribe`.
   */
  export class PanelQueryRunner {
    // Created lazily: either when the first observer subscribes or when the first query runs.
    private subject?: Subject<PanelData>;
    private state = new PanelQueryState();

    constructor() {
      // Property initializers run before the constructor body, so the throttled
      // callback defined further down already exists at this point.
      this.state.onStreamingDataUpdated = this.onStreamingDataUpdated;
    }

    /**
     * Listen for updates to the PanelData. If a query has already run for this panel,
     * the results will be immediately passed to the observer.
     *
     * @param observer - receives PanelData updates; may implement only some of the observer callbacks
     * @param format - which result format(s) the observer needs, series by default
     * @returns handle used to stop listening
     */
    subscribe(observer: PartialObserver<PanelData>, format = PanelQueryRunnerFormat.series): Unsubscribable {
      if (!this.subject) {
        this.subject = new Subject(); // Delay creating a subject until someone is listening
      }

      // Formats are only ever switched on here, never off.
      if (format === PanelQueryRunnerFormat.legacy) {
        this.state.sendLegacy = true;
      } else if (format === PanelQueryRunnerFormat.both) {
        this.state.sendSeries = true;
        this.state.sendLegacy = true;
      } else {
        this.state.sendSeries = true;
      }

      // Send the last result
      if (this.state.isStarted()) {
        const lastResult = this.state.getDataAfterCheckingFormats();
        // A PartialObserver may implement only `error` or `complete`; calling a
        // missing `next` would throw before the subscription is created.
        if (observer.next) {
          observer.next(lastResult);
        }
      }

      return this.subject.subscribe(observer);
    }

    /**
     * Builds a DataQueryRequest from the options, executes it and broadcasts the
     * result to all subscribers.
     *
     * When an identical query is still running, its promise is returned instead
     * of issuing a new request; a different running query is cancelled first.
     * Failures are not rethrown: they are turned into PanelData through
     * `state.setError`, broadcast, and returned.
     */
    async run(options: QueryRunnerOptions): Promise<PanelData> {
      if (!this.subject) {
        this.subject = new Subject();
      }
      // Keep a definitely-assigned reference for the callbacks below.
      const subject = this.subject;
      const { state } = this;

      const {
        queries,
        timezone,
        datasource,
        panelId,
        dashboardId,
        timeRange,
        timeInfo,
        cacheTimeout,
        widthPixels,
        maxDataPoints,
        scopedVars,
        minInterval,
        delayStateNotification,
      } = options;

      // filter out hidden queries & deep clone them
      const clonedAndFilteredQueries = cloneDeep(queries.filter(q => !q.hide));

      const request: DataQueryRequest = {
        requestId: getNextRequestId(),
        timezone,
        panelId,
        dashboardId,
        range: timeRange,
        timeInfo,
        interval: '',
        intervalMs: 0,
        targets: clonedAndFilteredQueries,
        maxDataPoints: maxDataPoints || widthPixels,
        scopedVars: scopedVars || {},
        cacheTimeout,
        startTime: Date.now(),
      };

      // Add deprecated property
      (request as any).rangeRaw = timeRange.raw;

      let loadingStateTimeoutId = 0;

      try {
        const ds = await getDataSource(datasource, request.scopedVars);

        // Attach the datasource name to each query
        request.targets = request.targets.map(query => {
          if (!query.datasource) {
            query.datasource = ds.name;
          }
          return query;
        });

        const lowerIntervalLimit = minInterval ? templateSrv.replace(minInterval, request.scopedVars) : ds.interval;
        const norm = kbn.calculateInterval(timeRange, widthPixels, lowerIntervalLimit);

        // make shallow copy of scoped vars,
        // and add built in variables interval and interval_ms
        request.scopedVars = Object.assign({}, request.scopedVars, {
          __interval: { text: norm.interval, value: norm.interval },
          __interval_ms: { text: norm.intervalMs.toString(), value: norm.intervalMs },
        });
        request.interval = norm.interval;
        request.intervalMs = norm.intervalMs;

        // Check if we can reuse the already issued query
        const active = state.getActiveRunner();
        if (active) {
          if (state.isSameQuery(ds, request)) {
            // Maybe cancel if it has run too long?
            console.log('Trying to execute query while last one has yet to complete, returning same promise');
            return active;
          } else {
            state.cancel('Query Changed while running');
          }
        }

        // Send a loading status event on slower queries
        loadingStateTimeoutId = window.setTimeout(() => {
          if (state.getActiveRunner()) {
            subject.next(state.validateStreamsAndGetPanelData());
          }
        }, delayStateNotification || 500);

        const data = await state.execute(ds, request);

        // Clear the delayed loading state timeout
        clearTimeout(loadingStateTimeoutId);

        // Broadcast results
        subject.next(data);
        return data;
      } catch (err) {
        clearTimeout(loadingStateTimeoutId);

        const data = state.setError(err);
        subject.next(data);
        return data;
      }
    }

    /**
     * Called after every streaming event. This should be throttled so we
     * avoid accidentally overwhelming the browser
     */
    onStreamingDataUpdated = throttle(
      () => {
        // The subject only exists once someone subscribed or a query ran.
        if (this.subject) {
          this.subject.next(this.state.validateStreamsAndGetPanelData());
        }
      },
      50,
      { trailing: true, leading: true }
    );

    /**
     * Called when the panel is closed
     */
    destroy() {
      // Tell anyone listening that we are done
      if (this.subject) {
        this.subject.complete();
      }

      // Will cancel and disconnect any open requests
      this.state.cancel('destroy');

      // Drop a pending trailing invocation so nothing runs after teardown.
      this.onStreamingDataUpdated.cancel();
    }
  }
  /**
   * Resolves the datasource to query.
   *
   * @param datasource - an instance (recognised by its `query` member), a datasource name, or null
   * @param scopedVars - forwarded to the datasource service when a lookup is needed
   * @returns the instance that was passed in, or whatever `getDatasourceSrv().get` resolves to
   */
  async function getDataSource(
    datasource: string | DataSourceApi | null,
    scopedVars: ScopedVars
  ): Promise<DataSourceApi> {
    // Anything that already exposes `query` is treated as a ready-to-use instance.
    if (datasource && typeof datasource !== 'string' && datasource.query) {
      return datasource;
    }
    // Names, null and anything else are passed on to the datasource service.
    return getDatasourceSrv().get(datasource as string, scopedVars);
  }
  195. }