engine.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package alerting
  2. import (
  3. "context"
  4. "time"
  5. "github.com/opentracing/opentracing-go"
  6. tlog "github.com/opentracing/opentracing-go/log"
  7. "github.com/benbjohnson/clock"
  8. "github.com/grafana/grafana/pkg/log"
  9. "golang.org/x/sync/errgroup"
  10. )
  11. type Engine struct {
  12. execQueue chan *Job
  13. clock clock.Clock
  14. ticker *Ticker
  15. scheduler Scheduler
  16. evalHandler EvalHandler
  17. ruleReader RuleReader
  18. log log.Logger
  19. resultHandler ResultHandler
  20. }
  21. func NewEngine() *Engine {
  22. e := &Engine{
  23. ticker: NewTicker(time.Now(), time.Second*0, clock.New()),
  24. execQueue: make(chan *Job, 1000),
  25. scheduler: NewScheduler(),
  26. evalHandler: NewEvalHandler(),
  27. ruleReader: NewRuleReader(),
  28. log: log.New("alerting.engine"),
  29. resultHandler: NewResultHandler(),
  30. }
  31. return e
  32. }
  33. func (e *Engine) Run(ctx context.Context) error {
  34. e.log.Info("Initializing Alerting")
  35. alertGroup, ctx := errgroup.WithContext(ctx)
  36. alertGroup.Go(func() error { return e.alertingTicker(ctx) })
  37. alertGroup.Go(func() error { return e.runJobDispatcher(ctx) })
  38. err := alertGroup.Wait()
  39. e.log.Info("Stopped Alerting", "reason", err)
  40. return err
  41. }
  42. func (e *Engine) alertingTicker(grafanaCtx context.Context) error {
  43. defer func() {
  44. if err := recover(); err != nil {
  45. e.log.Error("Scheduler Panic: stopping alertingTicker", "error", err, "stack", log.Stack(1))
  46. }
  47. }()
  48. tickIndex := 0
  49. for {
  50. select {
  51. case <-grafanaCtx.Done():
  52. return grafanaCtx.Err()
  53. case tick := <-e.ticker.C:
  54. // TEMP SOLUTION update rules ever tenth tick
  55. if tickIndex%10 == 0 {
  56. e.scheduler.Update(e.ruleReader.Fetch())
  57. }
  58. e.scheduler.Tick(tick, e.execQueue)
  59. tickIndex++
  60. }
  61. }
  62. }
  63. func (e *Engine) runJobDispatcher(grafanaCtx context.Context) error {
  64. dispatcherGroup, alertCtx := errgroup.WithContext(grafanaCtx)
  65. for {
  66. select {
  67. case <-grafanaCtx.Done():
  68. return dispatcherGroup.Wait()
  69. case job := <-e.execQueue:
  70. dispatcherGroup.Go(func() error { return e.processJob(alertCtx, job) })
  71. }
  72. }
  73. }
  74. var (
  75. unfinishedWorkTimeout time.Duration = time.Second * 5
  76. alertTimeout time.Duration = time.Second * 30
  77. )
  78. func (e *Engine) processJob(grafanaCtx context.Context, job *Job) error {
  79. defer func() {
  80. if err := recover(); err != nil {
  81. e.log.Error("Alert Panic", "error", err, "stack", log.Stack(1))
  82. }
  83. }()
  84. alertCtx, cancelFn := context.WithTimeout(context.Background(), alertTimeout)
  85. job.Running = true
  86. evalContext := NewEvalContext(alertCtx, job.Rule)
  87. done := make(chan struct{})
  88. span := opentracing.StartSpan("alerting")
  89. alertCtx = opentracing.ContextWithSpan(alertCtx, span)
  90. evalContext.Ctx = alertCtx
  91. go func() {
  92. defer func() {
  93. if err := recover(); err != nil {
  94. e.log.Error("Alert Panic", "error", err, "stack", log.Stack(1))
  95. close(done)
  96. }
  97. }()
  98. e.evalHandler.Eval(evalContext)
  99. e.resultHandler.Handle(evalContext)
  100. span.LogFields(
  101. tlog.Int64("alertId", evalContext.Rule.Id),
  102. tlog.Int64("dashboardId", evalContext.Rule.DashboardId),
  103. tlog.Bool("firing", evalContext.Firing),
  104. )
  105. close(done)
  106. span.Finish()
  107. }()
  108. var err error = nil
  109. select {
  110. case <-grafanaCtx.Done():
  111. select {
  112. case <-time.After(unfinishedWorkTimeout):
  113. cancelFn()
  114. err = grafanaCtx.Err()
  115. case <-done:
  116. }
  117. case <-done:
  118. }
  119. e.log.Debug("Job Execution completed", "timeMs", evalContext.GetDurationMs(), "alertId", evalContext.Rule.Id, "name", evalContext.Rule.Name, "firing", evalContext.Firing)
  120. job.Running = false
  121. cancelFn()
  122. return err
  123. }