scheduler.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package alerting
  2. import (
  3. "math"
  4. "time"
  5. "github.com/grafana/grafana/pkg/log"
  6. "github.com/grafana/grafana/pkg/models"
  7. )
  8. type SchedulerImpl struct {
  9. jobs map[int64]*Job
  10. log log.Logger
  11. }
  12. func NewScheduler() Scheduler {
  13. return &SchedulerImpl{
  14. jobs: make(map[int64]*Job, 0),
  15. log: log.New("alerting.scheduler"),
  16. }
  17. }
  18. func (s *SchedulerImpl) Update(rules []*Rule) {
  19. s.log.Debug("Scheduling update", "ruleCount", len(rules))
  20. jobs := make(map[int64]*Job, 0)
  21. for i, rule := range rules {
  22. var job *Job
  23. if s.jobs[rule.Id] != nil {
  24. job = s.jobs[rule.Id]
  25. } else {
  26. job = &Job{
  27. Running: false,
  28. }
  29. }
  30. job.Rule = rule
  31. offset := ((rule.Frequency * 1000) / int64(len(rules))) * int64(i)
  32. job.Offset = int64(math.Floor(float64(offset) / 1000))
  33. if job.Offset == 0 { //zero offset causes division with 0 panics.
  34. job.Offset = 1
  35. }
  36. jobs[rule.Id] = job
  37. }
  38. s.jobs = jobs
  39. }
  40. func (s *SchedulerImpl) Tick(tickTime time.Time, execQueue chan *Job) {
  41. now := tickTime.Unix()
  42. for _, job := range s.jobs {
  43. if job.Running || job.Rule.State == models.AlertStatePaused {
  44. continue
  45. }
  46. if job.OffsetWait && now%job.Offset == 0 {
  47. job.OffsetWait = false
  48. s.enqueue(job, execQueue)
  49. continue
  50. }
  51. if now%job.Rule.Frequency == 0 {
  52. if job.Offset > 0 {
  53. job.OffsetWait = true
  54. } else {
  55. s.enqueue(job, execQueue)
  56. }
  57. }
  58. }
  59. }
  60. func (s *SchedulerImpl) enqueue(job *Job, execQueue chan *Job) {
  61. s.log.Debug("Scheduler: Putting job on to exec queue", "name", job.Rule.Name, "id", job.Rule.Id)
  62. execQueue <- job
  63. }