conn.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package live
  2. import (
  3. "net/http"
  4. "time"
  5. "github.com/gorilla/websocket"
  6. "github.com/grafana/grafana/pkg/components/simplejson"
  7. "github.com/grafana/grafana/pkg/log"
  8. )
  9. const (
  10. // Time allowed to write a message to the peer.
  11. writeWait = 10 * time.Second
  12. // Time allowed to read the next pong message from the peer.
  13. pongWait = 60 * time.Second
  14. // Send pings to peer with this period. Must be less than pongWait.
  15. pingPeriod = (pongWait * 9) / 10
  16. // Maximum message size allowed from peer.
  17. maxMessageSize = 512
  18. )
  19. var upgrader = websocket.Upgrader{
  20. ReadBufferSize: 1024,
  21. WriteBufferSize: 1024,
  22. CheckOrigin: func(r *http.Request) bool {
  23. return true
  24. },
  25. }
  26. type connection struct {
  27. ws *websocket.Conn
  28. send chan []byte
  29. }
  30. func newConnection(ws *websocket.Conn) *connection {
  31. return &connection{
  32. send: make(chan []byte, 256),
  33. ws: ws,
  34. }
  35. }
  36. func (c *connection) readPump() {
  37. defer func() {
  38. h.unregister <- c
  39. c.ws.Close()
  40. }()
  41. c.ws.SetReadLimit(maxMessageSize)
  42. c.ws.SetReadDeadline(time.Now().Add(pongWait))
  43. c.ws.SetPongHandler(func(string) error { c.ws.SetReadDeadline(time.Now().Add(pongWait)); return nil })
  44. for {
  45. _, message, err := c.ws.ReadMessage()
  46. if err != nil {
  47. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
  48. log.Info("error: %v", err)
  49. }
  50. break
  51. }
  52. c.handleMessage(message)
  53. }
  54. }
  55. func (c *connection) handleMessage(message []byte) {
  56. json, err := simplejson.NewJson(message)
  57. if err != nil {
  58. log.Error(3, "Unreadable message on websocket channel:", err)
  59. }
  60. msgType := json.Get("action").MustString()
  61. streamName := json.Get("stream").MustString()
  62. if len(streamName) == 0 {
  63. log.Error(3, "Not allowed to subscribe to empty stream name")
  64. return
  65. }
  66. switch msgType {
  67. case "subscribe":
  68. h.subChannel <- &streamSubscription{name: streamName, conn: c}
  69. case "unsubscribe":
  70. h.subChannel <- &streamSubscription{name: streamName, conn: c, remove: true}
  71. }
  72. }
  73. func (c *connection) write(mt int, payload []byte) error {
  74. c.ws.SetWriteDeadline(time.Now().Add(writeWait))
  75. return c.ws.WriteMessage(mt, payload)
  76. }
  77. // writePump pumps messages from the hub to the websocket connection.
  78. func (c *connection) writePump() {
  79. ticker := time.NewTicker(pingPeriod)
  80. defer func() {
  81. ticker.Stop()
  82. c.ws.Close()
  83. }()
  84. for {
  85. select {
  86. case message, ok := <-c.send:
  87. if !ok {
  88. c.write(websocket.CloseMessage, []byte{})
  89. return
  90. }
  91. if err := c.write(websocket.TextMessage, message); err != nil {
  92. return
  93. }
  94. case <-ticker.C:
  95. if err := c.write(websocket.PingMessage, []byte{}); err != nil {
  96. return
  97. }
  98. }
  99. }
  100. }