engine.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package alerting
import (
	"context"
	"sync"
	"time"

	"github.com/benbjohnson/clock"
	"github.com/grafana/grafana/pkg/log"
	"golang.org/x/sync/errgroup"
)
  9. type Engine struct {
  10. execQueue chan *Job
  11. resultQueue chan *EvalContext
  12. clock clock.Clock
  13. ticker *Ticker
  14. scheduler Scheduler
  15. evalHandler EvalHandler
  16. ruleReader RuleReader
  17. log log.Logger
  18. resultHandler ResultHandler
  19. }
  20. func NewEngine() *Engine {
  21. e := &Engine{
  22. ticker: NewTicker(time.Now(), time.Second*0, clock.New()),
  23. execQueue: make(chan *Job, 1000),
  24. resultQueue: make(chan *EvalContext, 1000),
  25. scheduler: NewScheduler(),
  26. evalHandler: NewEvalHandler(),
  27. ruleReader: NewRuleReader(),
  28. log: log.New("alerting.engine"),
  29. resultHandler: NewResultHandler(),
  30. }
  31. return e
  32. }
  33. func (e *Engine) Run(ctx context.Context) error {
  34. e.log.Info("Initializing Alerting")
  35. g, ctx := errgroup.WithContext(ctx)
  36. g.Go(func() error { return e.alertingTicker(ctx) })
  37. g.Go(func() error { return e.execDispatcher(ctx) })
  38. g.Go(func() error { return e.resultDispatcher(ctx) })
  39. err := g.Wait()
  40. e.log.Info("Stopped Alerting", "reason", err)
  41. return err
  42. }
  43. func (e *Engine) Stop() {
  44. close(e.execQueue)
  45. close(e.resultQueue)
  46. }
  47. func (e *Engine) alertingTicker(grafanaCtx context.Context) error {
  48. defer func() {
  49. if err := recover(); err != nil {
  50. e.log.Error("Scheduler Panic: stopping alertingTicker", "error", err, "stack", log.Stack(1))
  51. }
  52. }()
  53. tickIndex := 0
  54. for {
  55. select {
  56. case <-grafanaCtx.Done():
  57. return grafanaCtx.Err()
  58. case tick := <-e.ticker.C:
  59. // TEMP SOLUTION update rules ever tenth tick
  60. if tickIndex%10 == 0 {
  61. e.scheduler.Update(e.ruleReader.Fetch())
  62. }
  63. e.scheduler.Tick(tick, e.execQueue)
  64. tickIndex++
  65. }
  66. }
  67. }
  68. func (e *Engine) execDispatcher(grafanaCtx context.Context) error {
  69. for {
  70. select {
  71. case <-grafanaCtx.Done():
  72. close(e.resultQueue)
  73. return grafanaCtx.Err()
  74. case job := <-e.execQueue:
  75. go e.executeJob(grafanaCtx, job)
  76. }
  77. }
  78. }
  79. func (e *Engine) executeJob(grafanaCtx context.Context, job *Job) error {
  80. defer func() {
  81. if err := recover(); err != nil {
  82. e.log.Error("Execute Alert Panic", "error", err, "stack", log.Stack(1))
  83. }
  84. }()
  85. done := make(chan *EvalContext, 1)
  86. go func() {
  87. job.Running = true
  88. context := NewEvalContext(job.Rule)
  89. e.evalHandler.Eval(context)
  90. job.Running = false
  91. done <- context
  92. close(done)
  93. }()
  94. select {
  95. case <-grafanaCtx.Done():
  96. return grafanaCtx.Err()
  97. case evalContext := <-done:
  98. e.resultQueue <- evalContext
  99. }
  100. return nil
  101. }
  102. func (e *Engine) resultDispatcher(grafanaCtx context.Context) error {
  103. for {
  104. select {
  105. case <-grafanaCtx.Done():
  106. //handle all responses before shutting down.
  107. for result := range e.resultQueue {
  108. e.handleResponse(result)
  109. }
  110. return grafanaCtx.Err()
  111. case result := <-e.resultQueue:
  112. e.handleResponse(result)
  113. }
  114. }
  115. }
  116. func (e *Engine) handleResponse(result *EvalContext) {
  117. defer func() {
  118. if err := recover(); err != nil {
  119. e.log.Error("Panic in resultDispatcher", "error", err, "stack", log.Stack(1))
  120. }
  121. }()
  122. e.log.Debug("Alert Rule Result", "ruleId", result.Rule.Id, "firing", result.Firing)
  123. e.resultHandler.Handle(result)
  124. }