浏览代码

feat(live): work on websocket data source, #3455

Torkel Ödegaard 9 年之前
父节点
当前提交
2adc4d12be

+ 38 - 4
pkg/api/live/conn.go

@@ -5,6 +5,7 @@ import (
 	"time"
 
 	"github.com/gorilla/websocket"
+	"github.com/grafana/grafana/pkg/components/simplejson"
 	"github.com/grafana/grafana/pkg/log"
 )
 
@@ -25,11 +26,27 @@ const (
 var upgrader = websocket.Upgrader{
 	ReadBufferSize:  1024,
 	WriteBufferSize: 1024,
+	CheckOrigin: func(r *http.Request) bool {
+		return true
+	},
+}
+
+type subscription struct {
+	name string
 }
 
 type connection struct {
-	ws   *websocket.Conn
-	send chan []byte
+	ws      *websocket.Conn
+	streams []*subscription
+	send    chan []byte
+}
+
+func newConnection(ws *websocket.Conn) *connection {
+	return &connection{
+		send:    make(chan []byte, 256),
+		streams: make([]*subscription, 0),
+		ws:      ws,
+	}
 }
 
 func (c *connection) readPump() {
@@ -48,7 +65,24 @@ func (c *connection) readPump() {
 			}
 			break
 		}
-		h.broadcast <- message
+
+		c.handleMessage(message)
+	}
+}
+
+func (c *connection) handleMessage(message []byte) {
+	json, err := simplejson.NewJson(message)
+	if err != nil {
+		log.Error(3, "Unreadable message on websocket channel:", err)
+	}
+
+	msgType := json.Get("action").MustString()
+	streamName := json.Get("stream").MustString()
+
+	switch msgType {
+	case "subscribe":
+		c.streams = append(c.streams, &subscription{name: streamName})
+		log.Info("Live: subscribing to stream %v", streamName)
 	}
 }
 
@@ -98,7 +132,7 @@ func (lc *LiveConn) Serve(w http.ResponseWriter, r *http.Request) {
 		log.Error(3, "Live: Failed to upgrade connection to WebSocket", err)
 		return
 	}
-	c := &connection{send: make(chan []byte, 256), ws: ws}
+	c := newConnection(ws)
 	h.register <- c
 	go c.writePump()
 	c.readPump()

+ 1 - 0
pkg/api/live/hub.go

@@ -28,6 +28,7 @@ func (h *hub) run() {
 		select {
 		case c := <-h.register:
 			h.connections[c] = true
+			log.Info("Live: New connection (Total count: %v)", len(h.connections))
 		case c := <-h.unregister:
 			if _, ok := h.connections[c]; ok {
 				delete(h.connections, c)

+ 5 - 1
public/app/core/directives/plugin_component.ts

@@ -149,7 +149,11 @@ function pluginDirectiveLoader($compile, datasourceSrv, $rootScope, $q, $http, $
       // ConfigCtrl
       case 'datasource-config-ctrl': {
         var dsMeta = scope.ctrl.datasourceMeta;
-        return System.import(dsMeta.module).then(function(dsModule) {
+        return System.import(dsMeta.module).then(function(dsModule): any {
+          if (!dsMeta.ConfigCtrl) {
+            return {notFound: true};
+          }
+
           return {
             baseUrl: dsMeta.baseUrl,
             name: 'ds-config-' + dsMeta.id,

+ 40 - 13
public/app/core/live/live_srv.ts

@@ -5,24 +5,51 @@ import coreModule from 'app/core/core_module';
 
 export class LiveSrv {
   conn: any;
+  initPromise: any;
+
+  getWebSocketUrl() {
+    var l = window.location;
+    return ((l.protocol === "https:") ? "wss://" : "ws://") + l.host + config.appSubUrl + '/ws';
+  }
 
   init() {
-    this.conn = new WebSocket("ws://localhost:3000/ws");
-    this.conn.onclose = function(evt) {
-      console.log("WebSocket closed");
-    };
-    this.conn.onmessage = function(evt) {
-      console.log("WebSocket message", evt.data);
-    };
-    this.conn.onopen = function(evt) {
-      console.log("Connection opened");
-    };
+    if (this.initPromise) {
+      return this.initPromise;
+    }
+
+    if (this.conn && this.conn.readyState === 1) {
+      return Promise.resolve();
+    }
+
+    this.initPromise = new Promise((resolve, reject) => {
+      console.log('Live: connecting...');
+      this.conn = new WebSocket(this.getWebSocketUrl());
+
+      this.conn.onclose = function(evt) {
+        reject({message: 'Connection closed'});
+      };
+
+      this.conn.onmessage = function(evt) {
+        console.log("Live: message received:", evt.data);
+      };
+
+      this.conn.onopen = function(evt) {
+        console.log('Live: connection open');
+        resolve();
+      };
+    });
+
+    return this.initPromise;
+  }
+
+  send(data) {
+    this.conn.send(JSON.stringify(data));
   }
 
   subscribe(name) {
-    if (!this.conn) {
-      this.init();
-    }
+    return this.init().then(() =>  {
+      this.send({action: 'subscribe', stream: name});
+    });
   }
 
 }

+ 3 - 1
public/app/plugins/datasource/stream/datasource.ts → public/app/plugins/datasource/grafana-live/datasource.ts

@@ -15,7 +15,9 @@ export class GrafanaStreamDS {
     }
 
     var target = options.targets[0];
-    liveSrv.subscribe(target);
+    liveSrv.subscribe(target.stream);
+
+    return Promise.resolve({data: []});
   }
 }
 

+ 0 - 0
public/app/plugins/datasource/stream/module.ts → public/app/plugins/datasource/grafana-live/module.ts


+ 2 - 2
public/app/plugins/datasource/stream/partials/query.editor.html → public/app/plugins/datasource/grafana-live/partials/query.editor.html

@@ -1,8 +1,8 @@
 <query-editor-row ctrl="ctrl">
 	<li class="tight-form-item">
-		Stream Expression
+		Stream
 	</li>
 	<li>
-		<input type="text" class="tight-form-input input-large" ng-model="ctrl.target.channel">
+		<input type="text" class="tight-form-input input-large" ng-model="ctrl.target.stream">
 	</li>
 </query-editor-row>

+ 7 - 0
public/app/plugins/datasource/grafana-live/plugin.json

@@ -0,0 +1,7 @@
+{
+  "type": "datasource",
+  "name": "Grafana Live",
+  "id": "grafana-live",
+
+  "metrics": true
+}

+ 0 - 8
public/app/plugins/datasource/stream/plugin.json

@@ -1,8 +0,0 @@
-{
-  "type": "datasource",
-  "name": "Grafana Stream DS",
-  "id": "grafana-stream-ds",
-
-  "builtIn": true,
-  "metrics": true
-}