live_streams.test.ts 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. import { Subject, Observable } from 'rxjs';
  2. import * as rxJsWebSocket from 'rxjs/webSocket';
  3. import { LiveStreams } from './live_streams';
  4. import { DataFrameView, Labels, formatLabels, DataFrame } from '@grafana/data';
  5. import { noop } from 'lodash';
  6. let fakeSocket: Subject<any>;
  7. jest.mock('rxjs/webSocket', () => {
  8. return {
  9. __esModule: true,
  10. webSocket: () => fakeSocket,
  11. };
  12. });
  13. describe('Live Stream Tests', () => {
  14. afterAll(() => {
  15. jest.restoreAllMocks();
  16. });
  17. const msg0: any = {
  18. streams: [
  19. {
  20. labels: '{filename="/var/log/sntpc.log", job="varlogs"}',
  21. entries: [
  22. {
  23. ts: '2019-08-28T20:50:40.118944705Z',
  24. line: 'Kittens',
  25. },
  26. ],
  27. },
  28. ],
  29. dropped_entries: null,
  30. };
  31. it('reads the values into the buffer', done => {
  32. fakeSocket = new Subject<any>();
  33. const labels: Labels = { job: 'varlogs' };
  34. const target = makeTarget('fake', labels);
  35. const stream = new LiveStreams().getStream(target);
  36. expect.assertions(5);
  37. const tests = [
  38. (val: DataFrame[]) => expect(val).toEqual([]),
  39. (val: DataFrame[]) => {
  40. expect(val[0].length).toEqual(7);
  41. expect(val[0].labels).toEqual(labels);
  42. },
  43. (val: DataFrame[]) => {
  44. expect(val[0].length).toEqual(8);
  45. const view = new DataFrameView(val[0]);
  46. const last = { ...view.get(view.length - 1) };
  47. expect(last).toEqual({
  48. ts: '2019-08-28T20:50:40.118944705Z',
  49. line: 'Kittens',
  50. labels: { filename: '/var/log/sntpc.log' },
  51. });
  52. },
  53. ];
  54. stream.subscribe({
  55. next: val => {
  56. const test = tests.shift();
  57. test(val);
  58. },
  59. complete: () => done(),
  60. });
  61. // Send it the initial list of things
  62. fakeSocket.next(initialRawResponse);
  63. // Send it a single update
  64. fakeSocket.next(msg0);
  65. fakeSocket.complete();
  66. });
  67. it('returns the same subscription if the url matches existing one', () => {
  68. fakeSocket = new Subject<any>();
  69. const liveStreams = new LiveStreams();
  70. const stream1 = liveStreams.getStream(makeTarget('url_to_match'));
  71. const stream2 = liveStreams.getStream(makeTarget('url_to_match'));
  72. expect(stream1).toBe(stream2);
  73. });
  74. it('returns new subscription when the previous unsubscribed', () => {
  75. fakeSocket = new Subject<any>();
  76. const liveStreams = new LiveStreams();
  77. const stream1 = liveStreams.getStream(makeTarget('url_to_match'));
  78. const subscription = stream1.subscribe({
  79. next: noop,
  80. });
  81. subscription.unsubscribe();
  82. const stream2 = liveStreams.getStream(makeTarget('url_to_match'));
  83. expect(stream1).not.toBe(stream2);
  84. });
  85. it('returns new subscription when the previous is unsubscribed and correctly unsubscribes from source', () => {
  86. let unsubscribed = false;
  87. fakeSocket = new Observable(() => {
  88. return () => (unsubscribed = true);
  89. }) as any;
  90. const spy = spyOn(rxJsWebSocket, 'webSocket');
  91. spy.and.returnValue(fakeSocket);
  92. const liveStreams = new LiveStreams();
  93. const stream1 = liveStreams.getStream(makeTarget('url_to_match'));
  94. const subscription = stream1.subscribe({
  95. next: noop,
  96. });
  97. subscription.unsubscribe();
  98. expect(unsubscribed).toBe(true);
  99. });
  100. });
  101. /**
  102. * Create target (query to run). Url is what is used as cache key.
  103. */
  104. function makeTarget(url: string, labels?: Labels) {
  105. labels = labels || { job: 'varlogs' };
  106. return {
  107. url,
  108. size: 10,
  109. query: formatLabels(labels),
  110. refId: 'A',
  111. regexp: '',
  112. };
  113. }
  114. //----------------------------------------------------------------
  115. // Added this at the end so the top is more readable
  116. //----------------------------------------------------------------
  117. const initialRawResponse: any = {
  118. streams: [
  119. {
  120. labels: '{filename="/var/log/docker.log", job="varlogs"}',
  121. entries: [
  122. {
  123. ts: '2019-08-28T20:43:38.215447855Z',
  124. line:
  125. '2019-08-28T20:43:38Z docker time="2019-08-28T20:43:38.147149490Z" ' +
  126. 'level=debug msg="[resolver] received AAAA record \\"::1\\" for \\"localhost.\\" from udp:192.168.65.1"',
  127. },
  128. ],
  129. },
  130. {
  131. labels: '{filename="/var/log/docker.log", job="varlogs"}',
  132. entries: [
  133. {
  134. ts: '2019-08-28T20:43:38.215450388Z',
  135. line:
  136. '2019-08-28T20:43:38Z docker time="2019-08-28T20:43:38.147224630Z" ' +
  137. 'level=debug msg="[resolver] received AAAA record \\"fe80::1\\" for \\"localhost.\\" from udp:192.168.65.1"',
  138. },
  139. ],
  140. },
  141. {
  142. labels: '{filename="/var/log/sntpc.log", job="varlogs"}',
  143. entries: [
  144. {
  145. ts: '2019-08-28T20:43:40.452525099Z',
  146. line: '2019-08-28T20:43:40Z sntpc sntpc[1]: offset=-0.022171, delay=0.000463',
  147. },
  148. ],
  149. },
  150. {
  151. labels: '{filename="/var/log/sntpc.log", job="varlogs"}',
  152. entries: [
  153. {
  154. ts: '2019-08-28T20:44:10.297164454Z',
  155. line: '2019-08-28T20:44:10Z sntpc sntpc[1]: offset=-0.022327, delay=0.000527',
  156. },
  157. ],
  158. },
  159. {
  160. labels: '{filename="/var/log/lifecycle-server.log", job="varlogs"}',
  161. entries: [
  162. {
  163. ts: '2019-08-28T20:44:38.152248647Z',
  164. line:
  165. '2019-08-28T20:44:38Z lifecycle-server time="2019-08-28T20:44:38.095444834Z" ' +
  166. 'level=debug msg="Name To resolve: localhost."',
  167. },
  168. ],
  169. },
  170. {
  171. labels: '{filename="/var/log/lifecycle-server.log", job="varlogs"}',
  172. entries: [
  173. {
  174. ts: '2019-08-28T20:44:38.15225554Z',
  175. line:
  176. '2019-08-28T20:44:38Z lifecycle-server time="2019-08-28T20:44:38.095896074Z" ' +
  177. 'level=debug msg="[resolver] query localhost. (A) from 172.22.0.4:53748, forwarding to udp:192.168.65.1"',
  178. },
  179. ],
  180. },
  181. {
  182. labels: '{filename="/var/log/docker.log", job="varlogs"}',
  183. entries: [
  184. {
  185. ts: '2019-08-28T20:44:38.152271475Z',
  186. line:
  187. '2019-08-28T20:44:38Z docker time="2019-08-28T20:44:38.095444834Z" level=debug msg="Name To resolve: localhost."',
  188. },
  189. ],
  190. },
  191. ],
  192. dropped_entries: null,
  193. };