| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- import { Subject, Observable } from 'rxjs';
- import * as rxJsWebSocket from 'rxjs/webSocket';
- import { LiveStreams } from './live_streams';
- import { DataFrameView, Labels, formatLabels, DataFrame } from '@grafana/data';
- import { noop } from 'lodash';
// Stand-in for the websocket subject. Each test assigns its own instance before
// asking LiveStreams for a stream.
let fakeSocket: Subject<any>;

// Replace the real 'rxjs/webSocket' module with a stub whose `webSocket()` hands back
// whatever `fakeSocket` points at when it is called (the variable is read lazily, at
// call time, not when the mock is registered).
jest.mock('rxjs/webSocket', () => ({
  __esModule: true,
  webSocket: () => fakeSocket,
}));
/**
 * Specs for LiveStreams.getStream(): buffering of incoming socket messages into data
 * frames, reuse of a stream for a repeated target url, and teardown of the underlying
 * socket when the last subscriber goes away.
 */
describe('Live Stream Tests', () => {
  // Restore after every test (not only once at the end) so a jest.spyOn installed by
  // one test can never leak into the tests that follow it.
  afterEach(() => {
    jest.restoreAllMocks();
  });

  // A single follow-up message, in the same wire shape as `initialRawResponse`.
  const msg0: any = {
    streams: [
      {
        labels: '{filename="/var/log/sntpc.log", job="varlogs"}',
        entries: [
          {
            ts: '2019-08-28T20:50:40.118944705Z',
            line: 'Kittens',
          },
        ],
      },
    ],
    dropped_entries: null,
  };

  it('reads the values into the buffer', done => {
    fakeSocket = new Subject<any>();
    const labels: Labels = { job: 'varlogs' };
    const target = makeTarget('fake', labels);
    const stream = new LiveStreams().getStream(target);

    // 1 + 2 + 2 expectations across the three emission checks below.
    expect.assertions(5);

    // One check per expected emission, consumed in order.
    const emissionChecks: Array<(frames: DataFrame[]) => void> = [
      // The first emission is expected to be empty (presumably sent before any socket data).
      frames => expect(frames).toEqual([]),
      // After the initial response: its 7 entries, labelled with the target's labels.
      frames => {
        expect(frames[0].length).toEqual(7);
        expect(frames[0].labels).toEqual(labels);
      },
      // After msg0: one more row, and that row is the newest one.
      frames => {
        expect(frames[0].length).toEqual(8);
        const view = new DataFrameView(frames[0]);
        const last = { ...view.get(view.length - 1) };
        expect(last).toEqual({
          ts: '2019-08-28T20:50:40.118944705Z',
          line: 'Kittens',
          labels: { filename: '/var/log/sntpc.log' },
        });
      },
    ];

    stream.subscribe({
      next: frames => {
        const check = emissionChecks.shift();
        if (!check) {
          // Fail with a readable message instead of "check is not a function".
          throw new Error('stream emitted more values than this test expects');
        }
        check(frames);
      },
      complete: () => done(),
    });

    // Send it the initial list of things
    fakeSocket.next(initialRawResponse);
    // Send it a single update
    fakeSocket.next(msg0);
    fakeSocket.complete();
  });

  it('returns the same subscription if the url matches existing one', () => {
    fakeSocket = new Subject<any>();
    const liveStreams = new LiveStreams();
    const stream1 = liveStreams.getStream(makeTarget('url_to_match'));
    const stream2 = liveStreams.getStream(makeTarget('url_to_match'));
    expect(stream1).toBe(stream2);
  });

  it('returns new subscription when the previous unsubscribed', () => {
    fakeSocket = new Subject<any>();
    const liveStreams = new LiveStreams();
    const stream1 = liveStreams.getStream(makeTarget('url_to_match'));
    const subscription = stream1.subscribe({
      next: noop,
    });
    subscription.unsubscribe();

    const stream2 = liveStreams.getStream(makeTarget('url_to_match'));
    expect(stream1).not.toBe(stream2);
  });

  it('returns new subscription when the previous is unsubscribed and correctly unsubscribes from source', () => {
    let unsubscribed = false;
    // A bare Observable whose teardown flips the flag, so the test can observe that
    // the source was actually unsubscribed.
    fakeSocket = new Observable(() => {
      return () => (unsubscribed = true);
    }) as any;

    // jest.spyOn (rather than the Jasmine `spyOn` global) works on every Jest runner
    // and is undone by the jest.restoreAllMocks() call in afterEach.
    jest.spyOn(rxJsWebSocket, 'webSocket').mockReturnValue(fakeSocket as any);

    const liveStreams = new LiveStreams();
    const stream1 = liveStreams.getStream(makeTarget('url_to_match'));
    const subscription = stream1.subscribe({
      next: noop,
    });
    subscription.unsubscribe();
    expect(unsubscribed).toBe(true);
  });
});
/**
 * Builds the target (the query to run) used by the specs.
 *
 * @param url - Target url; this is what is used as the cache key.
 * @param labels - Labels to format into the query; `{ job: 'varlogs' }` when omitted.
 * @returns A target with a fixed size of 10, refId 'A' and an empty regexp.
 */
