rule_reader.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package alerting
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/grafana/grafana/pkg/bus"
  6. "github.com/grafana/grafana/pkg/log"
  7. m "github.com/grafana/grafana/pkg/models"
  8. )
  9. type RuleReader interface {
  10. Fetch() []*AlertRule
  11. }
  // AlertRuleReader is a RuleReader that also carries this server's identity
  // and its place within the cluster.
  type AlertRuleReader struct {
  	// RWMutex is embedded so the reader can be locked directly.
  	sync.RWMutex

  	// serverID identifies this server instance.
  	serverID string

  	// serverPosition and clusterSize are assigned by the heartbeat method.
  	serverPosition, clusterSize int
  }
  18. func NewRuleReader() *AlertRuleReader {
  19. ruleReader := &AlertRuleReader{}
  20. go ruleReader.initReader()
  21. return ruleReader
  22. }
  23. func (arr *AlertRuleReader) initReader() {
  24. heartbeat := time.NewTicker(time.Second * 10)
  25. for {
  26. select {
  27. case <-heartbeat.C:
  28. arr.heartbeat()
  29. }
  30. }
  31. }
  32. func (arr *AlertRuleReader) Fetch() []*AlertRule {
  33. cmd := &m.GetAllAlertsQuery{}
  34. err := bus.Dispatch(cmd)
  35. if err != nil {
  36. log.Error(1, "Alerting: ruleReader.fetch(): Could not load alerts", err)
  37. return []*AlertRule{}
  38. }
  39. res := make([]*AlertRule, len(cmd.Result))
  40. for i, ruleDef := range cmd.Result {
  41. model, _ := ConvetAlertModelToAlertRule(ruleDef)
  42. res[i] = model
  43. }
  44. return res
  45. }
  46. func (arr *AlertRuleReader) heartbeat() {
  47. //Lets cheat on this until we focus on clustering
  48. //log.Info("Heartbeat: Sending heartbeat from " + this.serverId)
  49. arr.clusterSize = 1
  50. arr.serverPosition = 1
  51. /*
  52. cmd := &m.HeartBeatCommand{ServerId: this.serverId}
  53. err := bus.Dispatch(cmd)
  54. if err != nil {
  55. log.Error(1, "Failed to send heartbeat.")
  56. } else {
  57. this.clusterSize = cmd.Result.ClusterSize
  58. this.serverPosition = cmd.Result.UptimePosition
  59. }
  60. */
  61. }