PanelQueryRunner.ts 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. // Libraries
  2. import cloneDeep from 'lodash/cloneDeep';
  3. import { Subject, Unsubscribable, PartialObserver } from 'rxjs';
  4. // Services & Utils
  5. import { getDatasourceSrv } from 'app/features/plugins/datasource_srv';
  6. import kbn from 'app/core/utils/kbn';
  7. import templateSrv from 'app/features/templating/template_srv';
  8. // Components & Types
  9. import {
  10. guessFieldTypes,
  11. toSeriesData,
  12. PanelData,
  13. LoadingState,
  14. DataQuery,
  15. TimeRange,
  16. ScopedVars,
  17. DataQueryRequest,
  18. SeriesData,
  19. DataSourceApi,
  20. } from '@grafana/ui';
  21. import { PanelQueryState } from './PanelQueryState';
  22. export interface QueryRunnerOptions<TQuery extends DataQuery = DataQuery> {
  23. datasource: string | DataSourceApi<TQuery>;
  24. queries: TQuery[];
  25. panelId: number;
  26. dashboardId?: number;
  27. timezone?: string;
  28. timeRange: TimeRange;
  29. timeInfo?: string; // String description of time range for display
  30. widthPixels: number;
  31. maxDataPoints: number | undefined | null;
  32. minInterval: string | undefined | null;
  33. scopedVars?: ScopedVars;
  34. cacheTimeout?: string;
  35. delayStateNotification?: number; // default 100ms.
  36. }
  37. export enum PanelQueryRunnerFormat {
  38. series = 'series',
  39. legacy = 'legacy',
  40. both = 'both',
  41. }
  42. let counter = 100;
  43. function getNextRequestId() {
  44. return 'Q' + counter++;
  45. }
  46. export class PanelQueryRunner {
  47. private subject?: Subject<PanelData>;
  48. private state = new PanelQueryState();
  49. /**
  50. * Listen for updates to the PanelData. If a query has already run for this panel,
  51. * the results will be immediatly passed to the observer
  52. */
  53. subscribe(observer: PartialObserver<PanelData>, format = PanelQueryRunnerFormat.series): Unsubscribable {
  54. if (!this.subject) {
  55. this.subject = new Subject(); // Delay creating a subject until someone is listening
  56. }
  57. if (format === PanelQueryRunnerFormat.legacy) {
  58. this.state.sendLegacy = true;
  59. } else if (format === PanelQueryRunnerFormat.both) {
  60. this.state.sendSeries = true;
  61. this.state.sendLegacy = true;
  62. } else {
  63. this.state.sendSeries = true;
  64. }
  65. // Send the last result
  66. if (this.state.data.state !== LoadingState.NotStarted) {
  67. observer.next(this.state.getDataAfterCheckingFormats());
  68. }
  69. return this.subject.subscribe(observer);
  70. }
  71. async run(options: QueryRunnerOptions): Promise<PanelData> {
  72. if (!this.subject) {
  73. this.subject = new Subject();
  74. }
  75. const { state } = this;
  76. const {
  77. queries,
  78. timezone,
  79. datasource,
  80. panelId,
  81. dashboardId,
  82. timeRange,
  83. timeInfo,
  84. cacheTimeout,
  85. widthPixels,
  86. maxDataPoints,
  87. scopedVars,
  88. minInterval,
  89. delayStateNotification,
  90. } = options;
  91. const request: DataQueryRequest = {
  92. requestId: getNextRequestId(),
  93. timezone,
  94. panelId,
  95. dashboardId,
  96. range: timeRange,
  97. timeInfo,
  98. interval: '',
  99. intervalMs: 0,
  100. targets: cloneDeep(
  101. queries.filter(q => {
  102. return !q.hide; // Skip any hidden queries
  103. })
  104. ),
  105. maxDataPoints: maxDataPoints || widthPixels,
  106. scopedVars: scopedVars || {},
  107. cacheTimeout,
  108. startTime: Date.now(),
  109. };
  110. // Deprecated
  111. (request as any).rangeRaw = timeRange.raw;
  112. let loadingStateTimeoutId = 0;
  113. try {
  114. const ds =
  115. datasource && (datasource as any).query
  116. ? (datasource as DataSourceApi)
  117. : await getDatasourceSrv().get(datasource as string, request.scopedVars);
  118. // Attach the datasource name to each query
  119. request.targets = request.targets.map(query => {
  120. if (!query.datasource) {
  121. query.datasource = ds.name;
  122. }
  123. return query;
  124. });
  125. const lowerIntervalLimit = minInterval ? templateSrv.replace(minInterval, request.scopedVars) : ds.interval;
  126. const norm = kbn.calculateInterval(timeRange, widthPixels, lowerIntervalLimit);
  127. // make shallow copy of scoped vars,
  128. // and add built in variables interval and interval_ms
  129. request.scopedVars = Object.assign({}, request.scopedVars, {
  130. __interval: { text: norm.interval, value: norm.interval },
  131. __interval_ms: { text: norm.intervalMs, value: norm.intervalMs },
  132. });
  133. request.interval = norm.interval;
  134. request.intervalMs = norm.intervalMs;
  135. // Check if we can reuse the already issued query
  136. if (state.isRunning()) {
  137. if (state.isSameQuery(ds, request)) {
  138. // TODO? maybe cancel if it has run too long?
  139. return state.getCurrentExecutor();
  140. } else {
  141. state.cancel('Query Changed while running');
  142. }
  143. }
  144. // Send a loading status event on slower queries
  145. loadingStateTimeoutId = window.setTimeout(() => {
  146. if (this.state.isRunning()) {
  147. this.subject.next(this.state.data);
  148. }
  149. }, delayStateNotification || 500);
  150. const data = await state.execute(ds, request);
  151. // Clear the delayed loading state timeout
  152. clearTimeout(loadingStateTimeoutId);
  153. // Broadcast results
  154. this.subject.next(data);
  155. return data;
  156. } catch (err) {
  157. clearTimeout(loadingStateTimeoutId);
  158. const data = state.setError(err);
  159. this.subject.next(data);
  160. return data;
  161. }
  162. }
  163. /**
  164. * Called when the panel is closed
  165. */
  166. destroy() {
  167. // Tell anyone listening that we are done
  168. if (this.subject) {
  169. this.subject.complete();
  170. }
  171. // Will cancel and disconnect any open requets
  172. this.state.cancel('destroy');
  173. }
  174. }
  175. /**
  176. * All panels will be passed tables that have our best guess at colum type set
  177. *
  178. * This is also used by PanelChrome for snapshot support
  179. */
  180. export function getProcessedSeriesData(results?: any[]): SeriesData[] {
  181. if (!results) {
  182. return [];
  183. }
  184. const series: SeriesData[] = [];
  185. for (const r of results) {
  186. if (r) {
  187. series.push(guessFieldTypes(toSeriesData(r)));
  188. }
  189. }
  190. return series;
  191. }