PanelQueryRunner.ts 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. // Libraries
  2. import cloneDeep from 'lodash/cloneDeep';
  3. import throttle from 'lodash/throttle';
  4. import { Subject, Unsubscribable, PartialObserver } from 'rxjs';
  5. // Services & Utils
  6. import { getDatasourceSrv } from 'app/features/plugins/datasource_srv';
  7. import kbn from 'app/core/utils/kbn';
  8. import templateSrv from 'app/features/templating/template_srv';
  9. import { PanelQueryState } from './PanelQueryState';
  10. // Types
  11. import { PanelData, DataQuery, ScopedVars, DataQueryRequest, DataSourceApi, DataSourceJsonData } from '@grafana/ui';
  12. import { TimeRange } from '@grafana/data';
  /**
   * Everything `PanelQueryRunner.run()` needs to build and issue one query request.
   */
  export interface QueryRunnerOptions<
    TQuery extends DataQuery = DataQuery,
    TOptions extends DataSourceJsonData = DataSourceJsonData
  > {
    /**
     * Either a datasource instance (detected by the presence of a `query` member)
     * or a string that is resolved through the datasource service.
     */
    datasource: string | DataSourceApi<TQuery, TOptions>;

    /** Queries to execute; they are deep-cloned before being placed on the request. */
    queries: TQuery[];

    /** Forwarded unchanged to the request. */
    panelId: number;

    /** Forwarded unchanged to the request. */
    dashboardId?: number;

    /** Forwarded unchanged to the request. */
    timezone?: string;

    /** Becomes the request `range`; also drives the interval calculation. */
    timeRange: TimeRange;

    /** String description of time range for display. */
    timeInfo?: string;

    /** Used for the interval calculation and as the fallback for `maxDataPoints`. */
    widthPixels: number;

    /** When falsy, `widthPixels` is used instead. */
    maxDataPoints: number | null | undefined;

    /**
     * Lower limit for the calculated interval. Template variables are interpolated
     * with the scoped vars; when falsy, the datasource's own `interval` is used.
     */
    minInterval: string | null | undefined;

    /** Defaults to an empty object when omitted. */
    scopedVars?: ScopedVars;

    /** Forwarded unchanged to the request. */
    cacheTimeout?: string;

    /**
     * Milliseconds to wait before a still-running query triggers a loading-state
     * notification. `run()` falls back to 500 ms when this is omitted or 0.
     */
    delayStateNotification?: number;
  }
  /**
   * Result format a subscriber asks for when calling `PanelQueryRunner.subscribe()`.
   */
  export enum PanelQueryRunnerFormat {
    /** Subscriber wants frames (turns on `state.sendFrames`). This is the default. */
    frames = 'frames',

    /** Subscriber wants the legacy format (turns on `state.sendLegacy`). */
    legacy = 'legacy',

    /** Subscriber wants both formats (turns on `sendFrames` and `sendLegacy`). */
    both = 'both',
  }
  // Module-wide sequence used to label requests; the first id handed out is "Q100".
  let counter = 100;

  /**
   * Produce the next request id: the letter "Q" followed by the current sequence
   * value. The sequence advances by one on every call.
   */
  function getNextRequestId(): string {
    const id = `Q${counter}`;
    counter += 1;
    return id;
  }
  /**
   * Runs the queries for a single panel and broadcasts the resulting PanelData
   * to subscribers through an rxjs Subject.
   *
   * The actual execution, cancellation and error bookkeeping is delegated to
   * a PanelQueryState instance; this class builds the request, decides whether
   * an in-flight query can be reused, and pushes results to listeners.
   */
  export class PanelQueryRunner {
    // Created lazily: either when someone subscribes or when the first query runs.
    private subject?: Subject<PanelData>;

    private state = new PanelQueryState();

    constructor() {
      // Property initializers run before the constructor body, so the throttled
      // handler declared further down is already available here.
      this.state.onStreamingDataUpdated = this.onStreamingDataUpdated;
    }

    /**
     * Listen for updates to the PanelData. If a query has already run for this panel,
     * the results will be immediately passed to the observer.
     *
     * @param observer receives the panel data; it may be a partial observer, so the
     *   immediate replay is skipped when it has no `next` handler
     * @param format which result format(s) the state should produce for listeners
     * @returns a handle that detaches the observer when unsubscribed
     */
    subscribe(observer: PartialObserver<PanelData>, format = PanelQueryRunnerFormat.frames): Unsubscribable {
      if (!this.subject) {
        this.subject = new Subject(); // Delay creating a subject until someone is listening
      }

      // The flags are only ever switched on: once any subscriber asks for a format
      // the state keeps producing it.
      if (format === PanelQueryRunnerFormat.legacy) {
        this.state.sendLegacy = true;
      } else if (format === PanelQueryRunnerFormat.both) {
        this.state.sendFrames = true;
        this.state.sendLegacy = true;
      } else {
        this.state.sendFrames = true;
      }

      // Send the last result. A PartialObserver is allowed to define only `error`
      // or `complete`, so `next` must be checked before it is called.
      if (this.state.isStarted() && observer.next) {
        observer.next(this.state.getDataAfterCheckingFormats());
      }

      return this.subject.subscribe(observer);
    }

    /**
     * Build a DataQueryRequest from the options, execute it and broadcast the result.
     *
     * If a query is already running and the state reports it as the same query, the
     * promise of that active run is returned instead of issuing a new one; a different
     * running query is cancelled first. Any error thrown along the way is converted
     * into PanelData via `state.setError` and broadcast, so this method resolves
     * rather than rejects for those failures.
     *
     * @param options datasource, queries, time range and sizing for this run
     * @returns the data produced by the query (or the error data)
     */
    async run(options: QueryRunnerOptions): Promise<PanelData> {
      if (!this.subject) {
        this.subject = new Subject();
      }
      // Keep a local reference so the callbacks below never touch an optional field.
      const subject = this.subject;

      const { state } = this;

      const {
        queries,
        timezone,
        datasource,
        panelId,
        dashboardId,
        timeRange,
        timeInfo,
        cacheTimeout,
        widthPixels,
        maxDataPoints,
        scopedVars,
        minInterval,
        delayStateNotification,
      } = options;

      const request: DataQueryRequest = {
        requestId: getNextRequestId(),
        timezone,
        panelId,
        dashboardId,
        range: timeRange,
        timeInfo,
        // interval / intervalMs are filled in once the datasource is known
        interval: '',
        intervalMs: 0,
        // Clone so that the changes made below never leak back into the caller's queries
        targets: cloneDeep(queries),
        maxDataPoints: maxDataPoints || widthPixels,
        scopedVars: scopedVars || {},
        cacheTimeout,
        startTime: Date.now(),
      };

      // Add deprecated property
      (request as any).rangeRaw = timeRange.raw;

      // 0 means "no timer scheduled yet"; clearing it is harmless.
      let loadingStateTimeoutId = 0;

      try {
        const ds = await getDataSource(datasource, request.scopedVars);

        // Drop hidden queries unless the datasource declares that it wants them
        if (ds.meta && !ds.meta.hiddenQueries) {
          request.targets = request.targets.filter(q => !q.hide);
        }

        // Attach the datasource name to each query
        request.targets = request.targets.map(query => {
          if (!query.datasource) {
            query.datasource = ds.name;
          }
          return query;
        });

        const lowerIntervalLimit = minInterval ? templateSrv.replace(minInterval, request.scopedVars) : ds.interval;
        const norm = kbn.calculateInterval(timeRange, widthPixels, lowerIntervalLimit);

        // make shallow copy of scoped vars,
        // and add built in variables interval and interval_ms
        request.scopedVars = Object.assign({}, request.scopedVars, {
          __interval: { text: norm.interval, value: norm.interval },
          __interval_ms: { text: norm.intervalMs.toString(), value: norm.intervalMs },
        });
        request.interval = norm.interval;
        request.intervalMs = norm.intervalMs;

        // Check if we can reuse the already issued query
        const active = state.getActiveRunner();
        if (active) {
          if (state.isSameQuery(ds, request)) {
            // Maybe cancel if it has run too long?
            console.log('Trying to execute query while last one has yet to complete, returning same promise');
            return active;
          } else {
            state.cancel('Query Changed while running');
          }
        }

        // Send a loading status event on slower queries
        // (after `delayStateNotification` ms, or 500 ms when that is not set)
        loadingStateTimeoutId = window.setTimeout(() => {
          if (state.getActiveRunner()) {
            subject.next(this.state.validateStreamsAndGetPanelData());
          }
        }, delayStateNotification || 500);

        const data = await state.execute(ds, request);

        // Clear the delayed loading state timeout
        clearTimeout(loadingStateTimeoutId);

        // Broadcast results
        subject.next(data);
        return data;
      } catch (err) {
        clearTimeout(loadingStateTimeoutId);

        const data = state.setError(err);
        subject.next(data);
        return data;
      }
    }

    /**
     * Called after every streaming event. This should be throttled so we
     * avoid accidentally overwhelming the browser.
     *
     * Fires on the leading and trailing edge of each 50 ms window.
     */
    onStreamingDataUpdated = throttle(
      () => {
        // The subject only exists once subscribe() or run() has been called.
        if (this.subject) {
          this.subject.next(this.state.validateStreamsAndGetPanelData());
        }
      },
      50,
      { trailing: true, leading: true }
    );

    /**
     * Called when the panel is closed.
     */
    destroy() {
      // Drop any trailing throttled streaming update so it cannot fire after teardown
      this.onStreamingDataUpdated.cancel();

      // Tell anyone listening that we are done
      if (this.subject) {
        this.subject.complete();
      }

      // Will cancel and disconnect any open requests
      this.state.cancel('destroy');
    }
  }
  /**
   * Resolve the datasource to query.
   *
   * A value that is truthy and carries a `query` member is treated as an
   * already-constructed datasource and returned as-is. Anything else (including
   * `null`) is handed to the datasource service together with the scoped vars.
   *
   * @param datasource datasource instance, or the string passed to the datasource service
   * @param scopedVars passed through to the datasource service lookup
   * @returns the datasource instance to run the request against
   */
  async function getDataSource(
    datasource: string | DataSourceApi | null,
    scopedVars: ScopedVars
  ): Promise<DataSourceApi> {
    const isInstance = Boolean(datasource && (datasource as any).query);

    if (!isInstance) {
      const resolved = await getDatasourceSrv().get(datasource as string, scopedVars);
      return resolved;
    }

    return datasource as DataSourceApi;
  }