engine.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. package alerting
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/benbjohnson/clock"
  6. "github.com/grafana/grafana/pkg/log"
  7. )
// Engine coordinates the alerting pipeline: it ticks the scheduler,
// dispatches alert-rule executions from a queue, and routes execution
// results to the result handler.
type Engine struct {
	execQueue       chan *AlertJob           // jobs due for execution, filled by scheduler.Tick
	resultQueue     chan *AlertResultContext // finished or timed-out executions awaiting handling
	clock           clock.Clock
	ticker          *Ticker
	scheduler       Scheduler
	handler         AlertHandler
	ruleReader      RuleReader
	log             log.Logger
	responseHandler ResultHandler
	alertJobTimeout time.Duration // maximum wall time allowed for a single rule execution
}
  20. func NewEngine() *Engine {
  21. e := &Engine{
  22. ticker: NewTicker(time.Now(), time.Second*0, clock.New()),
  23. execQueue: make(chan *AlertJob, 1000),
  24. resultQueue: make(chan *AlertResultContext, 1000),
  25. scheduler: NewScheduler(),
  26. handler: NewHandler(),
  27. ruleReader: NewRuleReader(),
  28. log: log.New("alerting.engine"),
  29. responseHandler: NewResultHandler(),
  30. alertJobTimeout: time.Second * 5,
  31. }
  32. return e
  33. }
  34. func (e *Engine) Start() {
  35. e.log.Info("Starting Alerting Engine")
  36. go e.alertingTicker()
  37. go e.execDispatch()
  38. go e.resultHandler()
  39. }
// Stop shuts the engine down by closing both work queues, which terminates
// the range loops in execDispatch and resultHandler.
// NOTE(review): producers may still be sending when these channels close
// (e.g. scheduler.Tick into execQueue from alertingTicker); a send on a
// closed channel panics, though the workers' deferred recover would catch
// it — confirm the intended shutdown ordering.
func (e *Engine) Stop() {
	close(e.execQueue)
	close(e.resultQueue)
}
  44. func (e *Engine) alertingTicker() {
  45. defer func() {
  46. if err := recover(); err != nil {
  47. e.log.Error("Scheduler Panic: stopping alertingTicker", "error", err, "stack", log.Stack(1))
  48. }
  49. }()
  50. tickIndex := 0
  51. for {
  52. select {
  53. case tick := <-e.ticker.C:
  54. // TEMP SOLUTION update rules ever tenth tick
  55. if tickIndex%10 == 0 {
  56. e.scheduler.Update(e.ruleReader.Fetch())
  57. }
  58. e.scheduler.Tick(tick, e.execQueue)
  59. tickIndex++
  60. }
  61. }
  62. }
  63. func (e *Engine) execDispatch() {
  64. defer func() {
  65. if err := recover(); err != nil {
  66. e.log.Error("Scheduler Panic: stopping executor", "error", err, "stack", log.Stack(1))
  67. }
  68. }()
  69. for job := range e.execQueue {
  70. log.Trace("Alerting: engine:execDispatch() starting job %s", job.Rule.Name)
  71. job.Running = true
  72. e.executeJob(job)
  73. }
  74. }
  75. func (e *Engine) executeJob(job *AlertJob) {
  76. startTime := time.Now()
  77. resultChan := make(chan *AlertResultContext, 1)
  78. go e.handler.Execute(job.Rule, resultChan)
  79. select {
  80. case <-time.After(e.alertJobTimeout):
  81. e.resultQueue <- &AlertResultContext{
  82. Error: fmt.Errorf("Timeout"),
  83. Rule: job.Rule,
  84. StartTime: startTime,
  85. EndTime: time.Now(),
  86. }
  87. close(resultChan)
  88. e.log.Debug("Job Execution timeout", "alertRuleId", job.Rule.Id)
  89. case result := <-resultChan:
  90. duration := float64(result.EndTime.Nanosecond()-result.StartTime.Nanosecond()) / float64(1000000)
  91. e.log.Debug("Job Execution done", "timeTakenMs", duration, "ruleId", job.Rule.Id)
  92. e.resultQueue <- result
  93. }
  94. job.Running = false
  95. }
  96. func (e *Engine) resultHandler() {
  97. defer func() {
  98. if err := recover(); err != nil {
  99. e.log.Error("Engine Panic, stopping resultHandler", "error", err, "stack", log.Stack(1))
  100. }
  101. }()
  102. for result := range e.resultQueue {
  103. e.log.Debug("Alert Rule Result", "ruleId", result.Rule.Id, "triggered", result.Triggered)
  104. if result.Error != nil {
  105. e.log.Error("Alert Rule Result Error", "ruleId", result.Rule.Id, "error", result.Error, "retry")
  106. } else {
  107. e.responseHandler.Handle(result)
  108. }
  109. }
  110. }