function makeTarget(url: string, labels?: Labels) {
  const query = formatLabels(labels || { job: 'varlogs' });
  return { url, size: 10, query, refId: 'A', regexp: '' };
}
//----------------------------------------------------------------
// Fixture data is kept at the bottom so the specs at the top stay easy to read
//----------------------------------------------------------------

// First message pushed through the fake socket: seven streams with one entry each.
// Every row below is [labels, ts, line] and is expanded into
// { labels, entries: [{ ts, line }] }.
const initialRawResponse: any = {
  streams: [
    [
      '{filename="/var/log/docker.log", job="varlogs"}',
      '2019-08-28T20:43:38.215447855Z',
      '2019-08-28T20:43:38Z docker time="2019-08-28T20:43:38.147149490Z" ' +
        'level=debug msg="[resolver] received AAAA record \\"::1\\" for \\"localhost.\\" from udp:192.168.65.1"',
    ],
    [
      '{filename="/var/log/docker.log", job="varlogs"}',
      '2019-08-28T20:43:38.215450388Z',
      '2019-08-28T20:43:38Z docker time="2019-08-28T20:43:38.147224630Z" ' +
        'level=debug msg="[resolver] received AAAA record \\"fe80::1\\" for \\"localhost.\\" from udp:192.168.65.1"',
    ],
    [
      '{filename="/var/log/sntpc.log", job="varlogs"}',
      '2019-08-28T20:43:40.452525099Z',
      '2019-08-28T20:43:40Z sntpc sntpc[1]: offset=-0.022171, delay=0.000463',
    ],
    [
      '{filename="/var/log/sntpc.log", job="varlogs"}',
      '2019-08-28T20:44:10.297164454Z',
      '2019-08-28T20:44:10Z sntpc sntpc[1]: offset=-0.022327, delay=0.000527',
    ],
    [
      '{filename="/var/log/lifecycle-server.log", job="varlogs"}',
      '2019-08-28T20:44:38.152248647Z',
      '2019-08-28T20:44:38Z lifecycle-server time="2019-08-28T20:44:38.095444834Z" ' +
        'level=debug msg="Name To resolve: localhost."',
    ],
    [
      '{filename="/var/log/lifecycle-server.log", job="varlogs"}',
      '2019-08-28T20:44:38.15225554Z',
      '2019-08-28T20:44:38Z lifecycle-server time="2019-08-28T20:44:38.095896074Z" ' +
        'level=debug msg="[resolver] query localhost. (A) from 172.22.0.4:53748, forwarding to udp:192.168.65.1"',
    ],
    [
      '{filename="/var/log/docker.log", job="varlogs"}',
      '2019-08-28T20:44:38.152271475Z',
      '2019-08-28T20:44:38Z docker time="2019-08-28T20:44:38.095444834Z" level=debug msg="Name To resolve: localhost."',
    ],
  ].map(([labels, ts, line]) => ({ labels, entries: [{ ts, line }] })),
  dropped_entries: null,
};
|