engine.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package alerting
  2. import (
  3. "context"
  4. "time"
  5. "github.com/benbjohnson/clock"
  6. "github.com/grafana/grafana/pkg/log"
  7. "golang.org/x/sync/errgroup"
  8. )
  9. type Engine struct {
  10. execQueue chan *Job
  11. clock clock.Clock
  12. ticker *Ticker
  13. scheduler Scheduler
  14. evalHandler EvalHandler
  15. ruleReader RuleReader
  16. log log.Logger
  17. resultHandler ResultHandler
  18. }
  19. func NewEngine() *Engine {
  20. e := &Engine{
  21. ticker: NewTicker(time.Now(), time.Second*0, clock.New()),
  22. execQueue: make(chan *Job, 1000),
  23. scheduler: NewScheduler(),
  24. evalHandler: NewEvalHandler(),
  25. ruleReader: NewRuleReader(),
  26. log: log.New("alerting.engine"),
  27. resultHandler: NewResultHandler(),
  28. }
  29. return e
  30. }
  31. func (e *Engine) Run(ctx context.Context) error {
  32. e.log.Info("Initializing Alerting")
  33. alertGroup, ctx := errgroup.WithContext(ctx)
  34. alertGroup.Go(func() error { return e.alertingTicker(ctx) })
  35. alertGroup.Go(func() error { return e.runJobDispatcher(ctx) })
  36. err := alertGroup.Wait()
  37. e.log.Info("Stopped Alerting", "reason", err)
  38. return err
  39. }
  40. func (e *Engine) alertingTicker(grafanaCtx context.Context) error {
  41. defer func() {
  42. if err := recover(); err != nil {
  43. e.log.Error("Scheduler Panic: stopping alertingTicker", "error", err, "stack", log.Stack(1))
  44. }
  45. }()
  46. tickIndex := 0
  47. for {
  48. select {
  49. case <-grafanaCtx.Done():
  50. return grafanaCtx.Err()
  51. case tick := <-e.ticker.C:
  52. // TEMP SOLUTION update rules ever tenth tick
  53. if tickIndex%10 == 0 {
  54. e.scheduler.Update(e.ruleReader.Fetch())
  55. }
  56. e.scheduler.Tick(tick, e.execQueue)
  57. tickIndex++
  58. }
  59. }
  60. }
  61. func (e *Engine) runJobDispatcher(grafanaCtx context.Context) error {
  62. dispatcherGroup, alertCtx := errgroup.WithContext(grafanaCtx)
  63. for {
  64. select {
  65. case <-grafanaCtx.Done():
  66. return dispatcherGroup.Wait()
  67. case job := <-e.execQueue:
  68. dispatcherGroup.Go(func() error { return e.processJob(alertCtx, job) })
  69. }
  70. }
  71. }
  72. var (
  73. unfinishedWorkTimeout time.Duration = time.Second * 5
  74. alertTimeout time.Duration = time.Second * 30
  75. )
  76. func (e *Engine) processJob(grafanaCtx context.Context, job *Job) error {
  77. defer func() {
  78. if err := recover(); err != nil {
  79. e.log.Error("Alert Panic", "error", err, "stack", log.Stack(1))
  80. }
  81. }()
  82. alertCtx, cancelFn := context.WithTimeout(context.TODO(), alertTimeout)
  83. job.Running = true
  84. evalContext := NewEvalContext(alertCtx, job.Rule)
  85. done := make(chan struct{})
  86. go func() {
  87. e.evalHandler.Eval(evalContext)
  88. e.resultHandler.Handle(evalContext)
  89. close(done)
  90. }()
  91. var err error = nil
  92. select {
  93. case <-grafanaCtx.Done():
  94. select {
  95. case <-time.After(unfinishedWorkTimeout):
  96. cancelFn()
  97. err = grafanaCtx.Err()
  98. case <-done:
  99. }
  100. case <-done:
  101. }
  102. e.log.Debug("Job Execution completed", "timeMs", evalContext.GetDurationMs(), "alertId", evalContext.Rule.Id, "name", evalContext.Rule.Name, "firing", evalContext.Firing)
  103. job.Running = false
  104. cancelFn()
  105. return err
  106. }