stream_manager.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package live
  2. import (
  3. "context"
  4. "net/http"
  5. "sync"
  6. "github.com/grafana/grafana/pkg/components/simplejson"
  7. "github.com/grafana/grafana/pkg/infra/log"
  8. m "github.com/grafana/grafana/pkg/models"
  9. )
  10. type StreamManager struct {
  11. log log.Logger
  12. streams map[string]*Stream
  13. streamRWMutex *sync.RWMutex
  14. hub *hub
  15. }
  16. func NewStreamManager() *StreamManager {
  17. return &StreamManager{
  18. hub: newHub(),
  19. log: log.New("stream.manager"),
  20. streams: make(map[string]*Stream),
  21. streamRWMutex: &sync.RWMutex{},
  22. }
  23. }
  24. func (sm *StreamManager) Run(context context.Context) {
  25. log.Info("Initializing Stream Manager")
  26. go func() {
  27. sm.hub.run(context)
  28. log.Info("Stopped Stream Manager")
  29. }()
  30. }
  31. func (sm *StreamManager) Serve(w http.ResponseWriter, r *http.Request) {
  32. sm.log.Info("Upgrading to WebSocket")
  33. ws, err := upgrader.Upgrade(w, r, nil)
  34. if err != nil {
  35. sm.log.Error("Failed to upgrade connection to WebSocket", "error", err)
  36. return
  37. }
  38. c := newConnection(ws, sm.hub)
  39. sm.hub.register <- c
  40. go c.writePump()
  41. c.readPump()
  42. }
  43. func (s *StreamManager) GetStreamList() m.StreamList {
  44. list := make(m.StreamList, 0)
  45. for _, stream := range s.streams {
  46. list = append(list, &m.StreamInfo{
  47. Name: stream.name,
  48. })
  49. }
  50. return list
  51. }
  52. func (s *StreamManager) Push(packet *m.StreamPacket) {
  53. stream, exist := s.streams[packet.Stream]
  54. if !exist {
  55. s.log.Info("Creating metric stream", "name", packet.Stream)
  56. stream = NewStream(packet.Stream)
  57. s.streams[stream.name] = stream
  58. }
  59. stream.Push(packet)
  60. }
  61. type Stream struct {
  62. subscribers []*connection
  63. name string
  64. }
  65. func NewStream(name string) *Stream {
  66. return &Stream{
  67. subscribers: make([]*connection, 0),
  68. name: name,
  69. }
  70. }
  71. func (s *Stream) Push(packet *m.StreamPacket) {
  72. messageBytes, _ := simplejson.NewFromAny(packet).Encode()
  73. for _, sub := range s.subscribers {
  74. // check if channel is open
  75. // if _, ok := h.connections[sub]; !ok {
  76. // delete(s.subscribers, sub)
  77. // continue
  78. // }
  79. sub.send <- messageBytes
  80. }
  81. }