PanelQueryState.ts 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. // Libraries
  2. import isString from 'lodash/isString';
  3. import isEqual from 'lodash/isEqual';
  4. // Utils & Services
  5. import { getBackendSrv } from 'app/core/services/backend_srv';
  6. import { dateMath } from '@grafana/data';
  7. import {
  8. guessFieldTypes,
  9. LoadingState,
  10. toLegacyResponseData,
  11. DataFrame,
  12. toDataFrame,
  13. isDataFrame,
  14. } from '@grafana/data';
  15. // Types
  16. import {
  17. DataSourceApi,
  18. DataQueryRequest,
  19. PanelData,
  20. DataQueryError,
  21. DataStreamObserver,
  22. DataStreamState,
  23. DataQueryResponseData,
  24. } from '@grafana/ui';
  /**
   * Holds everything known about one panel's query: the request that is running
   * (or ran last), the response it produced, and any streams that were reported
   * through `dataStreamObserver` while answering it.
   */
  export class PanelQueryState {
    // The current/last running request
    request = {
      startTime: 0,
      endTime: 1000, // Something not zero, so cancel() does not treat the placeholder as still running
    } as DataQueryRequest;

    // The result back from the datasource query
    response = {
      state: LoadingState.NotStarted,
      series: [],
    } as PanelData;

    // Active stream results
    streams: DataStreamState[] = [];

    // Which formats to fill in on the response: `series` (frames) and/or `legacy`
    sendFrames = false;
    sendLegacy = false;

    // A promise for the running query
    private executor?: Promise<PanelData> = null;
    // Rejects the promise created by execute(); a no-op until execute() installs the real one
    private rejector = (reason?: any) => {};
    private datasource: DataSourceApi = {} as any;

    /**
     * @returns true when `state` is a terminal state (Done or Error).
     */
    isFinished(state: LoadingState) {
      return state === LoadingState.Done || state === LoadingState.Error;
    }

    /**
     * @returns true once the response has left the NotStarted state.
     */
    isStarted() {
      return this.response.state !== LoadingState.NotStarted;
    }

    /**
     * Check whether `ds`/`req` describe the query that is already tracked here.
     *
     * @returns true when `ds` is the same datasource instance and the targets are deeply equal.
     */
    isSameQuery(ds: DataSourceApi, req: DataQueryRequest) {
      if (ds !== this.datasource) {
        return false;
      }

      // For now just check that the targets look the same
      return isEqual(this.request.targets, req.targets);
    }

    /**
     * Return the currently running query
     */
    getActiveRunner(): Promise<PanelData> | undefined {
      return this.executor;
    }

    /**
     * Abandon the current request: reject its promise if it has not completed,
     * cancel the matching HTTP request and close all streams (keeping their data).
     *
     * @param reason appended to the rejection message
     */
    cancel(reason: string) {
      const { request } = this;
      this.executor = null;

      try {
        // If no endTime the call to datasource.query did not complete
        // call rejector to reject the executor promise
        if (!request.endTime) {
          request.endTime = Date.now();
          this.rejector('Canceled:' + reason);
        }

        // Cancel any open HTTP request with the same ID
        if (request.requestId) {
          getBackendSrv().resolveCancelerIfExists(request.requestId);
        }
      } catch (err) {
        console.log('Error canceling request', err);
      }

      // Close any open streams
      this.closeStreams(true);
    }

    /**
     * Run `req` against `ds` and track the outcome.
     *
     * The returned promise resolves with the panel data on success and also on
     * a datasource error (in which case the data carries the Error state). It is
     * only rejected through cancel().
     *
     * @param ds datasource to query
     * @param req request to run; its `endTime` is set when the query completes
     * @returns a promise for the resulting panel data
     */
    execute(ds: DataSourceApi, req: DataQueryRequest): Promise<PanelData> {
      this.request = req;
      this.datasource = ds;

      // Return early if there are no queries to run
      if (!req.targets.length) {
        console.log('No queries, so return early');
        this.request.endTime = Date.now();
        this.closeStreams();
        return Promise.resolve(
          (this.response = {
            state: LoadingState.Done,
            series: [], // Clear the data
            legacy: [],
          })
        );
      }

      // Set the loading state immediately
      this.response.state = LoadingState.Loading;

      // True once a later execute() call has replaced `req`. A late answer for a
      // replaced request must not touch the state that now belongs to the newer one.
      const isStale = () => this.request !== req;

      this.executor = new Promise<PanelData>((resolve, reject) => {
        this.rejector = reject;

        return ds
          .query(req, this.dataStreamObserver)
          .then(resp => {
            if (isStale()) {
              // Settle the old promise (a no-op if cancel() already rejected it)
              // with the current state instead of overwriting it.
              resolve(this.validateStreamsAndGetPanelData());
              return;
            }

            req.endTime = Date.now();
            this.executor = null;

            // Make sure we send something back -- called run() w/o subscribe!
            if (!(this.sendFrames || this.sendLegacy)) {
              this.sendFrames = true;
            }

            // Save the result state
            this.response = {
              state: LoadingState.Done,
              request: req,
              series: this.sendFrames ? getProcessedDataFrames(resp.data) : [],
              legacy: this.sendLegacy ? translateToLegacyData(resp.data) : undefined,
            };
            resolve(this.validateStreamsAndGetPanelData());
          })
          .catch(err => {
            if (isStale()) {
              // Do not call setError(): it would close the streams of the newer query.
              resolve(this.validateStreamsAndGetPanelData());
              return;
            }

            this.executor = null;
            resolve(this.setError(err));
          });
      });

      return this.executor;
    }

    // Send a notice when the stream has updated the current model
    onStreamingDataUpdated: () => void;

    // This gets all stream events and keeps track of them
    // it will then delegate real changes to the PanelQueryRunner
    dataStreamObserver: DataStreamObserver = (stream: DataStreamState) => {
      // Streams only work with the 'series' format
      this.sendFrames = true;

      // Add the stream to our list, replacing an entry with the same key
      let found = false;
      const active = this.streams.map(s => {
        if (s.key === stream.key) {
          found = true;
          return stream;
        }
        return s;
      });

      if (!found) {
        if (shouldDisconnect(this.request, stream)) {
          console.log('Got stream update from old stream, unsubscribing');
          stream.unsubscribe();
          return;
        }
        active.push(stream);
      }

      this.streams = active;

      // The callback has no default value, so it may not be attached yet
      if (this.onStreamingDataUpdated) {
        this.onStreamingDataUpdated();
      }
    };

    /**
     * Unsubscribe from every tracked stream and forget them.
     *
     * @param keepSeries when true, the data held by the streams is appended to
     *   `response.series`; otherwise it is dropped
     */
    closeStreams(keepSeries = false) {
      if (!this.streams.length) {
        return;
      }

      const series: DataFrame[] = [];
      for (const stream of this.streams) {
        if (stream.data) {
          series.push.apply(series, stream.data);
        }

        // Best effort: one failing unsubscribe must not keep the others open
        try {
          stream.unsubscribe();
        } catch {
          console.log('Failed to unsubscribe to stream');
        }
      }

      this.streams = [];

      // Move the series from streams to the response
      if (keepSeries) {
        const { response } = this;
        this.response = {
          ...response,
          series: [
            ...response.series,
            ...series, // Append the streamed series
          ],
        };
      }
    }

    /**
     * This is called before broadcasting data to listeners. Given that
     * stream events can happen at any point, we need to make sure to
     * only return data from active streams.
     *
     * Streams that no longer match the current request are unsubscribed and
     * removed from `streams` as a side effect.
     */
    validateStreamsAndGetPanelData(): PanelData {
      const { response, streams, request } = this;

      // When not streaming, return the response + request
      if (!streams.length) {
        return {
          ...response,
          request: request,
        };
      }

      let done = this.isFinished(response.state);
      const series = [...response.series];
      const active: DataStreamState[] = [];

      for (const stream of streams) {
        if (shouldDisconnect(request, stream)) {
          console.log('getPanelData() - shouldDisconnect true, unsubscribing to steam');
          stream.unsubscribe();
          continue;
        }

        active.push(stream);
        if (stream.data) {
          series.push.apply(series, stream.data);
        }

        if (!this.isFinished(stream.state)) {
          done = false;
        }
      }

      this.streams = active;

      // Update the time range: a string `raw.from` is parsed again on every call.
      // The placeholder request has no range at all, so guard before reading it.
      let timeRange = request.range;
      if (timeRange && timeRange.raw && isString(timeRange.raw.from)) {
        timeRange = {
          from: dateMath.parse(timeRange.raw.from, false),
          to: dateMath.parse(timeRange.raw.to, true),
          raw: timeRange.raw,
        };
      }

      return {
        state: done ? LoadingState.Done : LoadingState.Streaming,
        series, // Union of series from response and all streams
        legacy: this.sendLegacy ? translateToLegacyData(series) : undefined,
        request: {
          ...request,
          range: timeRange, // update the time range
        },
      };
    }

    /**
     * Make sure all requested formats exist on the data
     *
     * Missing `legacy` data is derived from `series` and vice versa; the
     * stored response is updated in place before the panel data is returned.
     */
    getDataAfterCheckingFormats(): PanelData {
      const { response, sendLegacy, sendFrames } = this;

      if (sendLegacy && (!response.legacy || !response.legacy.length)) {
        response.legacy = response.series.map(v => toLegacyResponseData(v));
      }

      if (sendFrames && !response.series.length && response.legacy) {
        response.series = response.legacy.map(v => toDataFrame(v));
      }

      return this.validateStreamsAndGetPanelData();
    }

    /**
     * Put the state into Error, keeping whatever data is already present.
     *
     * @param err anything thrown or rejected by the query; normalized with toDataQueryError
     * @returns the panel data reflecting the error
     */
    setError(err: any): PanelData {
      if (!this.request.endTime) {
        this.request.endTime = Date.now();
      }

      this.closeStreams(true);

      this.response = {
        ...this.response, // Keep any existing data
        state: LoadingState.Error,
        error: toDataQueryError(err),
      };

      return this.validateStreamsAndGetPanelData();
    }
  }
  /**
   * Decide whether a stream belongs to a different request than `source` and
   * should therefore be dropped.
   *
   * @param source the request currently being tracked
   * @param state the stream to check
   * @returns false when the stream carries the very same request object, or its
   *   request id starts with the source request id; true otherwise, including
   *   when either side has no usable request id
   */
  export function shouldDisconnect(source: DataQueryRequest, state: DataStreamState) {
    const streamRequest = state.request;

    // It came from the same request, so keep it
    if (source === streamRequest) {
      return false;
    }

    // Only compare ids that are really strings: a missing id must neither throw
    // nor be coerced to the text "undefined" by startsWith().
    const sourceId = source ? source.requestId : undefined;
    const streamId = streamRequest ? streamRequest.requestId : undefined;
    if (typeof sourceId === 'string' && typeof streamId === 'string' && streamId.startsWith(sourceId)) {
      return false;
    }

    // We should be able to check that it is the same query regardless of
    // if it came from the same request. This will be important for #16676
    return true;
  }
  /**
   * Normalize anything thrown or rejected by a query into a DataQueryError
   * that always has a `message`.
   *
   * - strings (plain or boxed) become `{ message }`
   * - objects are returned as-is (same instance); when they have no message one
   *   is filled in from `data.message`, `data.error` or `status`/`statusText`
   * - everything else yields the generic message "Query error"
   *
   * @param err the value to normalize; an object without a message is mutated
   * @returns the normalized error
   */
  export function toDataQueryError(err: any): DataQueryError {
    if (typeof err === 'string' || err instanceof String) {
      // String() unwraps a boxed String so the message is always a primitive
      return { message: String(err) } as DataQueryError;
    }

    // Numbers, booleans, null, undefined, ...: there is nothing to read from, and
    // assigning a property to a primitive throws in strict mode.
    const isObjectLike = err !== null && (typeof err === 'object' || typeof err === 'function');
    if (!isObjectLike) {
      return { message: 'Query error' } as DataQueryError;
    }

    const error = err as DataQueryError;
    if (!error.message) {
      let message = 'Query error';
      if (error.data && error.data.message) {
        message = error.data.message;
      } else if (error.data && error.data.error) {
        message = error.data.error;
      } else if (error.status) {
        // Leave out the status text when there is none instead of printing "undefined"
        message = error.statusText
          ? `Query error: ${error.status} ${error.statusText}`
          : `Query error: ${error.status}`;
      }
      error.message = message;
    }

    return error;
  }
  /**
   * Convert every data frame in `data` to the legacy response format; entries
   * that are not data frames are passed through unchanged.
   *
   * @param data the query results; a missing value is treated as "no results"
   * @returns a new array with the converted entries
   */
  function translateToLegacyData(data: DataQueryResponseData) {
    // A response may come back without data; answer with an empty list rather
    // than throwing, as getProcessedDataFrames does for the same input.
    if (!data) {
      return [];
    }

    return data.map((v: any) => {
      if (isDataFrame(v)) {
        return toLegacyResponseData(v);
      }
      return v;
    });
  }
  /**
   * Turn raw query results into data frames whose field types have been
   * guessed, so every panel receives tables with a best-guess column type.
   *
   * This is also used by PanelChrome for snapshot support
   *
   * @param results raw results; falsy entries are skipped
   * @returns one frame per usable entry, or an empty array when there are no results
   */
  export function getProcessedDataFrames(results?: any[]): DataFrame[] {
    const frames: DataFrame[] = [];

    if (results) {
      for (const entry of results) {
        if (!entry) {
          continue;
        }
        const frame = toDataFrame(entry);
        frames.push(guessFieldTypes(frame));
      }
    }

    return frames;
  }
  311. }