engine.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package alerting
  2. import (
  3. "time"
  4. "github.com/benbjohnson/clock"
  5. "github.com/grafana/grafana/pkg/log"
  6. )
  7. type Engine struct {
  8. execQueue chan *AlertJob
  9. resultQueue chan *AlertResultContext
  10. clock clock.Clock
  11. ticker *Ticker
  12. scheduler Scheduler
  13. handler AlertHandler
  14. ruleReader RuleReader
  15. log log.Logger
  16. responseHandler ResultHandler
  17. }
  18. func NewEngine() *Engine {
  19. e := &Engine{
  20. ticker: NewTicker(time.Now(), time.Second*0, clock.New()),
  21. execQueue: make(chan *AlertJob, 1000),
  22. resultQueue: make(chan *AlertResultContext, 1000),
  23. scheduler: NewScheduler(),
  24. handler: NewHandler(),
  25. ruleReader: NewRuleReader(),
  26. log: log.New("alerting.engine"),
  27. responseHandler: NewResultHandler(),
  28. }
  29. return e
  30. }
  31. func (e *Engine) Start() {
  32. e.log.Info("Starting Alerting Engine")
  33. go e.alertingTicker()
  34. go e.execDispatch()
  35. go e.resultHandler()
  36. }
  37. func (e *Engine) Stop() {
  38. close(e.execQueue)
  39. close(e.resultQueue)
  40. }
  41. func (e *Engine) alertingTicker() {
  42. defer func() {
  43. if err := recover(); err != nil {
  44. e.log.Error("Scheduler Panic: stopping alertingTicker", "error", err, "stack", log.Stack(1))
  45. }
  46. }()
  47. tickIndex := 0
  48. for {
  49. select {
  50. case tick := <-e.ticker.C:
  51. // TEMP SOLUTION update rules ever tenth tick
  52. if tickIndex%10 == 0 {
  53. e.scheduler.Update(e.ruleReader.Fetch())
  54. }
  55. e.scheduler.Tick(tick, e.execQueue)
  56. tickIndex++
  57. }
  58. }
  59. }
  60. func (e *Engine) execDispatch() {
  61. defer func() {
  62. if err := recover(); err != nil {
  63. e.log.Error("Scheduler Panic: stopping executor", "error", err, "stack", log.Stack(1))
  64. }
  65. }()
  66. for job := range e.execQueue {
  67. log.Trace("Alerting: engine:execDispatch() starting job %s", job.Rule.Name)
  68. e.executeJob(job)
  69. }
  70. }
  71. func (e *Engine) executeJob(job *AlertJob) {
  72. job.Running = true
  73. context := NewAlertResultContext(job.Rule)
  74. e.handler.Execute(context)
  75. job.Running = false
  76. }
  77. func (e *Engine) resultHandler() {
  78. defer func() {
  79. if err := recover(); err != nil {
  80. e.log.Error("Engine Panic, stopping resultHandler", "error", err, "stack", log.Stack(1))
  81. }
  82. }()
  83. for result := range e.resultQueue {
  84. e.log.Debug("Alert Rule Result", "ruleId", result.Rule.Id, "triggered", result.Triggered)
  85. if result.Error != nil {
  86. e.log.Error("Alert Rule Result Error", "ruleId", result.Rule.Id, "error", result.Error, "retry")
  87. } else {
  88. e.responseHandler.Handle(result)
  89. }
  90. }
  91. }