| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- package live
- import (
- "context"
- "net/http"
- "sync"
- "github.com/grafana/grafana/pkg/components/simplejson"
- "github.com/grafana/grafana/pkg/infra/log"
- m "github.com/grafana/grafana/pkg/models"
- )
- type StreamManager struct {
- log log.Logger
- streams map[string]*Stream
- streamRWMutex *sync.RWMutex
- hub *hub
- }
- func NewStreamManager() *StreamManager {
- return &StreamManager{
- hub: newHub(),
- log: log.New("stream.manager"),
- streams: make(map[string]*Stream),
- streamRWMutex: &sync.RWMutex{},
- }
- }
- func (sm *StreamManager) Run(context context.Context) {
- log.Info("Initializing Stream Manager")
- go func() {
- sm.hub.run(context)
- log.Info("Stopped Stream Manager")
- }()
- }
- func (sm *StreamManager) Serve(w http.ResponseWriter, r *http.Request) {
- sm.log.Info("Upgrading to WebSocket")
- ws, err := upgrader.Upgrade(w, r, nil)
- if err != nil {
- sm.log.Error("Failed to upgrade connection to WebSocket", "error", err)
- return
- }
- c := newConnection(ws, sm.hub)
- sm.hub.register <- c
- go c.writePump()
- c.readPump()
- }
- func (s *StreamManager) GetStreamList() m.StreamList {
- list := make(m.StreamList, 0)
- for _, stream := range s.streams {
- list = append(list, &m.StreamInfo{
- Name: stream.name,
- })
- }
- return list
- }
- func (s *StreamManager) Push(packet *m.StreamPacket) {
- stream, exist := s.streams[packet.Stream]
- if !exist {
- s.log.Info("Creating metric stream", "name", packet.Stream)
- stream = NewStream(packet.Stream)
- s.streams[stream.name] = stream
- }
- stream.Push(packet)
- }
- type Stream struct {
- subscribers []*connection
- name string
- }
- func NewStream(name string) *Stream {
- return &Stream{
- subscribers: make([]*connection, 0),
- name: name,
- }
- }
- func (s *Stream) Push(packet *m.StreamPacket) {
- messageBytes, _ := simplejson.NewFromAny(packet).Encode()
- for _, sub := range s.subscribers {
- // check if channel is open
- // if _, ok := h.connections[sub]; !ok {
- // delete(s.subscribers, sub)
- // continue
- // }
- sub.send <- messageBytes
- }
- }
|