alert_rule_reader.go

package alerting

import (
    "sync"
    "time"

    "github.com/grafana/grafana/pkg/bus"
    m "github.com/grafana/grafana/pkg/models"
)
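// RuleReader returns the set of alert jobs that this instance should evaluate.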
type RuleReader interface {
    Fetch() []m.AlertJob
}
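// AlertRuleReader is the default RuleReader. The server and cluster fields are
// placeholders for the clustering support sketched out in heartBeat below.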
type AlertRuleReader struct {
    serverId       string
    serverPosition int
    clusterSize    int
    mtx            sync.RWMutex
}
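// NewRuleReader creates a reader and starts its background refresh loop.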
func NewRuleReader() *AlertRuleReader {
    rrr := &AlertRuleReader{}
    go rrr.initReader()
    return rrr
}
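// alertJobs holds the most recently fetched jobs; rr replaces the whole slice
// while holding the write lock.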
var (
    alertJobs []m.AlertJob
)
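// initReader refreshes the rule list once at startup and then every ten seconds.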
func (this *AlertRuleReader) initReader() {
    alertJobs = make([]m.AlertJob, 0)
    heartbeat := time.NewTicker(time.Second * 10)

    this.rr()
    for range heartbeat.C {
        this.rr()
    }
}
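// rr reloads all alert rules (the org id is still hard-coded to 1) and rebuilds
// the job list.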
func (this *AlertRuleReader) rr() {
    this.mtx.Lock()
    defer this.mtx.Unlock()

    rules := make([]m.AlertRule, 0)
    /*
        rules = []m.AlertRule{
            //{Id: 1, Title: "alert rule 1", Interval: "10s", Frequency: 10},
            //{Id: 2, Title: "alert rule 2", Interval: "10s", Frequency: 10},
            //{Id: 3, Title: "alert rule 3", Interval: "10s", Frequency: 10},
            //{Id: 4, Title: "alert rule 4", Interval: "10s", Frequency: 5},
            //{Id: 5, Title: "alert rule 5", Interval: "10s", Frequency: 5},
            {
                Id:           1,
                OrgId:        1,
                Title:        "alert rule 1",
                Frequency:    3,
                DatasourceId: 1,
                WarnOperator: ">",
                WarnLevel:    3,
                CritOperator: ">",
                CritLevel:    4,
                Aggregator:   "avg",
                //Query:      `{"refId":"A","target":"statsd.fakesite.counters.session_start.*.count","textEditor":true}"`,
                Query:      `{"hide":false,"refId":"A","target":"aliasByNode(statsd.fakesite.counters.session_start.*.count, 4)","textEditor":false}`,
                QueryRange: 3600,
            },
        }
    */

    // Load all alert rules for the org.
    cmd := &m.GetAlertsQuery{
        OrgId: 1,
    }

    // If the query fails, keep the previous job list instead of wiping it.
    if err := bus.Dispatch(cmd); err != nil {
        return
    }
    rules = cmd.Result

    // Resolve each rule's datasource and build the job list; rules whose
    // datasource cannot be loaded are skipped.
    //for i := this.serverPosition - 1; i < len(rules); i += this.clusterSize {
    jobs := make([]m.AlertJob, 0)
    for _, rule := range rules {
        query := &m.GetDataSourceByIdQuery{Id: rule.DatasourceId, OrgId: rule.OrgId}
        err := bus.Dispatch(query)
        if err != nil {
            continue
        }

        jobs = append(jobs, m.AlertJob{
            Rule:       rule,
            Datasource: query.Result,
        })
    }

    alertJobs = jobs
}
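// Fetch returns the current job list. A minimal usage sketch, assuming some
// scheduler loop elsewhere (dispatch is a hypothetical helper, not part of
// this file):
//
//    reader := NewRuleReader()
//    for _, job := range reader.Fetch() {
//        dispatch(job)
//    }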
func (this *AlertRuleReader) Fetch() []m.AlertJob {
    // Take the read lock so a concurrent rr cannot swap the slice mid-read.
    this.mtx.RLock()
    defer this.mtx.RUnlock()
    return alertJobs
}
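// heartBeat will eventually report this server to the rest of the cluster; for
// now it hard-codes a single-server topology.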
func (this *AlertRuleReader) heartBeat() {
    //Let's cheat on this until we focus on clustering
    //log.Info("Heartbeat: Sending heartbeat from " + this.serverId)
    this.clusterSize = 1
    this.serverPosition = 1

    /*
        cmd := &m.HeartBeatCommand{ServerId: this.serverId}
        err := bus.Dispatch(cmd)

        if err != nil {
            log.Error(1, "Failed to send heartbeat.")
        } else {
            this.clusterSize = cmd.Result.ClusterSize
            this.serverPosition = cmd.Result.UptimePosition
        }
    */
}