alerting.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package alerting
  2. import (
  3. "math/rand"
  4. "strconv"
  5. "time"
  6. "github.com/grafana/grafana/pkg/log"
  7. m "github.com/grafana/grafana/pkg/models"
  8. "github.com/grafana/grafana/pkg/setting"
  9. )
  10. func Init() {
  11. if !setting.AlertingEnabled {
  12. return
  13. }
  14. log.Info("Alerting: Initializing scheduler...")
  15. scheduler := NewScheduler()
  16. go scheduler.Dispatch(&AlertRuleReader{})
  17. go scheduler.Executor(&DummieExecutor{})
  18. }
  19. type Scheduler struct {
  20. jobs []*AlertJob
  21. runQueue chan *AlertJob
  22. alertRuleFetcher RuleReader
  23. serverId string
  24. serverPosition int
  25. clusterSize int
  26. }
  27. func NewScheduler() *Scheduler {
  28. return &Scheduler{
  29. jobs: make([]*AlertJob, 0),
  30. runQueue: make(chan *AlertJob, 1000),
  31. serverId: strconv.Itoa(rand.Intn(1000)),
  32. }
  33. }
  34. func (s *Scheduler) heartBeat() {
  35. //write heartBeat to db.
  36. //get the modulus position of active servers
  37. log.Info("Heartbeat: Sending heartbeat from " + s.serverId)
  38. s.clusterSize = 1
  39. s.serverPosition = 1
  40. }
  41. func (s *Scheduler) Dispatch(reader RuleReader) {
  42. reschedule := time.NewTicker(time.Second * 10)
  43. secondTicker := time.NewTicker(time.Second)
  44. heartbeat := time.NewTicker(time.Second * 5)
  45. s.heartBeat()
  46. s.updateJobs(reader.Fetch)
  47. for {
  48. select {
  49. case <-secondTicker.C:
  50. s.queueJobs()
  51. case <-reschedule.C:
  52. s.updateJobs(reader.Fetch)
  53. case <-heartbeat.C:
  54. s.heartBeat()
  55. }
  56. }
  57. }
  58. func (s *Scheduler) updateJobs(f func() []m.AlertRule) {
  59. log.Debug("Scheduler: UpdateJobs()")
  60. jobs := make([]*AlertJob, 0)
  61. rules := f()
  62. for i := s.serverPosition - 1; i < len(rules); i += s.clusterSize {
  63. rule := rules[i]
  64. jobs = append(jobs, &AlertJob{
  65. id: rule.Id,
  66. name: rule.Title,
  67. frequency: rule.Frequency,
  68. rule: rule,
  69. offset: int64(len(jobs)),
  70. })
  71. }
  72. log.Debug("Scheduler: Selected %d jobs", len(jobs))
  73. s.jobs = jobs
  74. }
  75. func (s *Scheduler) queueJobs() {
  76. now := time.Now().Unix()
  77. for _, job := range s.jobs {
  78. if now%job.frequency == 0 {
  79. log.Info("Scheduler: Putting job on to run queue: %s", job.name)
  80. s.runQueue <- job
  81. }
  82. }
  83. }
  84. func (s *Scheduler) Executor(executor Executor) {
  85. for job := range s.runQueue {
  86. log.Info("Executor: queue length %d", len(s.runQueue))
  87. log.Info("Executor: executing %s", job.name)
  88. go executor.Execute(job.rule)
  89. }
  90. }
  91. type AlertJob struct {
  92. id int64
  93. name string
  94. frequency int64
  95. offset int64
  96. delay bool
  97. rule m.AlertRule
  98. }
  99. type AlertResult struct {
  100. id int64
  101. state string
  102. duration time.Time
  103. }