PanelQueryRunner.ts 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. // Libraries
  2. import cloneDeep from 'lodash/cloneDeep';
  3. import throttle from 'lodash/throttle';
  4. import { Subject, Unsubscribable, PartialObserver } from 'rxjs';
  5. // Services & Utils
  6. import { getDatasourceSrv } from 'app/features/plugins/datasource_srv';
  7. import kbn from 'app/core/utils/kbn';
  8. import templateSrv from 'app/features/templating/template_srv';
  9. import { PanelQueryState } from './PanelQueryState';
  10. // Types
  11. import {
  12. PanelData,
  13. DataQuery,
  14. TimeRange,
  15. ScopedVars,
  16. DataQueryRequest,
  17. DataSourceApi,
  18. DataSourceJsonData,
  19. } from '@grafana/ui';
  20. export interface QueryRunnerOptions<
  21. TQuery extends DataQuery = DataQuery,
  22. TOptions extends DataSourceJsonData = DataSourceJsonData
  23. > {
  24. datasource: string | DataSourceApi<TQuery, TOptions>;
  25. queries: TQuery[];
  26. panelId: number;
  27. dashboardId?: number;
  28. timezone?: string;
  29. timeRange: TimeRange;
  30. timeInfo?: string; // String description of time range for display
  31. widthPixels: number;
  32. maxDataPoints: number | undefined | null;
  33. minInterval: string | undefined | null;
  34. scopedVars?: ScopedVars;
  35. cacheTimeout?: string;
  36. delayStateNotification?: number; // default 100ms.
  37. }
  38. export enum PanelQueryRunnerFormat {
  39. series = 'series',
  40. legacy = 'legacy',
  41. both = 'both',
  42. }
  43. let counter = 100;
  44. function getNextRequestId() {
  45. return 'Q' + counter++;
  46. }
  47. export class PanelQueryRunner {
  48. private subject?: Subject<PanelData>;
  49. private state = new PanelQueryState();
  50. constructor() {
  51. this.state.onStreamingDataUpdated = this.onStreamingDataUpdated;
  52. }
  53. /**
  54. * Listen for updates to the PanelData. If a query has already run for this panel,
  55. * the results will be immediatly passed to the observer
  56. */
  57. subscribe(observer: PartialObserver<PanelData>, format = PanelQueryRunnerFormat.series): Unsubscribable {
  58. if (!this.subject) {
  59. this.subject = new Subject(); // Delay creating a subject until someone is listening
  60. }
  61. if (format === PanelQueryRunnerFormat.legacy) {
  62. this.state.sendLegacy = true;
  63. } else if (format === PanelQueryRunnerFormat.both) {
  64. this.state.sendSeries = true;
  65. this.state.sendLegacy = true;
  66. } else {
  67. this.state.sendSeries = true;
  68. }
  69. // Send the last result
  70. if (this.state.isStarted()) {
  71. observer.next(this.state.getDataAfterCheckingFormats());
  72. }
  73. return this.subject.subscribe(observer);
  74. }
  75. async run(options: QueryRunnerOptions): Promise<PanelData> {
  76. if (!this.subject) {
  77. this.subject = new Subject();
  78. }
  79. const { state } = this;
  80. const {
  81. queries,
  82. timezone,
  83. datasource,
  84. panelId,
  85. dashboardId,
  86. timeRange,
  87. timeInfo,
  88. cacheTimeout,
  89. widthPixels,
  90. maxDataPoints,
  91. scopedVars,
  92. minInterval,
  93. delayStateNotification,
  94. } = options;
  95. const request: DataQueryRequest = {
  96. requestId: getNextRequestId(),
  97. timezone,
  98. panelId,
  99. dashboardId,
  100. range: timeRange,
  101. timeInfo,
  102. interval: '',
  103. intervalMs: 0,
  104. targets: cloneDeep(queries),
  105. maxDataPoints: maxDataPoints || widthPixels,
  106. scopedVars: scopedVars || {},
  107. cacheTimeout,
  108. startTime: Date.now(),
  109. };
  110. // Add deprecated property
  111. (request as any).rangeRaw = timeRange.raw;
  112. let loadingStateTimeoutId = 0;
  113. try {
  114. const ds = await getDataSource(datasource, request.scopedVars);
  115. if (ds.meta && !ds.meta.hiddenQueries) {
  116. request.targets = request.targets.filter(q => !q.hide);
  117. }
  118. // Attach the datasource name to each query
  119. request.targets = request.targets.map(query => {
  120. if (!query.datasource) {
  121. query.datasource = ds.name;
  122. }
  123. return query;
  124. });
  125. const lowerIntervalLimit = minInterval ? templateSrv.replace(minInterval, request.scopedVars) : ds.interval;
  126. const norm = kbn.calculateInterval(timeRange, widthPixels, lowerIntervalLimit);
  127. // make shallow copy of scoped vars,
  128. // and add built in variables interval and interval_ms
  129. request.scopedVars = Object.assign({}, request.scopedVars, {
  130. __interval: { text: norm.interval, value: norm.interval },
  131. __interval_ms: { text: norm.intervalMs.toString(), value: norm.intervalMs },
  132. });
  133. request.interval = norm.interval;
  134. request.intervalMs = norm.intervalMs;
  135. // Check if we can reuse the already issued query
  136. const active = state.getActiveRunner();
  137. if (active) {
  138. if (state.isSameQuery(ds, request)) {
  139. // Maybe cancel if it has run too long?
  140. console.log('Trying to execute query while last one has yet to complete, returning same promise');
  141. return active;
  142. } else {
  143. state.cancel('Query Changed while running');
  144. }
  145. }
  146. // Send a loading status event on slower queries
  147. loadingStateTimeoutId = window.setTimeout(() => {
  148. if (state.getActiveRunner()) {
  149. this.subject.next(this.state.validateStreamsAndGetPanelData());
  150. }
  151. }, delayStateNotification || 500);
  152. const data = await state.execute(ds, request);
  153. // Clear the delayed loading state timeout
  154. clearTimeout(loadingStateTimeoutId);
  155. // Broadcast results
  156. this.subject.next(data);
  157. return data;
  158. } catch (err) {
  159. clearTimeout(loadingStateTimeoutId);
  160. const data = state.setError(err);
  161. this.subject.next(data);
  162. return data;
  163. }
  164. }
  165. /**
  166. * Called after every streaming event. This should be throttled so we
  167. * avoid accidentally overwhelming the browser
  168. */
  169. onStreamingDataUpdated = throttle(
  170. () => {
  171. this.subject.next(this.state.validateStreamsAndGetPanelData());
  172. },
  173. 50,
  174. { trailing: true, leading: true }
  175. );
  176. /**
  177. * Called when the panel is closed
  178. */
  179. destroy() {
  180. // Tell anyone listening that we are done
  181. if (this.subject) {
  182. this.subject.complete();
  183. }
  184. // Will cancel and disconnect any open requets
  185. this.state.cancel('destroy');
  186. }
  187. }
  188. async function getDataSource(
  189. datasource: string | DataSourceApi | null,
  190. scopedVars: ScopedVars
  191. ): Promise<DataSourceApi> {
  192. if (datasource && (datasource as any).query) {
  193. return datasource as DataSourceApi;
  194. }
  195. return await getDatasourceSrv().get(datasource as string, scopedVars);
  196. }