engine.go 4.8 KB

package alerting

import (
	"fmt"
	"time"

	"github.com/benbjohnson/clock"

	"github.com/grafana/grafana/pkg/bus"
	"github.com/grafana/grafana/pkg/components/simplejson"
	"github.com/grafana/grafana/pkg/log"
	m "github.com/grafana/grafana/pkg/models"
	"github.com/grafana/grafana/pkg/services/alerting/alertstates"
)
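
// Engine ties together the rule scheduler, the job executor and the
// result handler, connecting them through buffered channels.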
type Engine struct {
	execQueue   chan *AlertJob
	resultQueue chan *AlertResult
	clock       clock.Clock
	ticker      *Ticker
	scheduler   Scheduler
	handler     AlertingHandler
	ruleReader  RuleReader
	log         log.Logger
	notifier    Notifier
}
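
// NewEngine creates an Engine with its default collaborators and
// execution/result queues buffered to 1000 entries each.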
func NewEngine() *Engine {
	e := &Engine{
		ticker:      NewTicker(time.Now(), time.Second*0, clock.New()),
		execQueue:   make(chan *AlertJob, 1000),
		resultQueue: make(chan *AlertResult, 1000),
		scheduler:   NewScheduler(),
		handler:     NewHandler(),
		ruleReader:  NewRuleReader(),
		log:         log.New("alerting.engine"),
		notifier:    NewNotifier(),
	}

	return e
}
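
// Start launches the ticker, dispatcher and result handler goroutines.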
func (e *Engine) Start() {
	e.log.Info("Starting Alerting Engine")

	go e.alertingTicker()
	go e.execDispatch()
	go e.resultHandler()
}
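
// Stop closes the work queues, letting the dispatcher and result handler
// goroutines drain their channels and exit.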
func (e *Engine) Stop() {
	close(e.execQueue)
	close(e.resultQueue)
}
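
// alertingTicker drives the scheduler: on every tick it asks the scheduler
// to enqueue due jobs, and on every tenth tick it refreshes the rule set.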
func (e *Engine) alertingTicker() {
	defer func() {
		if err := recover(); err != nil {
			e.log.Error("Scheduler Panic: stopping alertingTicker", "error", err, "stack", log.Stack(1))
		}
	}()

	tickIndex := 0

	for {
		select {
		case tick := <-e.ticker.C:
			// TEMP SOLUTION update rules every tenth tick
			if tickIndex%10 == 0 {
				e.scheduler.Update(e.ruleReader.Fetch())
			}

			e.scheduler.Tick(tick, e.execQueue)
			tickIndex++
		}
	}
}
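
// execDispatch consumes jobs from the execution queue and runs them one at
// a time.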
func (e *Engine) execDispatch() {
	defer func() {
		if err := recover(); err != nil {
			e.log.Error("Scheduler Panic: stopping executor", "error", err, "stack", log.Stack(1))
		}
	}()

	for job := range e.execQueue {
		log.Trace("Alerting: engine:execDispatch() starting job %s", job.Rule.Name)
		job.Running = true
		e.executeJob(job)
	}
}
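
// executeJob runs a single alert job through the handler and forwards the
// result to the result queue, reporting a pending state with a timeout
// error if execution takes longer than five seconds.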
func (e *Engine) executeJob(job *AlertJob) {
	now := time.Now()

	resultChan := make(chan *AlertResult, 1)
	go e.handler.Execute(job, resultChan)

	select {
	case <-time.After(time.Second * 5):
		e.resultQueue <- &AlertResult{
			State:    alertstates.Pending,
			Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000),
			Error:    fmt.Errorf("Timeout"),
			AlertJob: job,
		}
		e.log.Debug("Job Execution timeout", "alertRuleId", job.Rule.Id)
	case result := <-resultChan:
		result.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)
		e.log.Debug("Job Execution done", "timeTakenMs", result.Duration, "ruleId", job.Rule.Id)
		e.resultQueue <- result
	}
}
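
// resultHandler processes finished jobs: failed jobs are re-queued until
// their retries are exhausted (then marked critical), successful jobs have
// their retry counter reset, and resulting state changes are acted upon.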
func (e *Engine) resultHandler() {
	defer func() {
		if err := recover(); err != nil {
			e.log.Error("Engine Panic, stopping resultHandler", "error", err, "stack", log.Stack(1))
		}
	}()

	for result := range e.resultQueue {
		e.log.Debug("Alert Rule Result", "ruleId", result.AlertJob.Rule.Id, "state", result.State, "value", result.ActualValue, "retry", result.AlertJob.RetryCount)

		result.AlertJob.Running = false

		if result.Error != nil {
			result.AlertJob.IncRetry()

			if result.AlertJob.Retryable() {
				e.log.Error("Alert Rule Result Error", "ruleId", result.AlertJob.Rule.Id, "error", result.Error, "retry", result.AlertJob.RetryCount)
				e.execQueue <- result.AlertJob
			} else {
				e.log.Error("Alert Rule Result Error After Max Retries", "ruleId", result.AlertJob.Rule.Id, "error", result.Error, "retry", result.AlertJob.RetryCount)

				result.State = alertstates.Critical
				result.Description = fmt.Sprintf("Failed to run check after %d retries, Error: %v", maxAlertExecutionRetries, result.Error)
				e.reactToState(result)
			}
		} else {
			result.AlertJob.ResetRetry()
			e.reactToState(result)
		}
	}
}
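
// reactToState persists the new alert state and notifies subscribers when
// shouldUpdateState decides the result is worth recording.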
func (e *Engine) reactToState(result *AlertResult) {
	if shouldUpdateState(result) {
		cmd := &m.UpdateAlertStateCommand{
			AlertId:         result.AlertJob.Rule.Id,
			NewState:        result.State,
			Info:            result.Description,
			OrgId:           result.AlertJob.Rule.OrgId,
			TriggeredAlerts: simplejson.NewFromAny(result.TriggeredAlerts),
		}

		if err := bus.Dispatch(cmd); err != nil {
			e.log.Error("Failed to save state", "error", err)
		}

		e.log.Debug("will notify about new state", "new state", result.State)
		e.notifier.Notify(result)
	}
}
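
// shouldUpdateState reports whether the result should be persisted: when no
// earlier state exists, when the state has changed, or when the stored state
// is older than 15 minutes.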
func shouldUpdateState(result *AlertResult) bool {
	query := &m.GetLastAlertStateQuery{
		AlertId: result.AlertJob.Rule.Id,
		OrgId:   result.AlertJob.Rule.OrgId,
	}

	if err := bus.Dispatch(query); err != nil {
		log.Error2("Failed to read last alert state", "error", err)
		return false
	}

	// Guard against a nil dereference: with no earlier state recorded the
	// new state should always be saved.
	if query.Result == nil {
		return true
	}

	now := time.Now()
	olderThan15Min := query.Result.Created.Before(now.Add(time.Minute * -15))
	changedState := query.Result.NewState != result.State

	return changedState || olderThan15Min
}