hub.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package live
  2. import (
  3. "context"
  4. "github.com/grafana/grafana/pkg/api/dtos"
  5. "github.com/grafana/grafana/pkg/components/simplejson"
  6. "github.com/grafana/grafana/pkg/infra/log"
  7. )
  8. type hub struct {
  9. log log.Logger
  10. connections map[*connection]bool
  11. streams map[string]map[*connection]bool
  12. register chan *connection
  13. unregister chan *connection
  14. streamChannel chan *dtos.StreamMessage
  15. subChannel chan *streamSubscription
  16. }
  17. type streamSubscription struct {
  18. conn *connection
  19. name string
  20. remove bool
  21. }
  22. func newHub() *hub {
  23. return &hub{
  24. connections: make(map[*connection]bool),
  25. streams: make(map[string]map[*connection]bool),
  26. register: make(chan *connection),
  27. unregister: make(chan *connection),
  28. streamChannel: make(chan *dtos.StreamMessage),
  29. subChannel: make(chan *streamSubscription),
  30. log: log.New("stream.hub"),
  31. }
  32. }
  33. func (h *hub) run(ctx context.Context) {
  34. for {
  35. select {
  36. case <-ctx.Done():
  37. return
  38. case c := <-h.register:
  39. h.connections[c] = true
  40. h.log.Info("New connection", "total", len(h.connections))
  41. case c := <-h.unregister:
  42. if _, ok := h.connections[c]; ok {
  43. h.log.Info("Closing connection", "total", len(h.connections))
  44. delete(h.connections, c)
  45. close(c.send)
  46. }
  47. // hand stream subscriptions
  48. case sub := <-h.subChannel:
  49. h.log.Info("Subscribing", "channel", sub.name, "remove", sub.remove)
  50. subscribers, exists := h.streams[sub.name]
  51. // handle unsubscribe
  52. if exists && sub.remove {
  53. delete(subscribers, sub.conn)
  54. continue
  55. }
  56. if !exists {
  57. subscribers = make(map[*connection]bool)
  58. h.streams[sub.name] = subscribers
  59. }
  60. subscribers[sub.conn] = true
  61. // handle stream messages
  62. case message := <-h.streamChannel:
  63. subscribers, exists := h.streams[message.Stream]
  64. if !exists || len(subscribers) == 0 {
  65. h.log.Info("Message to stream without subscribers", "stream", message.Stream)
  66. continue
  67. }
  68. messageBytes, _ := simplejson.NewFromAny(message).Encode()
  69. for sub := range subscribers {
  70. // check if channel is open
  71. if _, ok := h.connections[sub]; !ok {
  72. delete(subscribers, sub)
  73. continue
  74. }
  75. select {
  76. case sub.send <- messageBytes:
  77. default:
  78. close(sub.send)
  79. delete(h.connections, sub)
  80. delete(subscribers, sub)
  81. }
  82. }
  83. }
  84. }
  85. }