engine.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package alerting
  2. import (
  3. "time"
  4. "github.com/benbjohnson/clock"
  5. "github.com/grafana/grafana/pkg/log"
  6. )
  7. type Engine struct {
  8. execQueue chan *Job
  9. resultQueue chan *EvalContext
  10. clock clock.Clock
  11. ticker *Ticker
  12. scheduler Scheduler
  13. evalHandler EvalHandler
  14. ruleReader RuleReader
  15. log log.Logger
  16. resultHandler ResultHandler
  17. }
  18. func NewEngine() *Engine {
  19. e := &Engine{
  20. ticker: NewTicker(time.Now(), time.Second*0, clock.New()),
  21. execQueue: make(chan *Job, 1000),
  22. resultQueue: make(chan *EvalContext, 1000),
  23. scheduler: NewScheduler(),
  24. evalHandler: NewEvalHandler(),
  25. ruleReader: NewRuleReader(),
  26. log: log.New("alerting.engine"),
  27. resultHandler: NewResultHandler(),
  28. }
  29. return e
  30. }
  31. func (e *Engine) Start() {
  32. e.log.Info("Starting Alerting Engine")
  33. go e.alertingTicker()
  34. go e.execDispatcher()
  35. go e.resultDispatcher()
  36. }
  37. func (e *Engine) Stop() {
  38. close(e.execQueue)
  39. close(e.resultQueue)
  40. }
  41. func (e *Engine) alertingTicker() {
  42. defer func() {
  43. if err := recover(); err != nil {
  44. e.log.Error("Scheduler Panic: stopping alertingTicker", "error", err, "stack", log.Stack(1))
  45. }
  46. }()
  47. tickIndex := 0
  48. for {
  49. select {
  50. case tick := <-e.ticker.C:
  51. // TEMP SOLUTION update rules ever tenth tick
  52. if tickIndex%10 == 0 {
  53. e.scheduler.Update(e.ruleReader.Fetch())
  54. }
  55. e.scheduler.Tick(tick, e.execQueue)
  56. tickIndex++
  57. }
  58. }
  59. }
  60. func (e *Engine) execDispatcher() {
  61. for job := range e.execQueue {
  62. e.log.Debug("Starting executing alert rule", "alert id", job.Rule.Id)
  63. go e.executeJob(job)
  64. }
  65. }
  66. func (e *Engine) executeJob(job *Job) {
  67. defer func() {
  68. if err := recover(); err != nil {
  69. e.log.Error("Execute Alert Panic", "error", err, "stack", log.Stack(1))
  70. }
  71. }()
  72. job.Running = true
  73. context := NewEvalContext(job.Rule)
  74. e.evalHandler.Eval(context)
  75. job.Running = false
  76. e.resultQueue <- context
  77. }
  78. func (e *Engine) resultDispatcher() {
  79. for result := range e.resultQueue {
  80. go e.handleResponse(result)
  81. }
  82. }
  83. func (e *Engine) handleResponse(result *EvalContext) {
  84. defer func() {
  85. if err := recover(); err != nil {
  86. e.log.Error("Panic in resultDispatcher", "error", err, "stack", log.Stack(1))
  87. }
  88. }()
  89. e.log.Debug("Alert Rule Result", "ruleId", result.Rule.Id, "firing", result.Firing)
  90. e.resultHandler.Handle(result)
  91. }