hub.go 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. package live
  2. import (
  3. "github.com/grafana/grafana/pkg/api/dtos"
  4. "github.com/grafana/grafana/pkg/log"
  5. )
  6. type hub struct {
  7. // Registered connections.
  8. connections map[*connection]bool
  9. // Inbound messages from the connections.
  10. broadcast chan []byte
  11. // Register requests from the connections.
  12. register chan *connection
  13. // Unregister requests from connections.
  14. unregister chan *connection
  15. streamPipe chan *dtos.StreamMessage
  16. }
  17. var h = hub{
  18. broadcast: make(chan []byte),
  19. register: make(chan *connection),
  20. unregister: make(chan *connection),
  21. connections: make(map[*connection]bool),
  22. }
  23. func (h *hub) run() {
  24. for {
  25. select {
  26. case c := <-h.register:
  27. h.connections[c] = true
  28. log.Info("Live: New connection (Total count: %v)", len(h.connections))
  29. case c := <-h.unregister:
  30. if _, ok := h.connections[c]; ok {
  31. delete(h.connections, c)
  32. close(c.send)
  33. }
  34. case m := <-h.broadcast:
  35. log.Info("Live: broadcasting")
  36. for c := range h.connections {
  37. select {
  38. case c.send <- m:
  39. default:
  40. close(c.send)
  41. delete(h.connections, c)
  42. }
  43. }
  44. }
  45. }
  46. }
  47. func SendMessage(message string) {
  48. h.broadcast <- []byte(message)
  49. }