engine.go

package alerting

import (
	"fmt"
	"time"

	"github.com/benbjohnson/clock"
	"github.com/grafana/grafana/pkg/bus"
	"github.com/grafana/grafana/pkg/log"
	m "github.com/grafana/grafana/pkg/models"
	"github.com/grafana/grafana/pkg/services/alerting/alertstates"
)

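// Engine coordinates the alerting loop: it schedules alert jobs on ticker
// ticks, dispatches them for execution, and handles their results.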
type Engine struct {
	execQueue   chan *AlertJob
	resultQueue chan *AlertResult
	clock       clock.Clock
	ticker      *Ticker
	scheduler   Scheduler
	handler     AlertingHandler
	ruleReader  RuleReader
	log         log.Logger
	notifier    Notifier
}

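// NewEngine returns an Engine wired with its default scheduler, handler,
// rule reader, and notifier, using queues buffered for 1000 jobs/results.
// Typical usage (a sketch, assuming the caller owns the lifecycle):
//
//	engine := NewEngine()
//	engine.Start()
//	defer engine.Stop()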
func NewEngine() *Engine {
	e := &Engine{
		ticker:      NewTicker(time.Now(), time.Second*0, clock.New()),
		execQueue:   make(chan *AlertJob, 1000),
		resultQueue: make(chan *AlertResult, 1000),
		scheduler:   NewScheduler(),
		handler:     NewHandler(),
		ruleReader:  NewRuleReader(),
		log:         log.New("alerting.engine"),
		notifier:    NewNotifier(),
	}

	return e
}

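// Start launches the ticker, dispatcher, and result-handler goroutines.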
func (e *Engine) Start() {
	e.log.Info("Starting Alerting Engine")

	go e.alertingTicker()
	go e.execDispatch()
	go e.resultHandler()
}

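// Stop closes the execution and result queues, ending the dispatch and
// result-handling loops.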
func (e *Engine) Stop() {
	close(e.execQueue)
	close(e.resultQueue)
}

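// alertingTicker reacts to ticker ticks: every tenth tick it refreshes the
// scheduled rules, and on every tick it lets the scheduler enqueue due jobs.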
func (e *Engine) alertingTicker() {
	defer func() {
		if err := recover(); err != nil {
			e.log.Error("Scheduler Panic: stopping alertingTicker", "error", err, "stack", log.Stack(1))
		}
	}()

	tickIndex := 0

	for {
		select {
		case tick := <-e.ticker.C:
			// TEMP SOLUTION: update rules every tenth tick
			if tickIndex%10 == 0 {
				e.scheduler.Update(e.ruleReader.Fetch())
			}

			e.scheduler.Tick(tick, e.execQueue)
			tickIndex++
		}
	}
}

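// execDispatch consumes the execution queue, marking each job as running
// before executing it.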
func (e *Engine) execDispatch() {
	defer func() {
		if err := recover(); err != nil {
			e.log.Error("Scheduler Panic: stopping executor", "error", err, "stack", log.Stack(1))
		}
	}()

	for job := range e.execQueue {
		log.Trace("Alerting: engine:execDispatch() starting job %s", job.Rule.Name)
		job.Running = true
		e.executeJob(job)
	}
}

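// executeJob runs a single alert job with a five-second timeout. On timeout a
// Pending result carrying a timeout error is queued; otherwise the handler's
// result is queued with its measured duration.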
func (e *Engine) executeJob(job *AlertJob) {
	now := time.Now()

	resultChan := make(chan *AlertResult, 1)
	go e.handler.Execute(job, resultChan)

	select {
	case <-time.After(time.Second * 5):
		e.resultQueue <- &AlertResult{
			State:    alertstates.Pending,
			Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000),
			Error:    fmt.Errorf("Timeout"),
			AlertJob: job,
		}
		e.log.Debug("Job Execution timeout", "alertRuleId", job.Rule.Id)
	case result := <-resultChan:
		result.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)
		e.log.Debug("Job Execution done", "timeTakenMs", result.Duration, "ruleId", job.Rule.Id)
		e.resultQueue <- result
	}
}

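// resultHandler consumes execution results, re-queueing failed jobs until
// they are no longer retryable and then persisting and notifying via
// reactToState.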
func (e *Engine) resultHandler() {
	defer func() {
		if err := recover(); err != nil {
			e.log.Error("Engine Panic, stopping resultHandler", "error", err, "stack", log.Stack(1))
		}
	}()

	for result := range e.resultQueue {
		e.log.Debug("Alert Rule Result", "ruleId", result.AlertJob.Rule.Id, "state", result.State, "value", result.ActualValue, "retry", result.AlertJob.RetryCount)

		result.AlertJob.Running = false

		if result.Error != nil {
			result.AlertJob.IncRetry()

			if result.AlertJob.Retryable() {
				e.log.Error("Alert Rule Result Error", "ruleId", result.AlertJob.Rule.Id, "error", result.Error, "retry", result.AlertJob.RetryCount)
				e.execQueue <- result.AlertJob
			} else {
				e.log.Error("Alert Rule Result Error After Max Retries", "ruleId", result.AlertJob.Rule.Id, "error", result.Error, "retry", result.AlertJob.RetryCount)

				result.State = alertstates.Critical
				result.Description = fmt.Sprintf("Failed to run check after %d retries, Error: %v", maxAlertExecutionRetries, result.Error)
				e.reactToState(result)
			}
		} else {
			result.AlertJob.ResetRetry()
			e.reactToState(result)
		}
	}
}

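// reactToState saves the new alert state and sends a notification when
// shouldUpdateState says the stored state needs updating.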
func (e *Engine) reactToState(result *AlertResult) {
	if shouldUpdateState(result) {
		cmd := &m.UpdateAlertStateCommand{
			AlertId:  result.AlertJob.Rule.Id,
			NewState: result.State,
			Info:     result.Description,
			OrgId:    result.AlertJob.Rule.OrgId,
		}

		if err := bus.Dispatch(cmd); err != nil {
			e.log.Error("Failed to save state", "error", err)
		}

		e.log.Debug("will notify about new state", "new state", result.State)
		e.notifier.Notify(result)
	}
}

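// shouldUpdateState returns true when there is no previously stored state,
// the state has changed, or the stored state is older than 15 minutes.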
func shouldUpdateState(result *AlertResult) bool {
	query := &m.GetLastAlertStateQuery{
		AlertId: result.AlertJob.Rule.Id,
		OrgId:   result.AlertJob.Rule.OrgId,
	}

	if err := bus.Dispatch(query); err != nil {
		log.Error2("Failed to read last alert state", "error", err)
		return false
	}

	// No earlier state saved for this alert: always persist the first result.
	if query.Result == nil {
		return true
	}

	now := time.Now()
	olderThan15Min := query.Result.Created.Before(now.Add(time.Minute * -15))
	changedState := query.Result.NewState != result.State

	return changedState || olderThan15Min
}