// runStreams.ts
  1. import { defaults } from 'lodash';
  2. import { Observable } from 'rxjs';
  3. import { DataQueryRequest, DataQueryResponse } from '@grafana/ui';
  4. import { FieldType, CircularDataFrame, CSVReader, Field, LoadingState } from '@grafana/data';
  5. import { TestDataQuery, StreamingQuery } from './types';
  6. import { getRandomLine } from './LogIpsum';
  /**
   * Fallback settings for a streaming query.
   *
   * `runStream` fills any property missing from a target's `stream` settings
   * with the matching value from this object.
   */
  export const defaultQuery: StreamingQuery = {
    // Which generator to run; `runStream` dispatches on this value.
    type: 'signal',
    // Delay between emitted events, in milliseconds.
    speed: 250,
    // Scales the random step applied to the signal value on every row.
    spread: 3.5,
    // Scales the random distance of each Min/Max band from the signal value.
    noise: 2.2,
    // Number of Min/Max field pairs added to the signal frame.
    bands: 1,
  };
  /**
   * Starts the stream described by `target.stream`, filling unset options from
   * `defaultQuery`.
   *
   * @param target - Query whose optional `stream` settings select and configure the stream.
   * @param req - The enclosing data request, forwarded to the selected stream runner.
   * @returns The observable produced by the signal, logs or fetch stream runner.
   * @throws Error when the resolved stream type is not `signal`, `logs` or `fetch`.
   */
  export function runStream(target: TestDataQuery, req: DataQueryRequest<TestDataQuery>): Observable<DataQueryResponse> {
    // lodash `defaults` writes into its first argument, so merge into a fresh
    // object to leave the caller's `target.stream` untouched.
    const query: StreamingQuery = defaults({}, target.stream, defaultQuery);

    switch (query.type) {
      case 'signal':
        return runSignalStream(target, query, req);
      case 'logs':
        return runLogsStream(target, query, req);
      case 'fetch':
        return runFetchStream(target, query, req);
      default:
        throw new Error(`Unknown Stream Type: ${query.type}`);
    }
  }
  /**
   * Streams a random-walk signal with optional Min/Max bands.
   *
   * On subscription the frame is back-filled with `maxDataPoints` rows ending at
   * the current time; the first event is emitted after 5 ms and subsequent
   * events every `query.speed` milliseconds. Every event carries the same
   * (mutated) frame under the key `signal-<panelId>-<refId>`.
   *
   * @param target - Query providing `refId` and the optional `alias` used as frame name.
   * @param query - Stream settings; `spread`, `speed`, `bands` and `noise` are read.
   * @param req - Request providing `panelId` and `maxDataPoints` (defaults to 1000).
   * @returns An observable that emits until it is unsubscribed; it never completes on its own.
   */
  export function runSignalStream(
    target: TestDataQuery,
    query: StreamingQuery,
    req: DataQueryRequest<TestDataQuery>
  ): Observable<DataQueryResponse> {
    return new Observable<DataQueryResponse>(subscriber => {
      const streamId = `signal-${req.panelId}-${target.refId}`;
      const maxDataPoints = req.maxDataPoints || 1000;

      const data = new CircularDataFrame({
        append: 'tail',
        capacity: maxDataPoints,
      });
      data.refId = target.refId;
      data.name = target.alias || 'Signal ' + target.refId;
      data.addField({ name: 'time', type: FieldType.time });
      data.addField({ name: 'value', type: FieldType.number });

      const { spread, speed, bands, noise } = query;

      // One Min/Max pair per band; the numeric suffix is only used with several bands.
      for (let i = 0; i < bands; i++) {
        const suffix = bands > 1 ? ` ${i + 1}` : '';
        data.addField({ name: 'Min' + suffix, type: FieldType.number });
        data.addField({ name: 'Max' + suffix, type: FieldType.number });
      }

      let value = Math.random() * 100;
      let timeoutId: ReturnType<typeof setTimeout> | undefined;
      let stopped = false;

      // Appends one row: time, the next random-walk value, then each band's
      // Min/Max, which widen cumulatively from band to band.
      const addNextRow = (time: number) => {
        value += (Math.random() - 0.5) * spread;

        let idx = 0;
        data.fields[idx++].values.add(time);
        data.fields[idx++].values.add(value);

        let min = value;
        let max = value;
        for (let i = 0; i < bands; i++) {
          min = min - Math.random() * noise;
          max = max + Math.random() * noise;
          data.fields[idx++].values.add(min);
          data.fields[idx++].values.add(max);
        }
      };

      // Fill the buffer on init with rows spaced `speed` ms apart.
      let time = Date.now() - maxDataPoints * speed;
      for (let i = 0; i < maxDataPoints; i++) {
        addNextRow(time);
        time += speed;
      }

      const pushNextEvent = () => {
        addNextRow(Date.now());
        subscriber.next({
          data: [data],
          key: streamId,
        });
        // The consumer may unsubscribe synchronously inside `next` (e.g. `take(1)`);
        // scheduling after that would leave a timer that nothing cancels.
        if (!stopped) {
          timeoutId = setTimeout(pushNextEvent, speed);
        }
      };

      // Send first event in 5ms. The handle is kept so that unsubscribing before
      // the first event cancels the stream too.
      timeoutId = setTimeout(pushNextEvent, 5);

      return () => {
        console.log('unsubscribing to stream ' + streamId);
        stopped = true;
        clearTimeout(timeoutId);
      };
    });
  }
  /**
   * Streams randomly generated log lines.
   *
   * The first event is emitted after 5 ms and subsequent events every
   * `query.speed` milliseconds. Each event appends one row (current time plus a
   * line from `getRandomLine`) and carries the same (mutated) frame under the
   * key `logs-<panelId>-<refId>`.
   *
   * @param target - Query providing `refId` and the optional `alias` used as frame name.
   * @param query - Stream settings; only `speed` is read.
   * @param req - Request providing `panelId` and `maxDataPoints` (defaults to 1000).
   * @returns An observable that emits until it is unsubscribed; it never completes on its own.
   */
  export function runLogsStream(
    target: TestDataQuery,
    query: StreamingQuery,
    req: DataQueryRequest<TestDataQuery>
  ): Observable<DataQueryResponse> {
    return new Observable<DataQueryResponse>(subscriber => {
      const streamId = `logs-${req.panelId}-${target.refId}`;
      const maxDataPoints = req.maxDataPoints || 1000;

      const data = new CircularDataFrame({
        append: 'tail',
        capacity: maxDataPoints,
      });
      data.refId = target.refId;
      data.name = target.alias || 'Logs ' + target.refId;
      data.addField({ name: 'time', type: FieldType.time });
      data.addField({ name: 'line', type: FieldType.string });

      const { speed } = query;

      let timeoutId: ReturnType<typeof setTimeout> | undefined;
      let stopped = false;

      const pushNextEvent = () => {
        data.values.time.add(Date.now());
        data.values.line.add(getRandomLine());

        subscriber.next({
          data: [data],
          key: streamId,
        });

        // The consumer may unsubscribe synchronously inside `next` (e.g. `take(1)`);
        // scheduling after that would leave a timer that nothing cancels.
        if (!stopped) {
          timeoutId = setTimeout(pushNextEvent, speed);
        }
      };

      // Send first event in 5ms. The handle is kept so that unsubscribing before
      // the first event cancels the stream too.
      timeoutId = setTimeout(pushNextEvent, 5);

      return () => {
        console.log('unsubscribing to stream ' + streamId);
        stopped = true;
        clearTimeout(timeoutId);
      };
    });
  }
  /**
   * Streams CSV data fetched from `query.url`.
   *
   * The response body is read chunk by chunk; each chunk is decoded, fed to a
   * `CSVReader`, and followed by an event carrying the current frame under the
   * key `fetch-<panelId>-<refId>`. Events are marked `Streaming` until the body
   * ends, when a final `Done` event is sent and the observable completes.
   * A new CSV header replaces the frame with an empty one.
   *
   * Unsubscribing aborts the request. Network or read failures are reported
   * through the observable's error channel.
   *
   * @param target - Query providing `refId` and the optional `alias` used as frame name.
   * @param query - Stream settings; only `url` is read.
   * @param req - Request providing `panelId` and `maxDataPoints` (defaults to 1000).
   * @returns An observable of the progressively filled frame.
   */
  export function runFetchStream(
    target: TestDataQuery,
    query: StreamingQuery,
    req: DataQueryRequest<TestDataQuery>
  ): Observable<DataQueryResponse> {
    return new Observable<DataQueryResponse>(subscriber => {
      const streamId = `fetch-${req.panelId}-${target.refId}`;
      const maxDataPoints = req.maxDataPoints || 1000;

      // Single place that builds a frame, so the frame created after a new
      // header keeps the same name (including the alias) as the initial one.
      const createFrame = (): CircularDataFrame => {
        const frame = new CircularDataFrame({
          append: 'tail',
          capacity: maxDataPoints,
        });
        frame.refId = target.refId;
        frame.name = target.alias || 'Fetch ' + target.refId;
        return frame;
      };

      let data = createFrame();
      let cancelled = false;
      const aborter = new AbortController();
      // One decoder for the whole body: in streaming mode it buffers a
      // multi-byte UTF-8 sequence that is split across two chunks.
      const decoder = new TextDecoder();

      const csv = new CSVReader({
        callback: {
          onHeader: (fields: Field[]) => {
            // Clear any existing fields
            if (data.fields.length) {
              data = createFrame();
            }
            for (const field of fields) {
              data.addField(field);
            }
          },
          onRow: (row: any[]) => {
            data.add(row);
          },
        },
      });

      // Reads one chunk, publishes the frame, then recurses until the body ends.
      const pump = (source: ReadableStreamDefaultReader<Uint8Array>): Promise<void> =>
        source.read().then(chunk => {
          if (cancelled) {
            return;
          }

          // On the final read, flush whatever bytes the decoder still holds.
          const text = chunk.value ? decoder.decode(chunk.value, { stream: true }) : chunk.done ? decoder.decode() : '';
          if (text) {
            csv.readCSV(text);
          }

          subscriber.next({
            data: [data],
            key: streamId,
            state: chunk.done ? LoadingState.Done : LoadingState.Streaming,
          });

          if (chunk.done) {
            console.log('Finished stream');
            subscriber.complete();
            return;
          }

          return pump(source);
        });

      fetch(new Request(query.url), { signal: aborter.signal })
        .then(response => {
          if (!response.body) {
            throw new Error('Fetch stream response has no body: ' + streamId);
          }
          return pump(response.body.getReader());
        })
        .catch(err => {
          // Aborting on unsubscribe rejects the pending fetch/read; that is expected.
          if (!cancelled) {
            subscriber.error(err);
          }
        });

      return () => {
        console.log('unsubscribing to stream ' + streamId);
        cancelled = true;
        aborter.abort();
      };
    });
  }