conn.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package live
  2. import (
  3. "net/http"
  4. "time"
  5. "github.com/gorilla/websocket"
  6. "github.com/grafana/grafana/pkg/components/simplejson"
  7. "github.com/grafana/grafana/pkg/infra/log"
  8. )
  9. const (
  10. // Time allowed to write a message to the peer.
  11. writeWait = 10 * time.Second
  12. // Time allowed to read the next pong message from the peer.
  13. pongWait = 60 * time.Second
  14. // Send pings to peer with this period. Must be less than pongWait.
  15. pingPeriod = (pongWait * 9) / 10
  16. // Maximum message size allowed from peer.
  17. maxMessageSize = 512
  18. )
  19. var upgrader = websocket.Upgrader{
  20. ReadBufferSize: 1024,
  21. WriteBufferSize: 1024,
  22. CheckOrigin: func(r *http.Request) bool {
  23. return true
  24. },
  25. }
  26. type connection struct {
  27. hub *hub
  28. ws *websocket.Conn
  29. send chan []byte
  30. }
  31. func newConnection(ws *websocket.Conn, hub *hub) *connection {
  32. return &connection{
  33. hub: hub,
  34. send: make(chan []byte, 256),
  35. ws: ws,
  36. }
  37. }
  38. func (c *connection) readPump() {
  39. defer func() {
  40. c.hub.unregister <- c
  41. c.ws.Close()
  42. }()
  43. c.ws.SetReadLimit(maxMessageSize)
  44. c.ws.SetReadDeadline(time.Now().Add(pongWait))
  45. c.ws.SetPongHandler(func(string) error { c.ws.SetReadDeadline(time.Now().Add(pongWait)); return nil })
  46. for {
  47. _, message, err := c.ws.ReadMessage()
  48. if err != nil {
  49. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
  50. log.Info("error: %v", err)
  51. }
  52. break
  53. }
  54. c.handleMessage(message)
  55. }
  56. }
  57. func (c *connection) handleMessage(message []byte) {
  58. json, err := simplejson.NewJson(message)
  59. if err != nil {
  60. log.Error(3, "Unreadable message on websocket channel. error: %v", err)
  61. }
  62. msgType := json.Get("action").MustString()
  63. streamName := json.Get("stream").MustString()
  64. if len(streamName) == 0 {
  65. log.Error(3, "Not allowed to subscribe to empty stream name")
  66. return
  67. }
  68. switch msgType {
  69. case "subscribe":
  70. c.hub.subChannel <- &streamSubscription{name: streamName, conn: c}
  71. case "unsubscribe":
  72. c.hub.subChannel <- &streamSubscription{name: streamName, conn: c, remove: true}
  73. }
  74. }
  75. func (c *connection) write(mt int, payload []byte) error {
  76. c.ws.SetWriteDeadline(time.Now().Add(writeWait))
  77. return c.ws.WriteMessage(mt, payload)
  78. }
  79. // writePump pumps messages from the hub to the websocket connection.
  80. func (c *connection) writePump() {
  81. ticker := time.NewTicker(pingPeriod)
  82. defer func() {
  83. ticker.Stop()
  84. c.ws.Close()
  85. }()
  86. for {
  87. select {
  88. case message, ok := <-c.send:
  89. if !ok {
  90. c.write(websocket.CloseMessage, []byte{})
  91. return
  92. }
  93. if err := c.write(websocket.TextMessage, message); err != nil {
  94. return
  95. }
  96. case <-ticker.C:
  97. if err := c.write(websocket.PingMessage, []byte{}); err != nil {
  98. return
  99. }
  100. }
  101. }
  102. }