alerting.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package alerting
  2. import (
  3. "math/rand"
  4. "strconv"
  5. "time"
  6. "github.com/grafana/grafana/pkg/log"
  7. m "github.com/grafana/grafana/pkg/models"
  8. "github.com/grafana/grafana/pkg/setting"
  9. )
  10. func Init() {
  11. if !setting.AlertingEnabled {
  12. return
  13. }
  14. log.Info("Alerting: Initializing scheduler...")
  15. scheduler := NewScheduler()
  16. go scheduler.Dispatch(&AlertRuleReader{})
  17. go scheduler.Executor(&DummieExecutor{})
  18. }
  19. type Scheduler struct {
  20. jobs []*AlertJob
  21. runQueue chan *AlertJob
  22. alertRuleFetcher RuleReader
  23. serverId string
  24. serverPosition int
  25. clusterSize int
  26. }
  27. func NewScheduler() *Scheduler {
  28. return &Scheduler{
  29. jobs: make([]*AlertJob, 0),
  30. runQueue: make(chan *AlertJob, 1000),
  31. serverId: strconv.Itoa(rand.Intn(1000)),
  32. }
  33. }
  34. func (s *Scheduler) heartBeat() {
  35. //write heartBeat to db.
  36. //get the modulus position of active servers
  37. log.Info("Heartbeat: Sending heartbeat from " + s.serverId)
  38. s.clusterSize = 1
  39. s.serverPosition = 1
  40. }
  41. func (s *Scheduler) Dispatch(reader RuleReader) {
  42. reschedule := time.NewTicker(time.Second * 10)
  43. secondTicker := time.NewTicker(time.Second)
  44. ticker := time.NewTicker(time.Second * 5)
  45. s.heartBeat()
  46. s.updateJobs(reader)
  47. for {
  48. select {
  49. case <-secondTicker.C:
  50. s.queueJobs()
  51. case <-reschedule.C:
  52. s.updateJobs(reader)
  53. case <-ticker.C:
  54. s.heartBeat()
  55. }
  56. }
  57. }
  58. func (s *Scheduler) updateJobs(reader RuleReader) {
  59. log.Debug("Scheduler: UpdateJobs()")
  60. jobs := make([]*AlertJob, 0)
  61. rules := reader.Fetch()
  62. for i := s.serverPosition - 1; i < len(rules); i += s.clusterSize {
  63. rule := rules[i]
  64. jobs = append(jobs, &AlertJob{
  65. id: rule.Id,
  66. name: rule.Title,
  67. frequency: rule.Frequency,
  68. rule: rule,
  69. offset: int64(len(jobs)),
  70. })
  71. }
  72. log.Debug("Scheduler: Selected %d jobs", len(jobs))
  73. s.jobs = jobs
  74. }
  75. func (s *Scheduler) queueJobs() {
  76. now := time.Now().Unix()
  77. for _, job := range s.jobs {
  78. if now%job.frequency == 0 {
  79. log.Info("Scheduler: Putting job on to run queue: %s", job.name)
  80. s.runQueue <- job
  81. }
  82. }
  83. }
  84. func (s *Scheduler) Executor(executor Executor) {
  85. for job := range s.runQueue {
  86. log.Info("Executor: queue length %d", len(s.runQueue))
  87. log.Info("Executor: executing %s", job.name)
  88. go executor.Execute(job.rule)
  89. }
  90. }
  91. type AlertJob struct {
  92. id int64
  93. name string
  94. frequency int64
  95. offset int64
  96. delay bool
  97. rule m.AlertRule
  98. }
  // AlertResult is the outcome an Executor reports for one rule execution.
type AlertResult struct {
	id    int64  // id of the executed rule
	state string // result state, e.g. "OK" as reported by DummieExecutor
	// NOTE(review): named "duration" but typed time.Time, and never set in
	// this file. If it is meant to hold elapsed time, time.Duration fits better.
	duration time.Time
}
  104. type RuleReader interface {
  105. Fetch() []m.AlertRule
  106. }
  107. type AlertRuleReader struct{}
  108. func (this AlertRuleReader) Fetch() []m.AlertRule {
  109. return []m.AlertRule{
  110. {Id: 1, Title: "alert rule 1", Interval: "10s", Frequency: 10},
  111. {Id: 2, Title: "alert rule 2", Interval: "10s", Frequency: 10},
  112. {Id: 3, Title: "alert rule 3", Interval: "10s", Frequency: 10},
  113. {Id: 4, Title: "alert rule 4", Interval: "10s", Frequency: 5},
  114. {Id: 5, Title: "alert rule 5", Interval: "10s", Frequency: 5},
  115. {Id: 6, Title: "alert rule 6", Interval: "10s", Frequency: 1},
  116. }
  117. }
  118. type Executor interface {
  119. Execute(rule m.AlertRule) (err error, result AlertResult)
  120. }
  121. type DummieExecutor struct{}
  122. func (this DummieExecutor) Execute(rule m.AlertRule) (err error, result AlertResult) {
  123. if rule.Id == 6 {
  124. time.Sleep(time.Second * 60)
  125. }
  126. time.Sleep(time.Second)
  127. log.Info("Finnished executing: %d", rule.Id)
  128. return nil, AlertResult{state: "OK", id: rule.Id}
  129. }