scheduler.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package alerting
  2. import (
  3. "math"
  4. "time"
  5. "github.com/grafana/grafana/pkg/log"
  6. )
  7. type SchedulerImpl struct {
  8. jobs map[int64]*Job
  9. log log.Logger
  10. }
  11. func NewScheduler() Scheduler {
  12. return &SchedulerImpl{
  13. jobs: make(map[int64]*Job, 0),
  14. log: log.New("alerting.scheduler"),
  15. }
  16. }
  17. func (s *SchedulerImpl) Update(rules []*Rule) {
  18. s.log.Debug("Scheduling update", "ruleCount", len(rules))
  19. jobs := make(map[int64]*Job, 0)
  20. for i, rule := range rules {
  21. var job *Job
  22. if s.jobs[rule.Id] != nil {
  23. job = s.jobs[rule.Id]
  24. } else {
  25. job = &Job{
  26. Running: false,
  27. }
  28. }
  29. job.Rule = rule
  30. job.Offset = ((rule.Frequency * 1000) / int64(len(rules))) * int64(i)
  31. job.Offset = int64(math.Floor(float64(job.Offset) / 1000))
  32. jobs[rule.Id] = job
  33. }
  34. s.jobs = jobs
  35. }
  36. func (s *SchedulerImpl) Tick(tickTime time.Time, execQueue chan *Job) {
  37. now := tickTime.Unix()
  38. for _, job := range s.jobs {
  39. if job.Running {
  40. continue
  41. }
  42. if job.OffsetWait && now%job.Offset == 0 {
  43. job.OffsetWait = false
  44. s.enque(job, execQueue)
  45. continue
  46. }
  47. if now%job.Rule.Frequency == 0 {
  48. if job.Offset > 0 {
  49. job.OffsetWait = true
  50. } else {
  51. s.enque(job, execQueue)
  52. }
  53. }
  54. }
  55. }
  56. func (s *SchedulerImpl) enque(job *Job, execQueue chan *Job) {
  57. s.log.Debug("Scheduler: Putting job on to exec queue", "name", job.Rule.Name, "id", job.Rule.Id)
  58. execQueue <- job
  59. }