Browse Source

feat(live): wip work

Torkel Ödegaard 10 years ago
parent
commit
195be2742c
5 changed files with 53 additions and 23 deletions
  1. 3 0
      pkg/api/api.go
  2. 9 0
      pkg/api/dtos/stream.go
  3. 0 22
      pkg/api/live/conn.go
  4. 6 1
      pkg/api/live/hub.go
  5. 35 0
      pkg/api/live/live.go

+ 3 - 0
pkg/api/api.go

@@ -244,6 +244,9 @@ func Register(r *macaron.Macaron) {
 	liveConn := live.New()
 	r.Any("/ws", liveConn.Serve)
 
+	// streams
+	r.Post("/streams/push", reqSignedIn, bind(dtos.StreamMessage{}), liveConn.PushToStream)
+
 	InitAppPluginRoutes(r)
 
 }

+ 9 - 0
pkg/api/dtos/stream.go

@@ -0,0 +1,9 @@
+package dtos
+
+import "encoding/json"
+
+type StreamMessage struct {
+	Stream     string          `json:"stream"`
+	Metric     string          `json:"name"`
+	Datapoints [][]json.Number `json:"username"`
+}

+ 0 - 22
pkg/api/live/conn.go

@@ -115,25 +115,3 @@ func (c *connection) writePump() {
 		}
 	}
 }
-
-type LiveConn struct {
-}
-
-func New() *LiveConn {
-	go h.run()
-	return &LiveConn{}
-}
-
-func (lc *LiveConn) Serve(w http.ResponseWriter, r *http.Request) {
-	log.Info("Live: Upgrading to WebSocket")
-
-	ws, err := upgrader.Upgrade(w, r, nil)
-	if err != nil {
-		log.Error(3, "Live: Failed to upgrade connection to WebSocket", err)
-		return
-	}
-	c := newConnection(ws)
-	h.register <- c
-	go c.writePump()
-	c.readPump()
-}

+ 6 - 1
pkg/api/live/hub.go

@@ -1,6 +1,9 @@
 package live
 
-import "github.com/grafana/grafana/pkg/log"
+import (
+	"github.com/grafana/grafana/pkg/api/dtos"
+	"github.com/grafana/grafana/pkg/log"
+)
 
 type hub struct {
 	// Registered connections.
@@ -14,6 +17,8 @@ type hub struct {
 
 	// Unregister requests from connections.
 	unregister chan *connection
+
+	streamPipe chan *dtos.StreamMessage
 }
 
 var h = hub{

+ 35 - 0
pkg/api/live/live.go

@@ -0,0 +1,35 @@
+package live
+
+import (
+	"net/http"
+
+	"github.com/grafana/grafana/pkg/api/dtos"
+	"github.com/grafana/grafana/pkg/log"
+	"github.com/grafana/grafana/pkg/middleware"
+)
+
+type LiveConn struct {
+}
+
+func New() *LiveConn {
+	go h.run()
+	return &LiveConn{}
+}
+
+func (lc *LiveConn) Serve(w http.ResponseWriter, r *http.Request) {
+	log.Info("Live: Upgrading to WebSocket")
+
+	ws, err := upgrader.Upgrade(w, r, nil)
+	if err != nil {
+		log.Error(3, "Live: Failed to upgrade connection to WebSocket", err)
+		return
+	}
+	c := newConnection(ws)
+	h.register <- c
+	go c.writePump()
+	c.readPump()
+}
+
+func (lc *LiveConn) PushToStream(c *middleware.Context, message dtos.StreamMessage) {
+
+}