alerting.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package alerting
  2. import (
  3. "math/rand"
  4. "strconv"
  5. "time"
  6. //"github.com/grafana/grafana/pkg/bus"
  7. "github.com/grafana/grafana/pkg/log"
  8. m "github.com/grafana/grafana/pkg/models"
  9. "github.com/grafana/grafana/pkg/setting"
  10. "sync"
  11. )
  12. func Init() {
  13. if !setting.AlertingEnabled {
  14. return
  15. }
  16. log.Info("Alerting: Initializing scheduler...")
  17. scheduler := NewScheduler()
  18. go scheduler.Dispatch(&AlertRuleReader{})
  19. go scheduler.Executor(&ExecutorImpl{})
  20. go scheduler.HandleResponses()
  21. }
  22. type Scheduler struct {
  23. jobs map[int64]*AlertJob
  24. runQueue chan *AlertJob
  25. responseQueue chan *AlertResult
  26. mtx sync.RWMutex
  27. alertRuleFetcher RuleReader
  28. serverId string
  29. serverPosition int
  30. clusterSize int
  31. }
  32. func NewScheduler() *Scheduler {
  33. return &Scheduler{
  34. jobs: make(map[int64]*AlertJob, 0),
  35. runQueue: make(chan *AlertJob, 1000),
  36. responseQueue: make(chan *AlertResult, 1000),
  37. serverId: strconv.Itoa(rand.Intn(1000)),
  38. }
  39. }
  40. func (this *Scheduler) heartBeat() {
  41. //Lets cheat on this until we focus on clustering
  42. //log.Info("Heartbeat: Sending heartbeat from " + this.serverId)
  43. this.clusterSize = 1
  44. this.serverPosition = 1
  45. /*
  46. cmd := &m.HeartBeatCommand{ServerId: this.serverId}
  47. err := bus.Dispatch(cmd)
  48. if err != nil {
  49. log.Error(1, "Failed to send heartbeat.")
  50. } else {
  51. this.clusterSize = cmd.Result.ClusterSize
  52. this.serverPosition = cmd.Result.UptimePosition
  53. }
  54. */
  55. }
  56. func (this *Scheduler) Dispatch(reader RuleReader) {
  57. reschedule := time.NewTicker(time.Second * 100)
  58. secondTicker := time.NewTicker(time.Second)
  59. heartbeat := time.NewTicker(time.Second * 5)
  60. this.heartBeat()
  61. this.updateJobs(reader.Fetch)
  62. for {
  63. select {
  64. case <-secondTicker.C:
  65. this.queueJobs()
  66. case <-reschedule.C:
  67. this.updateJobs(reader.Fetch)
  68. case <-heartbeat.C:
  69. this.heartBeat()
  70. }
  71. }
  72. }
  73. func (this *Scheduler) updateJobs(f func() []m.AlertRule) {
  74. log.Debug("Scheduler: UpdateJobs()")
  75. jobs := make(map[int64]*AlertJob, 0)
  76. rules := f()
  77. this.mtx.Lock()
  78. defer this.mtx.Unlock()
  79. for i := this.serverPosition - 1; i < len(rules); i += this.clusterSize {
  80. rule := rules[i]
  81. jobs[rule.Id] = &AlertJob{rule: rule, offset: int64(len(jobs))}
  82. }
  83. log.Debug("Scheduler: Selected %d jobs", len(jobs))
  84. this.jobs = jobs
  85. }
  86. func (this *Scheduler) queueJobs() {
  87. now := time.Now().Unix()
  88. for _, job := range this.jobs {
  89. if now%job.rule.Frequency == 0 && job.running == false {
  90. log.Info("Scheduler: Putting job on to run queue: %s", job.rule.Title)
  91. this.runQueue <- job
  92. }
  93. }
  94. }
  95. func (this *Scheduler) Executor(executor Executor) {
  96. for job := range this.runQueue {
  97. //log.Info("Executor: queue length %d", len(this.runQueue))
  98. log.Info("Executor: executing %s", job.rule.Title)
  99. this.jobs[job.rule.Id].running = true
  100. this.MeasureAndExecute(executor, job)
  101. }
  102. }
  103. func (this *Scheduler) HandleResponses() {
  104. for response := range this.responseQueue {
  105. log.Info("Response: alert(%d) status(%s) actual(%v)", response.Id, response.State, response.ActualValue)
  106. if this.jobs[response.Id] != nil {
  107. this.jobs[response.Id].running = false
  108. }
  109. }
  110. }
  111. func (this *Scheduler) MeasureAndExecute(exec Executor, rule *AlertJob) {
  112. now := time.Now()
  113. response := make(chan *AlertResult, 1)
  114. go exec.Execute(rule.rule, response)
  115. select {
  116. case <-time.After(time.Second * 5):
  117. this.responseQueue <- &AlertResult{Id: rule.rule.Id, State: "timed out", Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000)}
  118. case r := <-response:
  119. r.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)
  120. log.Info("Schedular: exeuction took %vms", r.Duration)
  121. this.responseQueue <- r
  122. }
  123. }
  124. type AlertJob struct {
  125. offset int64
  126. delay bool
  127. running bool
  128. rule m.AlertRule
  129. }
  // AlertResult is the outcome of one alert rule execution as it travels over
  // the scheduler's response queue.
  type AlertResult struct {
  	// Id is the id of the alert rule that was executed; HandleResponses uses
  	// it to find the matching job.
  	Id int64

  	// State is the outcome label. MeasureAndExecute sets it to "timed out"
  	// when the executor misses its deadline; other values come from the
  	// executor.
  	State string

  	// ActualValue is logged by HandleResponses; presumably the value the
  	// executor measured (confirm against the executor implementation).
  	ActualValue float64

  	// Duration is the execution time in milliseconds, filled in by
  	// MeasureAndExecute.
  	Duration float64
  }