conn.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package live
  2. import (
  3. "net/http"
  4. "time"
  5. "github.com/gorilla/websocket"
  6. "github.com/grafana/grafana/pkg/components/simplejson"
  7. "github.com/grafana/grafana/pkg/log"
  8. )
  9. const (
  10. // Time allowed to write a message to the peer.
  11. writeWait = 10 * time.Second
  12. // Time allowed to read the next pong message from the peer.
  13. pongWait = 60 * time.Second
  14. // Send pings to peer with this period. Must be less than pongWait.
  15. pingPeriod = (pongWait * 9) / 10
  16. // Maximum message size allowed from peer.
  17. maxMessageSize = 512
  18. )
  19. var upgrader = websocket.Upgrader{
  20. ReadBufferSize: 1024,
  21. WriteBufferSize: 1024,
  22. CheckOrigin: func(r *http.Request) bool {
  23. return true
  24. },
  25. }
  26. type subscription struct {
  27. name string
  28. }
  29. type connection struct {
  30. ws *websocket.Conn
  31. streams []*subscription
  32. send chan []byte
  33. }
  34. func newConnection(ws *websocket.Conn) *connection {
  35. return &connection{
  36. send: make(chan []byte, 256),
  37. streams: make([]*subscription, 0),
  38. ws: ws,
  39. }
  40. }
  41. func (c *connection) readPump() {
  42. defer func() {
  43. h.unregister <- c
  44. c.ws.Close()
  45. }()
  46. c.ws.SetReadLimit(maxMessageSize)
  47. c.ws.SetReadDeadline(time.Now().Add(pongWait))
  48. c.ws.SetPongHandler(func(string) error { c.ws.SetReadDeadline(time.Now().Add(pongWait)); return nil })
  49. for {
  50. _, message, err := c.ws.ReadMessage()
  51. if err != nil {
  52. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
  53. log.Info("error: %v", err)
  54. }
  55. break
  56. }
  57. c.handleMessage(message)
  58. }
  59. }
  60. func (c *connection) handleMessage(message []byte) {
  61. json, err := simplejson.NewJson(message)
  62. if err != nil {
  63. log.Error(3, "Unreadable message on websocket channel:", err)
  64. }
  65. msgType := json.Get("action").MustString()
  66. streamName := json.Get("stream").MustString()
  67. switch msgType {
  68. case "subscribe":
  69. c.streams = append(c.streams, &subscription{name: streamName})
  70. log.Info("Live: subscribing to stream %v", streamName)
  71. }
  72. }
  73. func (c *connection) write(mt int, payload []byte) error {
  74. c.ws.SetWriteDeadline(time.Now().Add(writeWait))
  75. return c.ws.WriteMessage(mt, payload)
  76. }
  77. // writePump pumps messages from the hub to the websocket connection.
  78. func (c *connection) writePump() {
  79. ticker := time.NewTicker(pingPeriod)
  80. defer func() {
  81. ticker.Stop()
  82. c.ws.Close()
  83. }()
  84. for {
  85. select {
  86. case message, ok := <-c.send:
  87. if !ok {
  88. c.write(websocket.CloseMessage, []byte{})
  89. return
  90. }
  91. if err := c.write(websocket.TextMessage, message); err != nil {
  92. return
  93. }
  94. case <-ticker.C:
  95. if err := c.write(websocket.PingMessage, []byte{}); err != nil {
  96. return
  97. }
  98. }
  99. }
  100. }