engine.go

package alerting

import (
	"fmt"
	"time"

	"github.com/benbjohnson/clock"

	"github.com/grafana/grafana/pkg/log"
	"github.com/grafana/grafana/pkg/services/alerting/alertstates"
)
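
// Engine coordinates scheduling, execution, and result handling of alert
// rules. Jobs flow from the scheduler into execQueue, and execution results
// flow through resultQueue to the responseHandler.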
type Engine struct {
	execQueue       chan *AlertJob
	resultQueue     chan *AlertResult
	clock           clock.Clock
	ticker          *Ticker
	scheduler       Scheduler
	handler         AlertingHandler
	ruleReader      RuleReader
	log             log.Logger
	responseHandler ResultHandler
}
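
// NewEngine returns an Engine with buffered job and result queues (1000
// entries each) and the default scheduler, handler, rule reader, and result
// handler implementations.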
func NewEngine() *Engine {
	e := &Engine{
		ticker:          NewTicker(time.Now(), time.Second*0, clock.New()),
		execQueue:       make(chan *AlertJob, 1000),
		resultQueue:     make(chan *AlertResult, 1000),
		scheduler:       NewScheduler(),
		handler:         NewHandler(),
		ruleReader:      NewRuleReader(),
		log:             log.New("alerting.engine"),
		responseHandler: NewResultHandler(),
	}

	return e
}
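
// Start launches the ticker, dispatcher, and result handler goroutines.
// It returns immediately and does not block.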
func (e *Engine) Start() {
	e.log.Info("Starting Alerting Engine")

	go e.alertingTicker()
	go e.execDispatch()
	go e.resultHandler()
}
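
// Stop closes both work queues, which ends the range loops in execDispatch
// and resultHandler. The ticker goroutine itself is not stopped here.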
func (e *Engine) Stop() {
	close(e.execQueue)
	close(e.resultQueue)
}
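
// alertingTicker consumes ticks from the shared ticker, refreshes the rule
// set every tenth tick, and lets the scheduler enqueue due jobs onto
// execQueue. The recover guard logs panics instead of crashing the process.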
func (e *Engine) alertingTicker() {
	defer func() {
		if err := recover(); err != nil {
			e.log.Error("Scheduler Panic: stopping alertingTicker", "error", err, "stack", log.Stack(1))
		}
	}()

	tickIndex := 0

	for {
		select {
		case tick := <-e.ticker.C:
			// TEMP SOLUTION: update rules every tenth tick
			if tickIndex%10 == 0 {
				e.scheduler.Update(e.ruleReader.Fetch())
			}

			e.scheduler.Tick(tick, e.execQueue)
			tickIndex++
		}
	}
}
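
// execDispatch drains execQueue, marking each job as running before
// executing it. The loop exits when execQueue is closed by Stop.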
func (e *Engine) execDispatch() {
	defer func() {
		if err := recover(); err != nil {
			e.log.Error("Scheduler Panic: stopping executor", "error", err, "stack", log.Stack(1))
		}
	}()

	for job := range e.execQueue {
		log.Trace("Alerting: engine:execDispatch() starting job %s", job.Rule.Name)
		job.Running = true
		e.executeJob(job)
	}
}
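
// executeJob runs a single job via the handler and forwards the result to
// resultQueue. If the handler does not respond within five seconds, the job
// is reported as pending with a timeout error.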
func (e *Engine) executeJob(job *AlertJob) {
	now := time.Now()

	resultChan := make(chan *AlertResult, 1)
	go e.handler.Execute(job, resultChan)

	select {
	case <-time.After(time.Second * 5):
		e.resultQueue <- &AlertResult{
			State:         alertstates.Pending,
			Duration:      float64(time.Since(now).Nanoseconds()) / float64(1000000),
			Error:         fmt.Errorf("Timeout"),
			AlertJob:      job,
			ExeuctionTime: time.Now(),
		}
		e.log.Debug("Job Execution timeout", "alertRuleId", job.Rule.Id)
	case result := <-resultChan:
		result.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)
		e.log.Debug("Job Execution done", "timeTakenMs", result.Duration, "ruleId", job.Rule.Id)
		e.resultQueue <- result
	}
}
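
// resultHandler processes results from resultQueue. Failed jobs are retried
// by re-queueing them until they are no longer retryable, at which point the
// result is marked critical and handed to the responseHandler.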
func (e *Engine) resultHandler() {
	defer func() {
		if err := recover(); err != nil {
			e.log.Error("Engine Panic, stopping resultHandler", "error", err, "stack", log.Stack(1))
		}
	}()

	for result := range e.resultQueue {
		e.log.Debug("Alert Rule Result", "ruleId", result.AlertJob.Rule.Id, "state", result.State, "value", result.ActualValue, "retry", result.AlertJob.RetryCount)
		result.AlertJob.Running = false

		if result.Error != nil {
			result.AlertJob.IncRetry()

			if result.AlertJob.Retryable() {
				e.log.Error("Alert Rule Result Error", "ruleId", result.AlertJob.Rule.Id, "error", result.Error, "retry", result.AlertJob.RetryCount)
				e.execQueue <- result.AlertJob
			} else {
				e.log.Error("Alert Rule Result Error After Max Retries", "ruleId", result.AlertJob.Rule.Id, "error", result.Error, "retry", result.AlertJob.RetryCount)
				result.State = alertstates.Critical
				result.Description = fmt.Sprintf("Failed to run check after %d retries, Error: %v", maxAlertExecutionRetries, result.Error)
				e.responseHandler.Handle(result)
			}
		} else {
			result.AlertJob.ResetRetry()
			e.responseHandler.Handle(result)
		}
	}
}