PanelQueryRunner.ts 8.4 KB


  1. // Libraries
  2. import cloneDeep from 'lodash/cloneDeep';
  3. import throttle from 'lodash/throttle';
  4. import { Subject, Unsubscribable, PartialObserver } from 'rxjs';
  5. // Services & Utils
  6. import { getDatasourceSrv } from 'app/features/plugins/datasource_srv';
  7. import kbn from 'app/core/utils/kbn';
  8. import templateSrv from 'app/features/templating/template_srv';
  9. import { PanelQueryState } from './PanelQueryState';
  10. import { isSharedDashboardQuery, SharedQueryRunner } from 'app/plugins/datasource/dashboard/SharedQueryRunner';
  11. // Types
  12. import { PanelData, DataQuery, DataQueryRequest, DataSourceApi, DataSourceJsonData } from '@grafana/ui';
  13. import { TimeRange, DataTransformerConfig, transformDataFrame, toLegacyResponseData, ScopedVars } from '@grafana/data';
  14. import config from 'app/core/config';
  /**
   * Options accepted by `PanelQueryRunner.run()` describing one query execution
   * for a single panel.
   */
  export interface QueryRunnerOptions<
    TQuery extends DataQuery = DataQuery,
    TOptions extends DataSourceJsonData = DataSourceJsonData
  > {
    /**
     * Either a data source name, which is resolved through the datasource
     * service, or an already constructed data source instance, which is used
     * as-is.
     */
    datasource: string | DataSourceApi<TQuery, TOptions>;
    /** Queries to execute; they are deep-cloned into the request targets. */
    queries: TQuery[];
    /** Copied onto the request as `panelId`. */
    panelId: number;
    /** Copied onto the request as `dashboardId`. */
    dashboardId?: number;
    /** Copied onto the request as `timezone`. */
    timezone?: string;
    /** Becomes the request `range`; also an input to the interval calculation. */
    timeRange: TimeRange;
    /** String description of time range for display. */
    timeInfo?: string;
    /** Input to the interval calculation and the fallback for `maxDataPoints`. */
    widthPixels: number;
    /** Requested point limit; a falsy value falls back to `widthPixels`. */
    maxDataPoints: number | undefined | null;
    /**
     * Lower limit for the calculated interval. Template variables are
     * interpolated; when not set, the data source's own `interval` is used.
     */
    minInterval: string | undefined | null;
    /** Scoped variables forwarded with the request; defaults to `{}`. */
    scopedVars?: ScopedVars;
    /** Copied onto the request as `cacheTimeout`. */
    cacheTimeout?: string;
    /**
     * Milliseconds to wait before subscribers are notified that a query is
     * still running. `run()` uses 500 ms when this is omitted (or 0).
     */
    delayStateNotification?: number;
    /** Transformations stored on the runner and applied to results by `getCurrentData()`. */
    transformations?: DataTransformerConfig[];
  }
  /**
   * Data format a subscriber asks for when calling `PanelQueryRunner.subscribe()`.
   */
  export enum PanelQueryRunnerFormat {
    /** Turns on `sendFrames` on the query state (the default format). */
    frames = 'frames',
    /** Turns on `sendLegacy` on the query state. */
    legacy = 'legacy',
    /** Turns on both `sendFrames` and `sendLegacy` on the query state. */
    both = 'both',
  }
  // Module-wide sequence used to build request ids; the first id handed out is "Q100".
  let counter = 100;

  /**
   * Returns the next request id: the letter "Q" followed by the current value
   * of the sequence, which is then advanced by one.
   */
  function getNextRequestId() {
    const current = counter;
    counter = current + 1;
    return `Q${current}`;
  }
  /**
   * Executes the queries of one panel and publishes the resulting `PanelData`
   * to every subscriber through an RxJS subject.
   *
   * The runner delegates the actual execution to a `PanelQueryState`, applies
   * the configured transformations to results before broadcasting them, and can
   * hand a "shared dashboard query" over to a `SharedQueryRunner` instead of
   * querying a data source itself.
   */
  export class PanelQueryRunner {
    private subject?: Subject<PanelData>;

    private state = new PanelQueryState();
    private transformations?: DataTransformerConfig[];

    // Listen to another panel for changes
    private sharedQueryRunner: SharedQueryRunner | null = null;

    constructor(private panelId: number) {
      // Streaming updates reported by the state are re-broadcast (throttled) by this runner.
      this.state.onStreamingDataUpdated = this.onStreamingDataUpdated;
      this.subject = new Subject();
    }

    /** Returns the id of the panel this runner was created for. */
    getPanelId() {
      return this.panelId;
    }

    /**
     * Get the last result -- optionally skip the transformation.
     *
     * Transformations are applied only when the `transformations` feature
     * toggle is on, `transform` is true and at least one transformation is
     * configured. In that case both `series` and `legacy` of the returned data
     * are derived from the transformed frames.
     *
     * @param transform pass `false` to get the untransformed data
     */
    // TODO: add tests
    getCurrentData(transform = true): PanelData {
      const v = this.state.validateStreamsAndGetPanelData();
      const transformData = config.featureToggles.transformations && transform;
      const hasTransformations = this.transformations && this.transformations.length;

      if (transformData && hasTransformations) {
        const processed = transformDataFrame(this.transformations, v.series);
        return {
          ...v,
          series: processed,
          legacy: processed.map(p => toLegacyResponseData(p)),
        };
      }

      return v;
    }

    /**
     * Listen for updates to the PanelData. If a query has already run for this panel,
     * the results will be immediately passed to the observer.
     *
     * @param observer receives every broadcast `PanelData`
     * @param format which data format(s) the state must produce for this subscriber
     * @returns handle used to stop listening
     */
    subscribe(observer: PartialObserver<PanelData>, format = PanelQueryRunnerFormat.frames): Unsubscribable {
      if (format === PanelQueryRunnerFormat.legacy) {
        this.state.sendLegacy = true;
      } else if (format === PanelQueryRunnerFormat.both) {
        this.state.sendFrames = true;
        this.state.sendLegacy = true;
      } else {
        this.state.sendFrames = true;
      }

      // Send the last result
      if (this.state.isStarted()) {
        // Force check formats again?
        this.state.getDataAfterCheckingFormats();

        // `next` is optional on a PartialObserver (error-only / complete-only
        // observers are valid), so only replay the last result when it exists.
        if (observer.next) {
          observer.next(this.getCurrentData()); // transformed
        }
      }

      return this.subject.subscribe(observer);
    }

    /**
     * Subscribe one runner to another: the subject of `runner` becomes an
     * observer of this runner.
     *
     * The requested format is `frames` when `runner` sends frames and `legacy`
     * otherwise, and it is upgraded to `both` whenever `runner` sends legacy data.
     *
     * @param runner the runner that should receive this runner's results
     * @returns handle used to break the chain
     */
    chain(runner: PanelQueryRunner): Unsubscribable {
      const { sendLegacy, sendFrames } = runner.state;
      let format = sendFrames ? PanelQueryRunnerFormat.frames : PanelQueryRunnerFormat.legacy;

      if (sendLegacy) {
        format = PanelQueryRunnerFormat.both;
      }

      return this.subscribe(runner.subject, format);
    }

    /**
     * Change the current transformation and notify all listeners.
     * Should be used only by panel editor to update the transformers.
     */
    setTransform = (transformations?: DataTransformerConfig[]) => {
      this.transformations = transformations;

      // Nothing to re-broadcast until a query has been started.
      if (this.state.isStarted()) {
        this.onStreamingDataUpdated();
      }
    };

    /**
     * Builds a request from `options`, executes it and broadcasts the result.
     *
     * - Shared dashboard queries are delegated to a `SharedQueryRunner`.
     * - If an equal query is still running, its promise is returned instead of
     *   issuing a new one; a different running query is cancelled first.
     * - If the query is still running after `delayStateNotification` ms
     *   (500 ms when not set), the current state is broadcast so subscribers
     *   can show a loading indication.
     * - Failures do not reject: the error is stored through the state and the
     *   resulting data is broadcast and returned.
     */
    async run(options: QueryRunnerOptions): Promise<PanelData> {
      const { state } = this;

      const {
        queries,
        timezone,
        datasource,
        panelId,
        dashboardId,
        timeRange,
        timeInfo,
        cacheTimeout,
        widthPixels,
        maxDataPoints,
        scopedVars,
        minInterval,
        delayStateNotification,
      } = options;

      // Support shared queries
      if (isSharedDashboardQuery(datasource)) {
        if (!this.sharedQueryRunner) {
          this.sharedQueryRunner = new SharedQueryRunner(this);
        }
        return this.sharedQueryRunner.process(options);
      } else if (this.sharedQueryRunner) {
        // The panel no longer uses a shared query: stop listening to the other panel.
        this.sharedQueryRunner.disconnect();
        this.sharedQueryRunner = null;
      }

      const request: DataQueryRequest = {
        requestId: getNextRequestId(),
        timezone,
        panelId,
        dashboardId,
        range: timeRange,
        timeInfo,
        interval: '',
        intervalMs: 0,
        targets: cloneDeep(queries),
        maxDataPoints: maxDataPoints || widthPixels,
        scopedVars: scopedVars || {},
        cacheTimeout,
        startTime: Date.now(),
      };

      // Add deprecated property
      (request as any).rangeRaw = timeRange.raw;

      // 0 means "no timer scheduled"; clearing it is a harmless no-op.
      let loadingStateTimeoutId = 0;

      try {
        const ds = await getDataSource(datasource, request.scopedVars);

        // Drop hidden queries unless the data source declares that it handles them itself.
        if (ds.meta && !ds.meta.hiddenQueries) {
          request.targets = request.targets.filter(q => !q.hide);
        }

        // Attach the datasource name to each query
        request.targets = request.targets.map(query => {
          if (!query.datasource) {
            query.datasource = ds.name;
          }
          return query;
        });

        const lowerIntervalLimit = minInterval ? templateSrv.replace(minInterval, request.scopedVars) : ds.interval;
        const norm = kbn.calculateInterval(timeRange, widthPixels, lowerIntervalLimit);

        // make shallow copy of scoped vars,
        // and add built in variables interval and interval_ms
        request.scopedVars = Object.assign({}, request.scopedVars, {
          __interval: { text: norm.interval, value: norm.interval },
          __interval_ms: { text: norm.intervalMs.toString(), value: norm.intervalMs },
        });

        request.interval = norm.interval;
        request.intervalMs = norm.intervalMs;

        // Check if we can reuse the already issued query
        const active = state.getActiveRunner();
        if (active) {
          if (state.isSameQuery(ds, request)) {
            // Maybe cancel if it has run too long?
            console.log('Trying to execute query while last one has yet to complete, returning same promise');
            return active;
          } else {
            state.cancel('Query Changed while running');
          }
        }

        // Send a loading status event on slower queries
        loadingStateTimeoutId = window.setTimeout(() => {
          if (state.getActiveRunner()) {
            this.subject.next(this.state.validateStreamsAndGetPanelData());
          }
        }, delayStateNotification || 500);

        this.transformations = options.transformations;

        const data = await state.execute(ds, request);

        // Broadcast results
        this.subject.next(this.getCurrentData());
        return data;
      } catch (err) {
        const data = state.setError(err);
        this.subject.next(data);
        return data;
      } finally {
        // Clear the delayed loading state timeout on every exit path
        clearTimeout(loadingStateTimeoutId);
      }
    }

    /**
     * Called after every streaming event. This should be throttled so we
     * avoid accidentally overwhelming the browser.
     *
     * Broadcasts the current (transformed) data at most once per 50 ms, on
     * both the leading and the trailing edge of the throttle window.
     */
    onStreamingDataUpdated = throttle(
      () => {
        this.subject.next(this.getCurrentData());
      },
      50,
      { trailing: true, leading: true }
    );

    /**
     * Called when the panel is closed
     */
    destroy() {
      // Tell anyone listening that we are done
      if (this.subject) {
        this.subject.complete();
      }

      // Stop listening to the panel a shared query was reading from; otherwise
      // that subscription would outlive this panel.
      if (this.sharedQueryRunner) {
        this.sharedQueryRunner.disconnect();
        this.sharedQueryRunner = null;
      }

      // Will cancel and disconnect any open requests
      this.state.cancel('destroy');
    }

    /** Replaces the query state object used by this runner. */
    setState = (state: PanelQueryState) => {
      this.state = state;
    };

    /** Returns the query state object used by this runner. */
    getState = () => {
      return this.state;
    };
  }
  /**
   * Resolves the data source to query.
   *
   * A truthy value exposing a truthy `query` member is treated as a ready-made
   * data source instance and returned unchanged. Anything else (a name, or
   * `null`) is handed to the datasource service together with `scopedVars`.
   */
  async function getDataSource(
    datasource: string | DataSourceApi | null,
    scopedVars: ScopedVars
  ): Promise<DataSourceApi> {
    const looksLikeInstance = datasource ? (datasource as any).query : datasource;

    if (!looksLikeInstance) {
      const resolved = await getDatasourceSrv().get(datasource as string, scopedVars);
      return resolved;
    }

    return datasource as DataSourceApi;
  }