فهرست منبع

feat(websocket): reconnection and resubscription handling, #4355

Torkel Ödegaard 9 سال پیش
والد
کامیت
92f20b9b7d
3فایلهای تغییر یافته به همراه76 افزوده شده و 12 حذف شده
  1. 1 1
      package.json
  2. 70 10
      public/app/core/live/live_srv.ts
  3. 5 1
      public/app/plugins/datasource/grafana-live/datasource.ts

+ 1 - 1
package.json

@@ -53,7 +53,7 @@
     "mocha": "2.3.4",
     "phantomjs": "^1.9.19",
     "reflect-metadata": "0.1.2",
-    "rxjs": "5.0.0-beta.0",
+    "rxjs": "5.0.0-beta.2",
     "sass-lint": "^1.5.0",
     "systemjs": "0.19.20",
     "zone.js": "0.5.10"

+ 70 - 10
public/app/core/live/live_srv.ts

@@ -1,55 +1,115 @@
 ///<reference path="../../headers/common.d.ts" />
 
+import _ from 'lodash';
 import config from 'app/core/config';
 import coreModule from 'app/core/core_module';
 
+import {Observable} from 'vendor/npm/rxjs/Observable';
+
 export class LiveSrv {
   conn: any;
+  observers: any;
   initPromise: any;
 
+  constructor() {
+    this.observers = {};
+  }
+
   getWebSocketUrl() {
     var l = window.location;
     return ((l.protocol === "https:") ? "wss://" : "ws://") + l.host + config.appSubUrl + '/ws';
   }
 
-  init() {
+  getConnection() {
     if (this.initPromise) {
       return this.initPromise;
     }
 
     if (this.conn && this.conn.readyState === 1) {
-      return Promise.resolve();
+      return Promise.resolve(this.conn);
     }
 
     this.initPromise = new Promise((resolve, reject) => {
       console.log('Live: connecting...');
       this.conn = new WebSocket(this.getWebSocketUrl());
 
-      this.conn.onclose = function(evt) {
+      this.conn.onclose = (evt) => {
+        console.log("Live: websocket onclose", evt);
         reject({message: 'Connection closed'});
+
+        this.initPromise = null;
+        setTimeout(this.reconnect.bind(this), 2000);
       };
 
-      this.conn.onmessage = function(evt) {
+      this.conn.onmessage = (evt) => {
         console.log("Live: message received:", evt.data);
       };
 
-      this.conn.onopen = function(evt) {
-        console.log('Live: connection open');
-        resolve();
+      this.conn.onerror = (evt) => {
+        this.initPromise = null;
+        reject({message: 'Connection error'});
+        console.log("Live: websocket error", evt);
+      };
+
+      this.conn.onopen = (evt) => {
+        console.log('opened');
+        this.initPromise = null;
+        resolve(this.conn);
       };
     });
 
     return this.initPromise;
   }
 
+  reconnect() {
+    // no need to reconnect if no one cares
+    if (_.keys(this.observers).length === 0) {
+      return;
+    }
+
+    console.log('LiveSrv: Reconnecting');
+
+    this.getConnection().then(conn => {
+      _.each(this.observers, (value, key) => {
+        this.send({action: 'subscribe', stream: key});
+      });
+    });
+  }
+
   send(data) {
     this.conn.send(JSON.stringify(data));
   }
 
-  subscribe(name) {
-    return this.init().then(() =>  {
-      this.send({action: 'subscribe', stream: name});
+  addObserver(stream, observer) {
+    this.observers[stream] = observer;
+
+    this.getConnection().then(conn => {
+      this.send({action: 'subscribe', stream: stream});
+    });
+  }
+
+  removeObserver(stream, observer) {
+    delete this.observers[stream];
+
+    this.getConnection().then(conn => {
+      this.send({action: 'unsubscribe', stream: stream});
+    });
+  }
+
+  subscribe(streamName) {
+    console.log('LiveSrv.subscribe: ' + streamName);
+
+    return Observable.create(observer => {
+      this.addObserver(streamName, observer);
+
+      return () => {
+        this.removeObserver(streamName, observer);
+      };
     });
+
+    // return this.init().then(() =>  {
+    //   this.send({action: 'subscribe', stream: name});
+    // });
   }
 
 }

+ 5 - 1
public/app/plugins/datasource/grafana-live/datasource.ts

@@ -3,6 +3,7 @@
 import {liveSrv} from 'app/core/core';
 
 export class GrafanaStreamDS {
+  subscription: any;
 
   /** @ngInject */
   constructor(private $q) {
@@ -15,7 +16,10 @@ export class GrafanaStreamDS {
     }
 
     var target = options.targets[0];
-    liveSrv.subscribe(target.stream);
+    var observable = liveSrv.subscribe(target.stream);
+    this.subscription = observable.subscribe(data => {
+      console.log("grafana stream ds data!", data);
+    });
 
     return Promise.resolve({data: []});
   }