PanelQueryState.ts 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. // Libraries
  2. import isString from 'lodash/isString';
  3. import isEqual from 'lodash/isEqual';
  4. // Utils & Services
  5. import { getBackendSrv } from 'app/core/services/backend_srv';
  6. import * as dateMath from 'app/core/utils/datemath';
  7. import { guessFieldTypes, toSeriesData, isSeriesData } from '@grafana/ui/src/utils';
  8. // Types
  9. import {
  10. DataSourceApi,
  11. DataQueryRequest,
  12. PanelData,
  13. LoadingState,
  14. toLegacyResponseData,
  15. DataQueryError,
  16. DataStreamObserver,
  17. DataStreamState,
  18. SeriesData,
  19. DataQueryResponseData,
  20. } from '@grafana/ui';
  21. export class PanelQueryState {
  22. // The current/last running request
  23. request = {
  24. startTime: 0,
  25. endTime: 1000, // Somethign not zero
  26. } as DataQueryRequest;
  27. // The result back from the datasource query
  28. response = {
  29. state: LoadingState.NotStarted,
  30. series: [],
  31. } as PanelData;
  32. // Active stream results
  33. streams: DataStreamState[] = [];
  34. sendSeries = false;
  35. sendLegacy = false;
  36. // A promise for the running query
  37. private executor?: Promise<PanelData>;
  38. private rejector = (reason?: any) => {};
  39. private datasource: DataSourceApi = {} as any;
  40. isFinished(state: LoadingState) {
  41. return state === LoadingState.Done || state === LoadingState.Error;
  42. }
  43. isStarted() {
  44. return this.response.state !== LoadingState.NotStarted;
  45. }
  46. isSameQuery(ds: DataSourceApi, req: DataQueryRequest) {
  47. if (ds !== this.datasource) {
  48. return false;
  49. }
  50. // For now just check that the targets look the same
  51. return isEqual(this.request.targets, req.targets);
  52. }
  53. /**
  54. * Return the currently running query
  55. */
  56. getActiveRunner(): Promise<PanelData> | undefined {
  57. return this.executor;
  58. }
  59. cancel(reason: string) {
  60. const { request } = this;
  61. this.executor = null;
  62. try {
  63. // If no endTime the call to datasource.query did not complete
  64. // call rejector to reject the executor promise
  65. if (!request.endTime) {
  66. request.endTime = Date.now();
  67. this.rejector('Canceled:' + reason);
  68. }
  69. // Cancel any open HTTP request with the same ID
  70. if (request.requestId) {
  71. getBackendSrv().resolveCancelerIfExists(request.requestId);
  72. }
  73. } catch (err) {
  74. console.log('Error canceling request', err);
  75. }
  76. // Close any open streams
  77. this.closeStreams(true);
  78. }
  79. execute(ds: DataSourceApi, req: DataQueryRequest): Promise<PanelData> {
  80. this.request = req;
  81. // Return early if there are no queries to run
  82. if (!req.targets.length) {
  83. console.log('No queries, so return early');
  84. this.request.endTime = Date.now();
  85. this.closeStreams();
  86. return Promise.resolve(
  87. (this.response = {
  88. state: LoadingState.Done,
  89. series: [], // Clear the data
  90. legacy: [],
  91. })
  92. );
  93. }
  94. // Set the loading state immediatly
  95. this.response.state = LoadingState.Loading;
  96. this.executor = new Promise<PanelData>((resolve, reject) => {
  97. this.rejector = reject;
  98. return ds
  99. .query(this.request, this.dataStreamObserver)
  100. .then(resp => {
  101. this.request.endTime = Date.now();
  102. this.executor = null;
  103. // Make sure we send something back -- called run() w/o subscribe!
  104. if (!(this.sendSeries || this.sendLegacy)) {
  105. this.sendSeries = true;
  106. }
  107. // Save the result state
  108. this.response = {
  109. state: LoadingState.Done,
  110. request: this.request,
  111. series: this.sendSeries ? getProcessedSeriesData(resp.data) : [],
  112. legacy: this.sendLegacy ? translateToLegacyData(resp.data) : undefined,
  113. };
  114. resolve(this.validateStreamsAndGetPanelData());
  115. })
  116. .catch(err => {
  117. this.executor = null;
  118. resolve(this.setError(err));
  119. });
  120. });
  121. return this.executor;
  122. }
  123. // Send a notice when the stream has updated the current model
  124. onStreamingDataUpdated: () => void;
  125. // This gets all stream events and keeps track of them
  126. // it will then delegate real changes to the PanelQueryRunner
  127. dataStreamObserver: DataStreamObserver = (stream: DataStreamState) => {
  128. // Streams only work with the 'series' format
  129. this.sendSeries = true;
  130. // Add the stream to our list
  131. let found = false;
  132. const active = this.streams.map(s => {
  133. if (s.key === stream.key) {
  134. found = true;
  135. return stream;
  136. }
  137. return s;
  138. });
  139. if (!found) {
  140. if (shouldDisconnect(this.request, stream)) {
  141. console.log('Got stream update from old stream, unsubscribing');
  142. stream.unsubscribe();
  143. return;
  144. }
  145. active.push(stream);
  146. }
  147. this.streams = active;
  148. this.onStreamingDataUpdated();
  149. };
  150. closeStreams(keepSeries = false) {
  151. if (!this.streams.length) {
  152. return;
  153. }
  154. const series: SeriesData[] = [];
  155. for (const stream of this.streams) {
  156. if (stream.series) {
  157. series.push.apply(series, stream.series);
  158. }
  159. try {
  160. stream.unsubscribe();
  161. } catch {
  162. console.log('Failed to unsubscribe to stream');
  163. }
  164. }
  165. this.streams = [];
  166. // Move the series from streams to the resposne
  167. if (keepSeries) {
  168. const { response } = this;
  169. this.response = {
  170. ...response,
  171. series: [
  172. ...response.series,
  173. ...series, // Append the streamed series
  174. ],
  175. };
  176. }
  177. }
  178. /**
  179. * This is called before broadcasting data to listeners. Given that
  180. * stream events can happen at any point, we need to make sure to
  181. * only return data from active streams.
  182. */
  183. validateStreamsAndGetPanelData(): PanelData {
  184. const { response, streams, request } = this;
  185. // When not streaming, return the response + request
  186. if (!streams.length) {
  187. return {
  188. ...response,
  189. request: request,
  190. };
  191. }
  192. let done = this.isFinished(response.state);
  193. const series = [...response.series];
  194. const active: DataStreamState[] = [];
  195. for (const stream of this.streams) {
  196. if (shouldDisconnect(request, stream)) {
  197. console.log('getPanelData() - shouldDisconnect true, unsubscribing to steam');
  198. stream.unsubscribe();
  199. continue;
  200. }
  201. active.push(stream);
  202. series.push.apply(series, stream.series);
  203. if (!this.isFinished(stream.state)) {
  204. done = false;
  205. }
  206. }
  207. this.streams = active;
  208. // Update the time range
  209. let timeRange = this.request.range;
  210. if (isString(timeRange.raw.from)) {
  211. timeRange = {
  212. from: dateMath.parse(timeRange.raw.from, false),
  213. to: dateMath.parse(timeRange.raw.to, true),
  214. raw: timeRange.raw,
  215. };
  216. }
  217. return {
  218. state: done ? LoadingState.Done : LoadingState.Streaming,
  219. series, // Union of series from response and all streams
  220. legacy: this.sendLegacy ? translateToLegacyData(series) : undefined,
  221. request: {
  222. ...this.request,
  223. range: timeRange, // update the time range
  224. },
  225. };
  226. }
  227. /**
  228. * Make sure all requested formats exist on the data
  229. */
  230. getDataAfterCheckingFormats(): PanelData {
  231. const { response, sendLegacy, sendSeries } = this;
  232. if (sendLegacy && (!response.legacy || !response.legacy.length)) {
  233. response.legacy = response.series.map(v => toLegacyResponseData(v));
  234. }
  235. if (sendSeries && !response.series.length && response.legacy) {
  236. response.series = response.legacy.map(v => toSeriesData(v));
  237. }
  238. return this.validateStreamsAndGetPanelData();
  239. }
  240. setError(err: any): PanelData {
  241. if (!this.request.endTime) {
  242. this.request.endTime = Date.now();
  243. }
  244. this.closeStreams(true);
  245. this.response = {
  246. ...this.response, // Keep any existing data
  247. state: LoadingState.Error,
  248. error: toDataQueryError(err),
  249. };
  250. return this.validateStreamsAndGetPanelData();
  251. }
  252. }
  253. export function shouldDisconnect(source: DataQueryRequest, state: DataStreamState) {
  254. // It came from the same the same request, so keep it
  255. if (source === state.request || state.request.requestId.startsWith(source.requestId)) {
  256. return false;
  257. }
  258. // We should be able to check that it is the same query regardless of
  259. // if it came from the same request. This will be important for #16676
  260. return true;
  261. }
  262. export function toDataQueryError(err: any): DataQueryError {
  263. const error = (err || {}) as DataQueryError;
  264. if (!error.message) {
  265. if (typeof err === 'string' || err instanceof String) {
  266. return { message: err } as DataQueryError;
  267. }
  268. let message = 'Query error';
  269. if (error.message) {
  270. message = error.message;
  271. } else if (error.data && error.data.message) {
  272. message = error.data.message;
  273. } else if (error.data && error.data.error) {
  274. message = error.data.error;
  275. } else if (error.status) {
  276. message = `Query error: ${error.status} ${error.statusText}`;
  277. }
  278. error.message = message;
  279. }
  280. return error;
  281. }
  282. function translateToLegacyData(data: DataQueryResponseData) {
  283. return data.map(v => {
  284. if (isSeriesData(v)) {
  285. return toLegacyResponseData(v);
  286. }
  287. return v;
  288. });
  289. }
  290. /**
  291. * All panels will be passed tables that have our best guess at colum type set
  292. *
  293. * This is also used by PanelChrome for snapshot support
  294. */
  295. export function getProcessedSeriesData(results?: any[]): SeriesData[] {
  296. if (!results) {
  297. return [];
  298. }
  299. const series: SeriesData[] = [];
  300. for (const r of results) {
  301. if (r) {
  302. series.push(guessFieldTypes(toSeriesData(r)));
  303. }
  304. }
  305. return series;
  306. }