瀏覽代碼

poc(websockets): websocket poc

Torkel Ödegaard 9 年之前
父節點
當前提交
fc877ae0f4
共有 5 個文件被更改,包括 176 次插入2 次删除
  1. 6 1
      pkg/api/api.go
  2. 3 0
      pkg/api/search.go
  3. 105 0
      pkg/live/conn.go
  4. 52 0
      pkg/live/hub.go
  5. 10 1
      public/app/features/admin/admin.ts

+ 6 - 1
pkg/api/api.go

@@ -4,6 +4,7 @@ import (
 	"github.com/go-macaron/binding"
 	"github.com/grafana/grafana/pkg/api/avatar"
 	"github.com/grafana/grafana/pkg/api/dtos"
+	"github.com/grafana/grafana/pkg/live"
 	"github.com/grafana/grafana/pkg/middleware"
 	m "github.com/grafana/grafana/pkg/models"
 	"gopkg.in/macaron.v1"
@@ -35,6 +36,7 @@ func Register(r *macaron.Macaron) {
 	r.Get("/org/users/", reqSignedIn, Index)
 	r.Get("/org/apikeys/", reqSignedIn, Index)
 	r.Get("/dashboard/import/", reqSignedIn, Index)
+	r.Get("/admin", reqGrafanaAdmin, Index)
 	r.Get("/admin/settings", reqGrafanaAdmin, Index)
 	r.Get("/admin/users", reqGrafanaAdmin, Index)
 	r.Get("/admin/users/create", reqGrafanaAdmin, Index)
@@ -230,7 +232,10 @@ func Register(r *macaron.Macaron) {
 	avt := avatar.CacheServer()
 	r.Get("/avatar/:hash", avt.ServeHTTP)
 
+	// Websocket
+	liveConn := live.New()
+	r.Any("/ws", liveConn.Serve)
+
 	InitAppPluginRoutes(r)
 
-	r.NotFound(NotFoundHandler)
 }

+ 3 - 0
pkg/api/search.go

@@ -2,6 +2,7 @@ package api
 
 import (
 	"github.com/grafana/grafana/pkg/bus"
+	"github.com/grafana/grafana/pkg/live"
 	"github.com/grafana/grafana/pkg/middleware"
 	"github.com/grafana/grafana/pkg/services/search"
 )
@@ -32,4 +33,6 @@ func Search(c *middleware.Context) {
 	}
 
 	c.JSON(200, searchQuery.Result)
+
+	live.SendMessage(query)
 }

+ 105 - 0
pkg/live/conn.go

@@ -0,0 +1,105 @@
+package live
+
+import (
+	"net/http"
+	"time"
+
+	"github.com/gorilla/websocket"
+	"github.com/grafana/grafana/pkg/log"
+)
+
+const (
+	// Time allowed to write a message to the peer.
+	writeWait = 10 * time.Second
+
+	// Time allowed to read the next pong message from the peer.
+	pongWait = 60 * time.Second
+
+	// Send pings to peer with this period. Must be less than pongWait.
+	pingPeriod = (pongWait * 9) / 10
+
+	// Maximum message size allowed from peer.
+	maxMessageSize = 512
+)
+
+var upgrader = websocket.Upgrader{
+	ReadBufferSize:  1024,
+	WriteBufferSize: 1024,
+}
+
+type connection struct {
+	ws   *websocket.Conn
+	send chan []byte
+}
+
+func (c *connection) readPump() {
+	defer func() {
+		h.unregister <- c
+		c.ws.Close()
+	}()
+	c.ws.SetReadLimit(maxMessageSize)
+	c.ws.SetReadDeadline(time.Now().Add(pongWait))
+	c.ws.SetPongHandler(func(string) error { c.ws.SetReadDeadline(time.Now().Add(pongWait)); return nil })
+	for {
+		_, message, err := c.ws.ReadMessage()
+		if err != nil {
+			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
+				log.Info("error: %v", err)
+			}
+			break
+		}
+		h.broadcast <- message
+	}
+}
+
+func (c *connection) write(mt int, payload []byte) error {
+	c.ws.SetWriteDeadline(time.Now().Add(writeWait))
+	return c.ws.WriteMessage(mt, payload)
+}
+
+// writePump pumps messages from the hub to the websocket connection.
+func (c *connection) writePump() {
+	ticker := time.NewTicker(pingPeriod)
+	defer func() {
+		ticker.Stop()
+		c.ws.Close()
+	}()
+	for {
+		select {
+		case message, ok := <-c.send:
+			if !ok {
+				c.write(websocket.CloseMessage, []byte{})
+				return
+			}
+			if err := c.write(websocket.TextMessage, message); err != nil {
+				return
+			}
+		case <-ticker.C:
+			if err := c.write(websocket.PingMessage, []byte{}); err != nil {
+				return
+			}
+		}
+	}
+}
+
+type LiveConn struct {
+}
+
+func New() *LiveConn {
+	go h.run()
+	return &LiveConn{}
+}
+
+func (lc *LiveConn) Serve(w http.ResponseWriter, r *http.Request) {
+	log.Info("Live: Upgrading to WebSocket")
+
+	ws, err := upgrader.Upgrade(w, r, nil)
+	if err != nil {
+		log.Error(3, "Live: Failed to upgrade connection to WebSocket", err)
+		return
+	}
+	c := &connection{send: make(chan []byte, 256), ws: ws}
+	h.register <- c
+	go c.writePump()
+	c.readPump()
+}

+ 52 - 0
pkg/live/hub.go

@@ -0,0 +1,52 @@
+package live
+
+import "github.com/grafana/grafana/pkg/log"
+
+type hub struct {
+	// Registered connections.
+	connections map[*connection]bool
+
+	// Inbound messages from the connections.
+	broadcast chan []byte
+
+	// Register requests from the connections.
+	register chan *connection
+
+	// Unregister requests from connections.
+	unregister chan *connection
+}
+
+var h = hub{
+	broadcast:   make(chan []byte),
+	register:    make(chan *connection),
+	unregister:  make(chan *connection),
+	connections: make(map[*connection]bool),
+}
+
+func (h *hub) run() {
+	for {
+		select {
+		case c := <-h.register:
+			h.connections[c] = true
+		case c := <-h.unregister:
+			if _, ok := h.connections[c]; ok {
+				delete(h.connections, c)
+				close(c.send)
+			}
+		case m := <-h.broadcast:
+			log.Info("Live: broadcasting")
+			for c := range h.connections {
+				select {
+				case c.send <- m:
+				default:
+					close(c.send)
+					delete(h.connections, c)
+				}
+			}
+		}
+	}
+}
+
+func SendMessage(message string) {
+	h.broadcast <- []byte(message)
+}

+ 10 - 1
public/app/features/admin/admin.ts

@@ -19,7 +19,16 @@ class AdminSettingsCtrl {
 
 class AdminHomeCtrl {
   /** @ngInject **/
-  constructor() {}
+  constructor() {
+
+    var conn = new WebSocket("ws://localhost:3000/ws");
+    conn.onclose = function(evt) {
+      console.log("Connection closed");
+    };
+    conn.onmessage = function(evt) {
+      console.log("message", evt.data);
+    };
+  }
 }
 
 export class AdminStatsCtrl {