engine.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package alerting
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/benbjohnson/clock"
  6. "github.com/grafana/grafana/pkg/log"
  7. "github.com/grafana/grafana/pkg/services/alerting/alertstates"
  8. )
  9. type Engine struct {
  10. execQueue chan *AlertJob
  11. resultQueue chan *AlertResult
  12. clock clock.Clock
  13. ticker *Ticker
  14. scheduler Scheduler
  15. executor Executor
  16. ruleReader RuleReader
  17. }
  18. func NewEngine() *Engine {
  19. e := &Engine{
  20. ticker: NewTicker(time.Now(), time.Second*0, clock.New()),
  21. execQueue: make(chan *AlertJob, 1000),
  22. resultQueue: make(chan *AlertResult, 1000),
  23. scheduler: NewScheduler(),
  24. executor: &ExecutorImpl{},
  25. ruleReader: NewRuleReader(),
  26. }
  27. return e
  28. }
  29. func (e *Engine) Start() {
  30. log.Info("Alerting: Engine.Start()")
  31. go e.schedulerTick()
  32. go e.execDispatch()
  33. go e.resultHandler()
  34. }
  35. func (e *Engine) Stop() {
  36. close(e.execQueue)
  37. close(e.resultQueue)
  38. }
  39. func (e *Engine) schedulerTick() {
  40. tickIndex := 0
  41. for {
  42. select {
  43. case tick := <-e.ticker.C:
  44. // update rules ever tenth tick
  45. if tickIndex%10 == 0 {
  46. e.scheduler.Update(e.ruleReader.Fetch())
  47. }
  48. e.scheduler.Tick(tick, e.execQueue)
  49. }
  50. }
  51. }
  52. func (e *Engine) execDispatch() {
  53. for job := range e.execQueue {
  54. log.Trace("Alerting: Engine:execDispatch() starting job %s", job.Rule.Title)
  55. job.Running = true
  56. e.executeJob(job)
  57. }
  58. }
  59. func (e *Engine) executeJob(job *AlertJob) {
  60. now := time.Now()
  61. resultChan := make(chan *AlertResult, 1)
  62. go e.executor.Execute(job, resultChan)
  63. select {
  64. case <-time.After(time.Second * 5):
  65. e.resultQueue <- &AlertResult{
  66. Id: job.Rule.Id,
  67. State: alertstates.Pending,
  68. Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000),
  69. AlertJob: job,
  70. }
  71. case result := <-resultChan:
  72. result.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)
  73. log.Trace("Alerting: engine.executeJob(): exeuction took %vms", result.Duration)
  74. e.resultQueue <- result
  75. }
  76. }
  77. func (e *Engine) resultHandler() {
  78. for result := range e.resultQueue {
  79. log.Debug("Alerting: engine.resultHandler(): alert(%d) status(%s) actual(%v) retry(%d)", result.Id, result.State, result.ActualValue, result.AlertJob.RetryCount)
  80. result.AlertJob.Running = false
  81. if result.IsResultIncomplete() {
  82. result.AlertJob.RetryCount++
  83. if result.AlertJob.RetryCount < maxRetries {
  84. e.execQueue <- result.AlertJob
  85. } else {
  86. saveState(&AlertResult{
  87. Id: result.Id,
  88. State: alertstates.Critical,
  89. Description: fmt.Sprintf("Failed to run check after %d retires", maxRetries),
  90. })
  91. }
  92. } else {
  93. result.AlertJob.RetryCount = 0
  94. saveState(result)
  95. }
  96. }
  97. }