live_streams.ts 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. import { DataFrame, FieldType, parseLabels, KeyValue, CircularDataFrame } from '@grafana/data';
  2. import { Observable, BehaviorSubject } from 'rxjs';
  3. import { webSocket } from 'rxjs/webSocket';
  4. import { LokiResponse } from './types';
  5. import { finalize, map, multicast, refCount } from 'rxjs/operators';
  6. import { appendResponseToBufferedData } from './result_transformer';
  7. /**
  8. * Maps directly to a query in the UI (refId is key)
  9. */
  10. export interface LiveTarget {
  11. query: string;
  12. regexp: string;
  13. url: string;
  14. refId: string;
  15. size: number;
  16. }
  17. /**
  18. * Cache of websocket streams that can be returned as observable. In case there already is a stream for particular
  19. * target it is returned and on subscription returns the latest dataFrame.
  20. */
  21. export class LiveStreams {
  22. private streams: KeyValue<Observable<DataFrame[]>> = {};
  23. getStream(target: LiveTarget): Observable<DataFrame[]> {
  24. let stream = this.streams[target.url];
  25. if (!stream) {
  26. const data = new CircularDataFrame({ capacity: target.size });
  27. data.labels = parseLabels(target.query);
  28. data.addField({ name: 'ts', type: FieldType.time, config: { title: 'Time' } });
  29. data.addField({ name: 'line', type: FieldType.string });
  30. data.addField({ name: 'labels', type: FieldType.other });
  31. const subject = new BehaviorSubject<DataFrame[]>([]);
  32. stream = webSocket(target.url).pipe(
  33. finalize(() => {
  34. delete this.streams[target.url];
  35. }),
  36. map((response: LokiResponse) => {
  37. appendResponseToBufferedData(response, data);
  38. return [data];
  39. }),
  40. multicast(subject),
  41. refCount()
  42. );
  43. this.streams[target.url] = stream;
  44. }
  45. return stream;
  46. }
  47. }