PanelQueryState.ts 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. // Libraries
  2. import isString from 'lodash/isString';
  3. import isEqual from 'lodash/isEqual';
  4. // Utils & Services
  5. import { getBackendSrv } from 'app/core/services/backend_srv';
  6. import * as dateMath from '@grafana/ui/src/utils/datemath';
  7. import { guessFieldTypes, toDataFrame, isDataFrame } from '@grafana/ui/src/utils';
  8. // Types
  9. import {
  10. DataSourceApi,
  11. DataQueryRequest,
  12. PanelData,
  13. LoadingState,
  14. toLegacyResponseData,
  15. DataQueryError,
  16. DataStreamObserver,
  17. DataStreamState,
  18. DataFrame,
  19. DataQueryResponseData,
  20. } from '@grafana/ui';
  21. export class PanelQueryState {
  22. // The current/last running request
  23. request = {
  24. startTime: 0,
  25. endTime: 1000, // Somethign not zero
  26. } as DataQueryRequest;
  27. // The result back from the datasource query
  28. response = {
  29. state: LoadingState.NotStarted,
  30. series: [],
  31. } as PanelData;
  32. // Active stream results
  33. streams: DataStreamState[] = [];
  34. sendSeries = false;
  35. sendLegacy = false;
  36. // A promise for the running query
  37. private executor?: Promise<PanelData> = null;
  38. private rejector = (reason?: any) => {};
  39. private datasource: DataSourceApi = {} as any;
  40. isFinished(state: LoadingState) {
  41. return state === LoadingState.Done || state === LoadingState.Error;
  42. }
  43. isStarted() {
  44. return this.response.state !== LoadingState.NotStarted;
  45. }
  46. isSameQuery(ds: DataSourceApi, req: DataQueryRequest) {
  47. if (ds !== this.datasource) {
  48. return false;
  49. }
  50. // For now just check that the targets look the same
  51. return isEqual(this.request.targets, req.targets);
  52. }
  53. /**
  54. * Return the currently running query
  55. */
  56. getActiveRunner(): Promise<PanelData> | undefined {
  57. return this.executor;
  58. }
  59. cancel(reason: string) {
  60. const { request } = this;
  61. this.executor = null;
  62. try {
  63. // If no endTime the call to datasource.query did not complete
  64. // call rejector to reject the executor promise
  65. if (!request.endTime) {
  66. request.endTime = Date.now();
  67. this.rejector('Canceled:' + reason);
  68. }
  69. // Cancel any open HTTP request with the same ID
  70. if (request.requestId) {
  71. getBackendSrv().resolveCancelerIfExists(request.requestId);
  72. }
  73. } catch (err) {
  74. console.log('Error canceling request', err);
  75. }
  76. // Close any open streams
  77. this.closeStreams(true);
  78. }
  79. execute(ds: DataSourceApi, req: DataQueryRequest): Promise<PanelData> {
  80. this.request = req;
  81. this.datasource = ds;
  82. // Return early if there are no queries to run
  83. if (!req.targets.length) {
  84. console.log('No queries, so return early');
  85. this.request.endTime = Date.now();
  86. this.closeStreams();
  87. return Promise.resolve(
  88. (this.response = {
  89. state: LoadingState.Done,
  90. series: [], // Clear the data
  91. legacy: [],
  92. })
  93. );
  94. }
  95. // Set the loading state immediatly
  96. this.response.state = LoadingState.Loading;
  97. this.executor = new Promise<PanelData>((resolve, reject) => {
  98. this.rejector = reject;
  99. return ds
  100. .query(this.request, this.dataStreamObserver)
  101. .then(resp => {
  102. this.request.endTime = Date.now();
  103. this.executor = null;
  104. // Make sure we send something back -- called run() w/o subscribe!
  105. if (!(this.sendSeries || this.sendLegacy)) {
  106. this.sendSeries = true;
  107. }
  108. // Save the result state
  109. this.response = {
  110. state: LoadingState.Done,
  111. request: this.request,
  112. series: this.sendSeries ? getProcessedDataFrame(resp.data) : [],
  113. legacy: this.sendLegacy ? translateToLegacyData(resp.data) : undefined,
  114. };
  115. resolve(this.validateStreamsAndGetPanelData());
  116. })
  117. .catch(err => {
  118. this.executor = null;
  119. resolve(this.setError(err));
  120. });
  121. });
  122. return this.executor;
  123. }
  124. // Send a notice when the stream has updated the current model
  125. onStreamingDataUpdated: () => void;
  126. // This gets all stream events and keeps track of them
  127. // it will then delegate real changes to the PanelQueryRunner
  128. dataStreamObserver: DataStreamObserver = (stream: DataStreamState) => {
  129. // Streams only work with the 'series' format
  130. this.sendSeries = true;
  131. // Add the stream to our list
  132. let found = false;
  133. const active = this.streams.map(s => {
  134. if (s.key === stream.key) {
  135. found = true;
  136. return stream;
  137. }
  138. return s;
  139. });
  140. if (!found) {
  141. if (shouldDisconnect(this.request, stream)) {
  142. console.log('Got stream update from old stream, unsubscribing');
  143. stream.unsubscribe();
  144. return;
  145. }
  146. active.push(stream);
  147. }
  148. this.streams = active;
  149. this.onStreamingDataUpdated();
  150. };
  151. closeStreams(keepSeries = false) {
  152. if (!this.streams.length) {
  153. return;
  154. }
  155. const series: DataFrame[] = [];
  156. for (const stream of this.streams) {
  157. if (stream.series) {
  158. series.push.apply(series, stream.series);
  159. }
  160. try {
  161. stream.unsubscribe();
  162. } catch {
  163. console.log('Failed to unsubscribe to stream');
  164. }
  165. }
  166. this.streams = [];
  167. // Move the series from streams to the resposne
  168. if (keepSeries) {
  169. const { response } = this;
  170. this.response = {
  171. ...response,
  172. series: [
  173. ...response.series,
  174. ...series, // Append the streamed series
  175. ],
  176. };
  177. }
  178. }
  179. /**
  180. * This is called before broadcasting data to listeners. Given that
  181. * stream events can happen at any point, we need to make sure to
  182. * only return data from active streams.
  183. */
  184. validateStreamsAndGetPanelData(): PanelData {
  185. const { response, streams, request } = this;
  186. // When not streaming, return the response + request
  187. if (!streams.length) {
  188. return {
  189. ...response,
  190. request: request,
  191. };
  192. }
  193. let done = this.isFinished(response.state);
  194. const series = [...response.series];
  195. const active: DataStreamState[] = [];
  196. for (const stream of this.streams) {
  197. if (shouldDisconnect(request, stream)) {
  198. console.log('getPanelData() - shouldDisconnect true, unsubscribing to steam');
  199. stream.unsubscribe();
  200. continue;
  201. }
  202. active.push(stream);
  203. series.push.apply(series, stream.series);
  204. if (!this.isFinished(stream.state)) {
  205. done = false;
  206. }
  207. }
  208. this.streams = active;
  209. // Update the time range
  210. let timeRange = this.request.range;
  211. if (isString(timeRange.raw.from)) {
  212. timeRange = {
  213. from: dateMath.parse(timeRange.raw.from, false),
  214. to: dateMath.parse(timeRange.raw.to, true),
  215. raw: timeRange.raw,
  216. };
  217. }
  218. return {
  219. state: done ? LoadingState.Done : LoadingState.Streaming,
  220. series, // Union of series from response and all streams
  221. legacy: this.sendLegacy ? translateToLegacyData(series) : undefined,
  222. request: {
  223. ...this.request,
  224. range: timeRange, // update the time range
  225. },
  226. };
  227. }
  228. /**
  229. * Make sure all requested formats exist on the data
  230. */
  231. getDataAfterCheckingFormats(): PanelData {
  232. const { response, sendLegacy, sendSeries } = this;
  233. if (sendLegacy && (!response.legacy || !response.legacy.length)) {
  234. response.legacy = response.series.map(v => toLegacyResponseData(v));
  235. }
  236. if (sendSeries && !response.series.length && response.legacy) {
  237. response.series = response.legacy.map(v => toDataFrame(v));
  238. }
  239. return this.validateStreamsAndGetPanelData();
  240. }
  241. setError(err: any): PanelData {
  242. if (!this.request.endTime) {
  243. this.request.endTime = Date.now();
  244. }
  245. this.closeStreams(true);
  246. this.response = {
  247. ...this.response, // Keep any existing data
  248. state: LoadingState.Error,
  249. error: toDataQueryError(err),
  250. };
  251. return this.validateStreamsAndGetPanelData();
  252. }
  253. }
  254. export function shouldDisconnect(source: DataQueryRequest, state: DataStreamState) {
  255. // It came from the same the same request, so keep it
  256. if (source === state.request || state.request.requestId.startsWith(source.requestId)) {
  257. return false;
  258. }
  259. // We should be able to check that it is the same query regardless of
  260. // if it came from the same request. This will be important for #16676
  261. return true;
  262. }
  263. export function toDataQueryError(err: any): DataQueryError {
  264. const error = (err || {}) as DataQueryError;
  265. if (!error.message) {
  266. if (typeof err === 'string' || err instanceof String) {
  267. return { message: err } as DataQueryError;
  268. }
  269. let message = 'Query error';
  270. if (error.message) {
  271. message = error.message;
  272. } else if (error.data && error.data.message) {
  273. message = error.data.message;
  274. } else if (error.data && error.data.error) {
  275. message = error.data.error;
  276. } else if (error.status) {
  277. message = `Query error: ${error.status} ${error.statusText}`;
  278. }
  279. error.message = message;
  280. }
  281. return error;
  282. }
  283. function translateToLegacyData(data: DataQueryResponseData) {
  284. return data.map(v => {
  285. if (isDataFrame(v)) {
  286. return toLegacyResponseData(v);
  287. }
  288. return v;
  289. });
  290. }
  291. /**
  292. * All panels will be passed tables that have our best guess at colum type set
  293. *
  294. * This is also used by PanelChrome for snapshot support
  295. */
  296. export function getProcessedDataFrame(results?: any[]): DataFrame[] {
  297. if (!results) {
  298. return [];
  299. }
  300. const series: DataFrame[] = [];
  301. for (const r of results) {
  302. if (r) {
  303. series.push(guessFieldTypes(toDataFrame(r)));
  304. }
  305. }
  306. return series;
  307. }