alerting.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package alerting
  2. import (
  3. "math/rand"
  4. "strconv"
  5. "time"
  6. //"github.com/grafana/grafana/pkg/bus"
  7. "github.com/grafana/grafana/pkg/bus"
  8. "github.com/grafana/grafana/pkg/log"
  9. m "github.com/grafana/grafana/pkg/models"
  10. "github.com/grafana/grafana/pkg/setting"
  11. "sync"
  12. )
  13. func Init() {
  14. if !setting.AlertingEnabled {
  15. return
  16. }
  17. log.Info("Alerting: Initializing scheduler...")
  18. scheduler := NewScheduler()
  19. go scheduler.Dispatch(&AlertRuleReader{})
  20. go scheduler.Executor(&ExecutorImpl{})
  21. go scheduler.HandleResponses()
  22. }
  23. type Scheduler struct {
  24. jobs map[int64]*AlertJob
  25. runQueue chan *AlertJob
  26. responseQueue chan *AlertResult
  27. mtx sync.RWMutex
  28. alertRuleFetcher RuleReader
  29. serverId string
  30. serverPosition int
  31. clusterSize int
  32. }
  // NewScheduler returns a Scheduler with an empty job set and run/response
  // queues buffered to 1000 entries each.
  //
  // The cluster fields are left at zero. Dispatch sets them via heartBeat
  // before it builds the first job set.
  //
  // serverId is a random number in [0, 1000) rendered as a string. It is
  // drawn from a locally time-seeded source rather than the package-level
  // math/rand generator. Before Go 1.20 that generator behaves as if seeded
  // with 1, which would hand every server process the same id.
  func NewScheduler() *Scheduler {
  	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
  	return &Scheduler{
  		jobs:          make(map[int64]*AlertJob),
  		runQueue:      make(chan *AlertJob, 1000),
  		responseQueue: make(chan *AlertResult, 1000),
  		serverId:      strconv.Itoa(rng.Intn(1000)),
  	}
  }
  41. func (this *Scheduler) heartBeat() {
  42. //Lets cheat on this until we focus on clustering
  43. //log.Info("Heartbeat: Sending heartbeat from " + this.serverId)
  44. this.clusterSize = 1
  45. this.serverPosition = 1
  46. /*
  47. cmd := &m.HeartBeatCommand{ServerId: this.serverId}
  48. err := bus.Dispatch(cmd)
  49. if err != nil {
  50. log.Error(1, "Failed to send heartbeat.")
  51. } else {
  52. this.clusterSize = cmd.Result.ClusterSize
  53. this.serverPosition = cmd.Result.UptimePosition
  54. }
  55. */
  56. }
  // Dispatch runs the scheduling loop and never returns.
  //
  // It first refreshes the cluster view and loads the job set from reader.
  // After that it:
  //   - queues the jobs that are due, every second;
  //   - reloads the rules from reader, every 100 seconds;
  //   - refreshes the cluster view via heartBeat, every 5 seconds.
  //
  // The tickers are never stopped because the loop has no exit path.
  func (s *Scheduler) Dispatch(reader RuleReader) {
  	reloadTicker := time.NewTicker(100 * time.Second)
  	queueTicker := time.NewTicker(time.Second)
  	heartbeatTicker := time.NewTicker(5 * time.Second)

  	// The cluster position has to be known before rules are partitioned.
  	s.heartBeat()
  	s.updateJobs(reader.Fetch)

  	for {
  		select {
  		case <-heartbeatTicker.C:
  			s.heartBeat()
  		case <-reloadTicker.C:
  			s.updateJobs(reader.Fetch)
  		case <-queueTicker.C:
  			s.queueJobs()
  		}
  	}
  }
  74. func (this *Scheduler) updateJobs(f func() []m.AlertRule) {
  75. log.Debug("Scheduler: UpdateJobs()")
  76. jobs := make(map[int64]*AlertJob, 0)
  77. rules := f()
  78. this.mtx.Lock()
  79. defer this.mtx.Unlock()
  80. for i := this.serverPosition - 1; i < len(rules); i += this.clusterSize {
  81. rule := rules[i]
  82. jobs[rule.Id] = &AlertJob{rule: rule, offset: int64(len(jobs))}
  83. }
  84. log.Debug("Scheduler: Selected %d jobs", len(jobs))
  85. this.jobs = jobs
  86. }
  87. func (this *Scheduler) queueJobs() {
  88. now := time.Now().Unix()
  89. for _, job := range this.jobs {
  90. if now%job.rule.Frequency == 0 && job.running == false {
  91. log.Info("Scheduler: Putting job on to run queue: %s", job.rule.Title)
  92. this.runQueue <- job
  93. }
  94. }
  95. }
  96. func (this *Scheduler) Executor(executor Executor) {
  97. for job := range this.runQueue {
  98. //log.Info("Executor: queue length %d", len(this.runQueue))
  99. log.Info("Executor: executing %s", job.rule.Title)
  100. this.jobs[job.rule.Id].running = true
  101. this.MeasureAndExecute(executor, job)
  102. }
  103. }
  104. func (this *Scheduler) HandleResponses() {
  105. for response := range this.responseQueue {
  106. log.Info("Response: alert(%d) status(%s) actual(%v)", response.Id, response.State, response.ActualValue)
  107. if this.jobs[response.Id] != nil {
  108. this.jobs[response.Id].running = false
  109. }
  110. cmd := m.UpdateAlertStateCommand{
  111. AlertId: response.Id,
  112. NewState: response.State,
  113. }
  114. if err := bus.Dispatch(&cmd); err != nil {
  115. log.Error(1, "failed to save state", err)
  116. }
  117. }
  118. }
  119. func (this *Scheduler) MeasureAndExecute(exec Executor, rule *AlertJob) {
  120. now := time.Now()
  121. response := make(chan *AlertResult, 1)
  122. go exec.Execute(rule.rule, response)
  123. select {
  124. case <-time.After(time.Second * 5):
  125. this.responseQueue <- &AlertResult{Id: rule.rule.Id, State: "timed out", Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000)}
  126. case r := <-response:
  127. r.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)
  128. log.Info("Schedular: exeuction took %vms", r.Duration)
  129. this.responseQueue <- r
  130. }
  131. }
  132. type AlertJob struct {
  133. offset int64
  134. delay bool
  135. running bool
  136. rule m.AlertRule
  137. }
  // AlertResult is the outcome of evaluating one alert rule.
  type AlertResult struct {
  	// Id is the id of the evaluated alert rule.
  	Id int64
  	// State is the resulting alert state. The scheduler itself uses
  	// "timed out" when evaluation exceeds its deadline.
  	State string
  	// ActualValue is the value reported by the executor (presumably the
  	// evaluated metric value — TODO confirm).
  	ActualValue float64
  	// Duration is the evaluation's wall-clock time in milliseconds.
  	Duration float64
  }