conn.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package live
  2. import (
  3. "net/http"
  4. "time"
  5. "github.com/gorilla/websocket"
  6. "github.com/grafana/grafana/pkg/log"
  7. )
  8. const (
  9. // Time allowed to write a message to the peer.
  10. writeWait = 10 * time.Second
  11. // Time allowed to read the next pong message from the peer.
  12. pongWait = 60 * time.Second
  13. // Send pings to peer with this period. Must be less than pongWait.
  14. pingPeriod = (pongWait * 9) / 10
  15. // Maximum message size allowed from peer.
  16. maxMessageSize = 512
  17. )
  18. var upgrader = websocket.Upgrader{
  19. ReadBufferSize: 1024,
  20. WriteBufferSize: 1024,
  21. }
  22. type connection struct {
  23. ws *websocket.Conn
  24. send chan []byte
  25. }
  26. func (c *connection) readPump() {
  27. defer func() {
  28. h.unregister <- c
  29. c.ws.Close()
  30. }()
  31. c.ws.SetReadLimit(maxMessageSize)
  32. c.ws.SetReadDeadline(time.Now().Add(pongWait))
  33. c.ws.SetPongHandler(func(string) error { c.ws.SetReadDeadline(time.Now().Add(pongWait)); return nil })
  34. for {
  35. _, message, err := c.ws.ReadMessage()
  36. if err != nil {
  37. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
  38. log.Info("error: %v", err)
  39. }
  40. break
  41. }
  42. h.broadcast <- message
  43. }
  44. }
  45. func (c *connection) write(mt int, payload []byte) error {
  46. c.ws.SetWriteDeadline(time.Now().Add(writeWait))
  47. return c.ws.WriteMessage(mt, payload)
  48. }
  49. // writePump pumps messages from the hub to the websocket connection.
  50. func (c *connection) writePump() {
  51. ticker := time.NewTicker(pingPeriod)
  52. defer func() {
  53. ticker.Stop()
  54. c.ws.Close()
  55. }()
  56. for {
  57. select {
  58. case message, ok := <-c.send:
  59. if !ok {
  60. c.write(websocket.CloseMessage, []byte{})
  61. return
  62. }
  63. if err := c.write(websocket.TextMessage, message); err != nil {
  64. return
  65. }
  66. case <-ticker.C:
  67. if err := c.write(websocket.PingMessage, []byte{}); err != nil {
  68. return
  69. }
  70. }
  71. }
  72. }
  73. type LiveConn struct {
  74. }
  75. func New() *LiveConn {
  76. go h.run()
  77. return &LiveConn{}
  78. }
  79. func (lc *LiveConn) Serve(w http.ResponseWriter, r *http.Request) {
  80. log.Info("Live: Upgrading to WebSocket")
  81. ws, err := upgrader.Upgrade(w, r, nil)
  82. if err != nil {
  83. log.Error(3, "Live: Failed to upgrade connection to WebSocket", err)
  84. return
  85. }
  86. c := &connection{send: make(chan []byte, 256), ws: ws}
  87. h.register <- c
  88. go c.writePump()
  89. c.readPump()
  90. }