scheduler.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. package alerting
  2. import (
  3. "math"
  4. "time"
  5. "github.com/grafana/grafana/pkg/log"
  6. "github.com/grafana/grafana/pkg/models"
  7. )
  8. type SchedulerImpl struct {
  9. jobs map[int64]*Job
  10. log log.Logger
  11. }
  12. func NewScheduler() Scheduler {
  13. return &SchedulerImpl{
  14. jobs: make(map[int64]*Job, 0),
  15. log: log.New("alerting.scheduler"),
  16. }
  17. }
  18. func (s *SchedulerImpl) Update(rules []*Rule) {
  19. s.log.Debug("Scheduling update", "ruleCount", len(rules))
  20. jobs := make(map[int64]*Job, 0)
  21. for i, rule := range rules {
  22. var job *Job
  23. if s.jobs[rule.Id] != nil {
  24. job = s.jobs[rule.Id]
  25. } else {
  26. job = &Job{
  27. Running: false,
  28. }
  29. }
  30. job.Rule = rule
  31. offset := ((rule.Frequency * 1000) / int64(len(rules))) * int64(i)
  32. job.Offset = int64(math.Floor(float64(offset) / 1000))
  33. jobs[rule.Id] = job
  34. }
  35. s.jobs = jobs
  36. }
  37. func (s *SchedulerImpl) Tick(tickTime time.Time, execQueue chan *Job) {
  38. now := tickTime.Unix()
  39. for _, job := range s.jobs {
  40. if job.Running || job.Rule.State == models.AlertStatePaused {
  41. continue
  42. }
  43. if job.OffsetWait && now%job.Offset == 0 {
  44. job.OffsetWait = false
  45. s.enque(job, execQueue)
  46. continue
  47. }
  48. if now%job.Rule.Frequency == 0 {
  49. if job.Offset > 0 {
  50. job.OffsetWait = true
  51. } else {
  52. s.enque(job, execQueue)
  53. }
  54. }
  55. }
  56. }
  57. func (s *SchedulerImpl) enque(job *Job, execQueue chan *Job) {
  58. s.log.Debug("Scheduler: Putting job on to exec queue", "name", job.Rule.Name, "id", job.Rule.Id)
  59. execQueue <- job
  60. }