فهرست منبع

feat(websocket): more work websocket ds, # 4355

Torkel Ödegaard 10 سال پیش
والد
کامیت
3d5251d9a5
5فایلهای تغییر یافته به همراه59 افزوده شده و 40 حذف شده
  1. 1 1
      pkg/api/api.go
  2. 2 2
      pkg/api/dtos/stream.go
  3. 10 12
      pkg/api/live/conn.go
  4. 44 24
      pkg/api/live/hub.go
  5. 2 1
      pkg/api/live/live.go

+ 1 - 1
pkg/api/api.go

@@ -245,7 +245,7 @@ func Register(r *macaron.Macaron) {
 	r.Any("/ws", liveConn.Serve)
 
 	// streams
-	r.Post("/streams/push", reqSignedIn, bind(dtos.StreamMessage{}), liveConn.PushToStream)
+	r.Post("/api/streams/push", reqSignedIn, bind(dtos.StreamMessage{}), liveConn.PushToStream)
 
 	InitAppPluginRoutes(r)
 

+ 2 - 2
pkg/api/dtos/stream.go

@@ -4,6 +4,6 @@ import "encoding/json"
 
 type StreamMessage struct {
 	Stream     string          `json:"stream"`
-	Metric     string          `json:"name"`
-	Datapoints [][]json.Number `json:"username"`
+	Metric     string          `json:"metric"`
+	Datapoints [][]json.Number `json:"Datapoints"`
 }

+ 10 - 12
pkg/api/live/conn.go

@@ -31,21 +31,15 @@ var upgrader = websocket.Upgrader{
 	},
 }
 
-type subscription struct {
-	name string
-}
-
 type connection struct {
-	ws      *websocket.Conn
-	streams []*subscription
-	send    chan []byte
+	ws   *websocket.Conn
+	send chan []byte
 }
 
 func newConnection(ws *websocket.Conn) *connection {
 	return &connection{
-		send:    make(chan []byte, 256),
-		streams: make([]*subscription, 0),
-		ws:      ws,
+		send: make(chan []byte, 256),
+		ws:   ws,
 	}
 }
 
@@ -79,10 +73,14 @@ func (c *connection) handleMessage(message []byte) {
 	msgType := json.Get("action").MustString()
 	streamName := json.Get("stream").MustString()
 
+	if len(streamName) == 0 {
+		log.Error(3, "Not allowed to subscribe to empty stream name")
+		return
+	}
+
 	switch msgType {
 	case "subscribe":
-		c.streams = append(c.streams, &subscription{name: streamName})
-		log.Info("Live: subscribing to stream %v", streamName)
+		h.subChannel <- &streamSubscription{name: streamName, conn: c}
 	}
 }
 

+ 44 - 24
pkg/api/live/hub.go

@@ -2,30 +2,36 @@ package live
 
 import (
 	"github.com/grafana/grafana/pkg/api/dtos"
+	"github.com/grafana/grafana/pkg/components/simplejson"
 	"github.com/grafana/grafana/pkg/log"
 )
 
 type hub struct {
-	// Registered connections.
 	connections map[*connection]bool
+	streams     map[string]map[*connection]bool
 
-	// Inbound messages from the connections.
-	broadcast chan []byte
-
-	// Register requests from the connections.
-	register chan *connection
-
-	// Unregister requests from connections.
-	unregister chan *connection
+	register      chan *connection
+	unregister    chan *connection
+	streamChannel chan *dtos.StreamMessage
+	subChannel    chan *streamSubscription
+}
 
-	streamPipe chan *dtos.StreamMessage
+type streamSubscription struct {
+	conn *connection
+	name string
 }
 
 var h = hub{
-	broadcast:   make(chan []byte),
-	register:    make(chan *connection),
-	unregister:  make(chan *connection),
-	connections: make(map[*connection]bool),
+	connections:   make(map[*connection]bool),
+	streams:       make(map[string]map[*connection]bool),
+	register:      make(chan *connection),
+	unregister:    make(chan *connection),
+	streamChannel: make(chan *dtos.StreamMessage),
+	subChannel:    make(chan *streamSubscription),
+}
+
+func (h *hub) removeConnection() {
+
 }
 
 func (h *hub) run() {
@@ -39,20 +45,34 @@ func (h *hub) run() {
 				delete(h.connections, c)
 				close(c.send)
 			}
-		case m := <-h.broadcast:
-			log.Info("Live: broadcasting")
-			for c := range h.connections {
+		// hand stream subscriptions
+		case sub := <-h.subChannel:
+			log.Info("Live: Connection subscribing to: %v", sub.name)
+			subscribers, exists := h.streams[sub.name]
+			if !exists {
+				subscribers = make(map[*connection]bool)
+				h.streams[sub.name] = subscribers
+			}
+			subscribers[sub.conn] = true
+
+			// handle stream messages
+		case message := <-h.streamChannel:
+			subscribers, exists := h.streams[message.Stream]
+			if !exists {
+				log.Info("Live: Message to stream without subscribers: %v", message.Stream)
+				continue
+			}
+
+			messageBytes, _ := simplejson.NewFromAny(message).Encode()
+			for sub := range subscribers {
 				select {
-				case c.send <- m:
+				case sub.send <- messageBytes:
 				default:
-					close(c.send)
-					delete(h.connections, c)
+					close(sub.send)
+					delete(h.connections, sub)
+					delete(subscribers, sub)
 				}
 			}
 		}
 	}
 }
-
-func SendMessage(message string) {
-	h.broadcast <- []byte(message)
-}

+ 2 - 1
pkg/api/live/live.go

@@ -31,5 +31,6 @@ func (lc *LiveConn) Serve(w http.ResponseWriter, r *http.Request) {
 }
 
 func (lc *LiveConn) PushToStream(c *middleware.Context, message dtos.StreamMessage) {
-
+	h.streamChannel <- &message
+	c.JsonOK("Message recevived")
 }