alerting.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package alerting
  2. import (
  3. "math/rand"
  4. "strconv"
  5. "time"
  6. "github.com/grafana/grafana/pkg/log"
  7. m "github.com/grafana/grafana/pkg/models"
  8. "github.com/grafana/grafana/pkg/setting"
  9. )
  10. func Init() {
  11. if !setting.AlertingEnabled {
  12. return
  13. }
  14. log.Info("Alerting: Initializing scheduler...")
  15. scheduler := NewScheduler()
  16. go scheduler.Dispatch(&AlertRuleReader{})
  17. go scheduler.Executor(&DummieExecutor{})
  18. }
  19. type Scheduler struct {
  20. jobs []*AlertJob
  21. runQueue chan *AlertJob
  22. alertRuleFetcher RuleReader
  23. serverId string
  24. serverPosition int
  25. clusterSize int
  26. }
  27. func NewScheduler() *Scheduler {
  28. return &Scheduler{
  29. jobs: make([]*AlertJob, 0),
  30. runQueue: make(chan *AlertJob, 1000),
  31. serverId: strconv.Itoa(rand.Intn(1000)),
  32. }
  33. }
  34. func (s *Scheduler) heartBeat() {
  35. //write heartBeat to db.
  36. //get the modulus position of active servers
  37. log.Info("Heartbeat: Sending heartbeat from " + s.serverId)
  38. s.clusterSize = 1
  39. s.serverPosition = 1
  40. }
  41. func (s *Scheduler) Dispatch(reader RuleReader) {
  42. reschedule := time.NewTicker(time.Second * 10)
  43. secondTicker := time.NewTicker(time.Second)
  44. ticker := time.NewTicker(time.Second * 5)
  45. s.heartBeat()
  46. s.updateJobs(reader)
  47. for {
  48. select {
  49. case <-secondTicker.C:
  50. s.queueJobs()
  51. case <-reschedule.C:
  52. s.updateJobs(reader)
  53. case <-ticker.C:
  54. s.heartBeat()
  55. }
  56. }
  57. }
  58. func (s *Scheduler) updateJobs(reader RuleReader) {
  59. log.Debug("Scheduler: UpdateJobs()")
  60. jobs := make([]*AlertJob, 0)
  61. rules := reader.Fetch()
  62. for i := s.serverPosition - 1; i < len(rules); i = i + s.clusterSize {
  63. rule := rules[i]
  64. jobs = append(jobs, &AlertJob{
  65. name: rule.Title,
  66. frequency: rule.Frequency,
  67. rule: rule,
  68. offset: int64(len(jobs)),
  69. })
  70. }
  71. log.Debug("Scheduler: Selected %d jobs", len(jobs))
  72. s.jobs = jobs
  73. }
  74. func (s *Scheduler) queueJobs() {
  75. now := time.Now().Unix()
  76. for _, job := range s.jobs {
  77. if now%job.frequency == 0 {
  78. log.Info("Scheduler: Putting job on to run queue: %s", job.name)
  79. s.runQueue <- job
  80. }
  81. }
  82. }
  83. func (s *Scheduler) Executor(executor Executor) {
  84. for job := range s.runQueue {
  85. log.Info("Executor: queue length %d", len(s.runQueue))
  86. log.Info("Executor: executing %s", job.name)
  87. go executor.Execute(job.rule)
  88. }
  89. }
  90. type AlertJob struct {
  91. id int64
  92. name string
  93. frequency int64
  94. offset int64
  95. delay bool
  96. rule m.AlertRule
  97. }
// AlertResult is the outcome an Executor reports for one alert rule.
type AlertResult struct {
	id       int64     // Id of the rule that was executed
	state    string    // result state, e.g. "OK" as returned by DummieExecutor
	duration time.Time // NOTE(review): typed time.Time, not time.Duration, despite its name — confirm intent
}
// RuleReader is the source of alert rules that the Scheduler loads.
type RuleReader interface {
	// Fetch returns the alert rules to schedule. updateJobs partitions the
	// returned slice across the cluster by index.
	Fetch() []m.AlertRule
}
  106. type AlertRuleReader struct{}
  107. func (this AlertRuleReader) Fetch() []m.AlertRule {
  108. return []m.AlertRule{
  109. {Id: 1, Title: "alert rule 1", Interval: "10s", Frequency: 10},
  110. {Id: 2, Title: "alert rule 2", Interval: "10s", Frequency: 10},
  111. {Id: 3, Title: "alert rule 3", Interval: "10s", Frequency: 10},
  112. {Id: 4, Title: "alert rule 4", Interval: "10s", Frequency: 5},
  113. {Id: 5, Title: "alert rule 5", Interval: "10s", Frequency: 5},
  114. {Id: 6, Title: "alert rule 6", Interval: "10s", Frequency: 1},
  115. }
  116. }
// Executor runs an alert rule and reports the outcome as an AlertResult.
type Executor interface {
	// Execute runs rule and returns an error (nil on success) together with
	// its result. NOTE(review): the error is the first return value, contrary
	// to Go convention; changing the order would break every implementer.
	Execute(rule m.AlertRule) (err error, result AlertResult)
}
  120. type DummieExecutor struct{}
  121. func (this DummieExecutor) Execute(rule m.AlertRule) (err error, result AlertResult) {
  122. time.Sleep(1000)
  123. return nil, AlertResult{state: "OK", id: rule.Id}
  124. }