hub.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package live
  2. import (
  3. "context"
  4. "github.com/grafana/grafana/pkg/api/dtos"
  5. "github.com/grafana/grafana/pkg/components/simplejson"
  6. "github.com/grafana/grafana/pkg/log"
  7. )
  8. type hub struct {
  9. log log.Logger
  10. connections map[*connection]bool
  11. streams map[string]map[*connection]bool
  12. register chan *connection
  13. unregister chan *connection
  14. streamChannel chan *dtos.StreamMessage
  15. subChannel chan *streamSubscription
  16. }
  17. type streamSubscription struct {
  18. conn *connection
  19. name string
  20. remove bool
  21. }
  22. func newHub() *hub {
  23. return &hub{
  24. connections: make(map[*connection]bool),
  25. streams: make(map[string]map[*connection]bool),
  26. register: make(chan *connection),
  27. unregister: make(chan *connection),
  28. streamChannel: make(chan *dtos.StreamMessage),
  29. subChannel: make(chan *streamSubscription),
  30. log: log.New("stream.hub"),
  31. }
  32. }
  33. func (h *hub) removeConnection() {
  34. }
  35. func (h *hub) run(ctx context.Context) {
  36. for {
  37. select {
  38. case <-ctx.Done():
  39. return
  40. case c := <-h.register:
  41. h.connections[c] = true
  42. h.log.Info("New connection", "total", len(h.connections))
  43. case c := <-h.unregister:
  44. if _, ok := h.connections[c]; ok {
  45. h.log.Info("Closing connection", "total", len(h.connections))
  46. delete(h.connections, c)
  47. close(c.send)
  48. }
  49. // hand stream subscriptions
  50. case sub := <-h.subChannel:
  51. h.log.Info("Subscribing", "channel", sub.name, "remove", sub.remove)
  52. subscribers, exists := h.streams[sub.name]
  53. // handle unsubscribe
  54. if exists && sub.remove {
  55. delete(subscribers, sub.conn)
  56. continue
  57. }
  58. if !exists {
  59. subscribers = make(map[*connection]bool)
  60. h.streams[sub.name] = subscribers
  61. }
  62. subscribers[sub.conn] = true
  63. // handle stream messages
  64. case message := <-h.streamChannel:
  65. subscribers, exists := h.streams[message.Stream]
  66. if !exists || len(subscribers) == 0 {
  67. h.log.Info("Message to stream without subscribers", "stream", message.Stream)
  68. continue
  69. }
  70. messageBytes, _ := simplejson.NewFromAny(message).Encode()
  71. for sub := range subscribers {
  72. // check if channel is open
  73. if _, ok := h.connections[sub]; !ok {
  74. delete(subscribers, sub)
  75. continue
  76. }
  77. select {
  78. case sub.send <- messageBytes:
  79. default:
  80. close(sub.send)
  81. delete(h.connections, sub)
  82. delete(subscribers, sub)
  83. }
  84. }
  85. }
  86. }
  87. }