engine.go

package alerting

import (
	"fmt"
	"time"

	"github.com/benbjohnson/clock"

	"github.com/grafana/grafana/pkg/log"
	"github.com/grafana/grafana/pkg/services/alerting/alertstates"
)

// Engine ties the rule reader, scheduler and executor together and pumps
// alert jobs through the exec and result queues.
type Engine struct {
	execQueue   chan *AlertJob
	resultQueue chan *AlertResult
	clock       clock.Clock
	ticker      *Ticker
	scheduler   Scheduler
	executor    Executor
	ruleReader  RuleReader
}
func NewEngine() *Engine {
	e := &Engine{
		ticker:      NewTicker(time.Now(), time.Second*0, clock.New()),
		execQueue:   make(chan *AlertJob, 1000),
		resultQueue: make(chan *AlertResult, 1000),
		scheduler:   NewScheduler(),
		executor:    &ExecutorImpl{},
		ruleReader:  NewRuleReader(),
	}

	return e
}
func (e *Engine) Start() {
	log.Info("Alerting: engine.Start()")

	go e.alertingTicker()
	go e.execDispatch()
	go e.resultHandler()
}

func (e *Engine) Stop() {
	close(e.execQueue)
	close(e.resultQueue)
}
func (e *Engine) alertingTicker() {
	tickIndex := 0

	for {
		select {
		case tick := <-e.ticker.C:
			// TEMP SOLUTION: update rules every tenth tick
			if tickIndex%10 == 0 {
				e.scheduler.Update(e.ruleReader.Fetch())
			}

			e.scheduler.Tick(tick, e.execQueue)
			tickIndex++
		}
	}
}
func (e *Engine) execDispatch() {
	for job := range e.execQueue {
		log.Trace("Alerting: engine:execDispatch() starting job %s", job.Rule.Name)
		job.Running = true
		e.executeJob(job)
	}
}
// executeJob runs a single alert job with a five second timeout; on timeout
// a pending result carrying a timeout error is queued instead.
func (e *Engine) executeJob(job *AlertJob) {
	now := time.Now()

	resultChan := make(chan *AlertResult, 1)
	go e.executor.Execute(job, resultChan)

	select {
	case <-time.After(time.Second * 5):
		e.resultQueue <- &AlertResult{
			State:    alertstates.Pending,
			Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000),
			Error:    fmt.Errorf("Timeout"),
			AlertJob: job,
		}
		log.Trace("Alerting: engine.executeJob(): timeout")
	case result := <-resultChan:
		result.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)
		log.Trace("Alerting: engine.executeJob(): done %vms", result.Duration)
		e.resultQueue <- result
	}
}
// resultHandler persists results and retries failed jobs, marking the alert
// critical once maxRetries is reached.
func (e *Engine) resultHandler() {
	for result := range e.resultQueue {
		log.Debug("Alerting: engine.resultHandler(): alert(%d) status(%s) actual(%v) retry(%d)", result.AlertJob.Rule.Id, result.State, result.ActualValue, result.AlertJob.RetryCount)
		result.AlertJob.Running = false

		// handle result error
		if result.Error != nil {
			result.AlertJob.RetryCount++

			if result.AlertJob.RetryCount < maxRetries {
				log.Error(3, "Alerting: Rule('%s') Result Error: %v, Retrying..", result.AlertJob.Rule.Name, result.Error)
				e.execQueue <- result.AlertJob
			} else {
				log.Error(3, "Alerting: Rule('%s') Result Error: %v, Max retries reached", result.AlertJob.Rule.Name, result.Error)
				result.State = alertstates.Critical
				result.Description = fmt.Sprintf("Failed to run check after %d retries, Error: %v", maxRetries, result.Error)
				saveState(result)
			}
		} else {
			result.AlertJob.RetryCount = 0
			saveState(result)
		}
	}
}
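// Usage sketch (not part of this file; how the engine is wired into the server
// is an assumption here): a caller would construct the engine once, start its
// background loops, and stop it on shutdown.
//
//	engine := alerting.NewEngine()
//	engine.Start()
//	defer engine.Stop()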