scheduler.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package alerting
  2. import (
  3. "math"
  4. "time"
  5. "github.com/grafana/grafana/pkg/infra/log"
  6. "github.com/grafana/grafana/pkg/models"
  7. )
  8. type schedulerImpl struct {
  9. jobs map[int64]*Job
  10. log log.Logger
  11. }
  12. func newScheduler() scheduler {
  13. return &schedulerImpl{
  14. jobs: make(map[int64]*Job),
  15. log: log.New("alerting.scheduler"),
  16. }
  17. }
  18. func (s *schedulerImpl) Update(rules []*Rule) {
  19. s.log.Debug("Scheduling update", "ruleCount", len(rules))
  20. jobs := make(map[int64]*Job)
  21. for i, rule := range rules {
  22. var job *Job
  23. if s.jobs[rule.ID] != nil {
  24. job = s.jobs[rule.ID]
  25. } else {
  26. job = &Job{}
  27. job.SetRunning(false)
  28. }
  29. job.Rule = rule
  30. offset := ((rule.Frequency * 1000) / int64(len(rules))) * int64(i)
  31. job.Offset = int64(math.Floor(float64(offset) / 1000))
  32. if job.Offset == 0 { //zero offset causes division with 0 panics.
  33. job.Offset = 1
  34. }
  35. jobs[rule.ID] = job
  36. }
  37. s.jobs = jobs
  38. }
  39. func (s *schedulerImpl) Tick(tickTime time.Time, execQueue chan *Job) {
  40. now := tickTime.Unix()
  41. for _, job := range s.jobs {
  42. if job.GetRunning() || job.Rule.State == models.AlertStatePaused {
  43. continue
  44. }
  45. if job.OffsetWait && now%job.Offset == 0 {
  46. job.OffsetWait = false
  47. s.enqueue(job, execQueue)
  48. continue
  49. }
  50. if now%job.Rule.Frequency == 0 {
  51. if job.Offset > 0 {
  52. job.OffsetWait = true
  53. } else {
  54. s.enqueue(job, execQueue)
  55. }
  56. }
  57. }
  58. }
  59. func (s *schedulerImpl) enqueue(job *Job, execQueue chan *Job) {
  60. s.log.Debug("Scheduler: Putting job on to exec queue", "name", job.Rule.Name, "id", job.Rule.ID)
  61. execQueue <- job
  62. }