runRequest.ts 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. // Libraries
  2. import { Observable, of, timer, merge, from } from 'rxjs';
  3. import { flatten, map as lodashMap, isArray, isString } from 'lodash';
  4. import { map, catchError, takeUntil, mapTo, share, finalize } from 'rxjs/operators';
  5. // Utils & Services
  6. import { getBackendSrv } from 'app/core/services/backend_srv';
  7. // Types
  8. import {
  9. DataSourceApi,
  10. DataQueryRequest,
  11. PanelData,
  12. DataQueryResponse,
  13. DataQueryResponseData,
  14. DataQueryError,
  15. } from '@grafana/ui';
  16. import { LoadingState, dateMath, toDataFrame, DataFrame, guessFieldTypes } from '@grafana/data';
  /** Response packets collected so far, stored under their packet key. */
  type MapOfResponsePackets = Record<string, DataQueryResponse>;

  /** Accumulated state of a running query: the packets seen and the PanelData built from them. */
  interface RunningQueryState {
    packets: MapOfResponsePackets;
    panelData: PanelData;
  }
  /**
   * Composes a PanelData from multiple responses.
   *
   * The incoming packet is stored under its key ('A' when the packet has no key), replacing any
   * earlier packet with the same key. The data of all stored packets is then flattened into a
   * single series list.
   *
   * @param packet the response packet that just arrived
   * @param state the running state accumulated so far; it is not mutated
   * @returns a new running state holding the updated packet map and the recomposed PanelData
   */
  export function processResponsePacket(packet: DataQueryResponse, state: RunningQueryState): RunningQueryState {
    const { request } = state.panelData;

    // Copy the known packets and add/replace the entry for this packet's key
    const packets: MapOfResponsePackets = {
      ...state.packets,
      [packet.key || 'A']: packet,
    };

    // Update the time range: when the raw `from` is a string, re-parse both raw bounds
    let range = request.range;
    const rawRange = range.raw;
    if (isString(rawRange.from)) {
      range = {
        from: dateMath.parse(rawRange.from, false),
        to: dateMath.parse(rawRange.to, true),
        raw: rawRange,
      };
    }

    // One data array per packet, flattened one level into the combined series
    const dataPerPacket = lodashMap(packets, (entry: DataQueryResponse) => entry.data);

    return {
      packets,
      panelData: {
        state: packet.state || LoadingState.Done,
        series: flatten(dataPerPacket),
        request: { ...request, range },
      },
    };
  }
  /**
   * Handles the execution of a request and processes the single or multiple response packets
   * into a combined PanelData response.
   *
   * The returned observable will
   * * Merge multiple responses into a single series array based on the packet key
   * * Emit a loading state if no response has arrived after 200ms
   * * Cancel any still running network requests on unsubscribe (using request.requestId)
   *
   * @param datasource the datasource whose query method is called
   * @param request the query request; its endTime is set when a packet arrives or when there
   *   are no targets to run
   * @returns an observable of PanelData; errors are emitted as a PanelData in the Error state
   */
  export function runRequest(datasource: DataSourceApi, request: DataQueryRequest): Observable<PanelData> {
    const initialData: PanelData = {
      state: LoadingState.Loading,
      series: [],
      request: request,
    };
    let current: RunningQueryState = { panelData: initialData, packets: {} };

    // Return early if there are no queries to run
    if (!request.targets.length) {
      request.endTime = Date.now();
      initialData.state = LoadingState.Done;
      return of(initialData);
    }

    // Transforms a response packet into PanelData with merged results
    const toPanelData = (packet: DataQueryResponse): PanelData => {
      if (!isArray(packet.data)) {
        throw new Error(`Expected response data to be array, got ${typeof packet.data}.`);
      }
      request.endTime = Date.now();
      current = processResponsePacket(packet, current);
      return current.panelData;
    };

    // Turns a failure into a final PanelData built on the latest accumulated state
    const toErrorData = (err: any) =>
      of({
        ...current.panelData,
        state: LoadingState.Error,
        error: processQueryError(err),
      });

    const dataObservable = callQueryMethod(datasource, request).pipe(
      map(toPanelData),
      catchError(toErrorData),
      // finalize is triggered when the subscriber unsubscribes;
      // this makes sure any still running network requests are cancelled
      finalize(cancelNetworkRequestsOnUnsubscribe(request)),
      // sharing makes it possible to also use this observable as the takeUntil notifier below
      share()
    );

    // If 200ms pass without a response, emit the initial (loading) PanelData.
    // takeUntil cancels the timer emit once the first value is received on dataObservable.
    const loadingAfterDelay = timer(200).pipe(
      mapTo(initialData),
      takeUntil(dataObservable)
    );

    return merge(loadingAfterDelay, dataObservable);
  }
  /**
   * Builds a callback that, when invoked, asks the backend service to resolve the canceler
   * registered under the request's requestId, if one exists.
   *
   * @param req the request whose requestId identifies the canceler
   * @returns a no-argument callback performing the cancellation
   */
  function cancelNetworkRequestsOnUnsubscribe(req: DataQueryRequest) {
    const cancelPending = (): void => {
      const backendSrv = getBackendSrv();
      backendSrv.resolveCancelerIfExists(req.requestId);
    };
    return cancelPending;
  }
  /**
   * Calls the datasource's query method and wraps whatever it returns with rxjs `from`,
   * so the caller always receives an Observable.
   *
   * @param datasource the datasource to query
   * @param request the request passed straight through to `datasource.query`
   */
  export function callQueryMethod(datasource: DataSourceApi, request: DataQueryRequest) {
    return from(datasource.query(request));
  }
  /**
   * Normalizes whatever a failed query produced into a DataQueryError that always carries a message.
   *
   * Resolution order:
   * * a string (primitive or boxed) becomes the message of a new error object
   * * any other non-object value (null, undefined, number, boolean, ...) yields a new error
   *   object with the default message 'Query error'
   * * an object that already has a truthy `message` is returned unchanged
   * * otherwise the message is taken from `data.message`, then `data.error`, then built from
   *   `status` / `statusText`, falling back to 'Query error'
   *
   * @param err the thrown or rejected value, of any type
   * @returns for object input, the same object (with `message` filled in when it was missing);
   *   for all other input, a new object containing only `message`
   */
  export function processQueryError(err: any): DataQueryError {
    if (typeof err === 'string' || err instanceof String) {
      // String(...) also unwraps boxed strings so `message` is always a primitive
      return { message: String(err) } as DataQueryError;
    }

    // Properties cannot be assigned on primitives (a TypeError in strict mode),
    // so non-object values never reach the mutation below.
    const isObjectLike = err !== null && (typeof err === 'object' || typeof err === 'function');
    if (!isObjectLike) {
      return { message: 'Query error' } as DataQueryError;
    }

    const error = err as DataQueryError;
    if (!error.message) {
      let message = 'Query error';
      if (error.data && error.data.message) {
        message = error.data.message;
      } else if (error.data && error.data.error) {
        message = error.data.error;
      } else if (error.status) {
        // Leave out the status text when there is none instead of printing "undefined"
        const statusText = error.statusText ? ` ${error.statusText}` : '';
        message = `Query error: ${error.status}${statusText}`;
      }
      error.message = message;
    }

    return error;
  }
  /**
   * All panels will be passed data frames that have our best guess at column type set.
   *
   * Each result is converted with toDataFrame, passed through guessFieldTypes, and has the
   * calcs of every field reset to null. Anything that is not an array yields an empty list.
   *
   * This is also used by PanelChrome for snapshot support.
   *
   * @param results raw response data entries, may be omitted
   * @returns one processed DataFrame per entry, in the same order
   */
  export function getProcessedDataFrames(results?: DataQueryResponseData[]): DataFrame[] {
    if (!isArray(results)) {
      return [];
    }

    const toProcessedFrame = (result: DataQueryResponseData): DataFrame => {
      const frame = guessFieldTypes(toDataFrame(result));
      // clear out any cached calcs
      for (const field of frame.fields) {
        field.calcs = null;
      }
      return frame;
    };

    return Array.from(results, entry => toProcessedFrame(entry));
  }
  /**
   * Prepares PanelData before it is handed on.
   *
   * While loading with no series yet, the last result is reused (or `data` itself when there is
   * no last result) with its state set to Loading. In every other case `data` is returned with
   * its series run through getProcessedDataFrames.
   *
   * @param data the PanelData that just arrived
   * @param lastResult the previously returned PanelData; may be falsy
   * @returns a new PanelData object; neither argument is mutated here
   */
  export function preProcessPanelData(data: PanelData, lastResult: PanelData): PanelData {
    const loadingWithoutData = data.state === LoadingState.Loading && data.series.length === 0;

    if (loadingWithoutData) {
      // for loading states with no data, use last result
      const fallback = lastResult || data;
      return { ...fallback, state: LoadingState.Loading };
    }

    // Make sure the data frames are properly formatted
    const series = getProcessedDataFrames(data.series);
    return { ...data, series };
  }
  176. }