scheduler.go

package alerting

import (
	"time"

	"github.com/grafana/grafana/pkg/log"
)

// SchedulerImpl keeps the set of alert jobs to evaluate, keyed by alert rule id.
type SchedulerImpl struct {
	jobs map[int64]*AlertJob
}

// NewScheduler returns a scheduler with no jobs registered.
func NewScheduler() Scheduler {
	return &SchedulerImpl{
		jobs: make(map[int64]*AlertJob),
	}
}

// Update rebuilds the job map with one job per rule, reusing existing jobs so
// their running state and retry count are preserved across updates.
func (scheduler *SchedulerImpl) Update(rules []*AlertRule) {
	log.Debug("Scheduler: Update()")

	jobs := make(map[int64]*AlertJob)
	for i, rule := range rules {
		var job *AlertJob
		if scheduler.jobs[rule.Id] != nil {
			job = scheduler.jobs[rule.Id]
		} else {
			job = &AlertJob{
				Running:    false,
				RetryCount: 0,
			}
		}

		job.Rule = rule
		job.Offset = int64(i)
		jobs[rule.Id] = job
	}

	log.Debug("Scheduler: Selected %d jobs", len(jobs))
	scheduler.jobs = jobs
}

// Tick puts every job whose frequency divides the tick time onto the exec
// queue, skipping jobs that are already running.
func (scheduler *SchedulerImpl) Tick(tickTime time.Time, execQueue chan *AlertJob) {
	now := tickTime.Unix()

	for _, job := range scheduler.jobs {
		if now%job.Rule.Frequency == 0 && !job.Running {
			log.Trace("Scheduler: Putting job on to exec queue: %s", job.Rule.Title)
			execQueue <- job
		}
	}
}
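
// A minimal sketch of how a caller might drive Tick with a time.Ticker;
// the one-second interval, the queue size, and the assumption that the
// Scheduler interface exposes Tick are illustrative and not taken from
// this file.
//
// func runScheduler(scheduler Scheduler) {
// 	execQueue := make(chan *AlertJob, 1000)
// 	ticker := time.NewTicker(time.Second)
//
// 	for tickTime := range ticker.C {
// 		scheduler.Tick(tickTime, execQueue)
// 	}
// }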
// func (scheduler *Scheduler) handleResponses() {
// 	for response := range scheduler.responseQueue {
// 		log.Info("Response: alert(%d) status(%s) actual(%v) retry(%d)", response.Id, response.State, response.ActualValue, response.AlertJob.RetryCount)
// 		response.AlertJob.Running = false
//
// 		if response.IsResultIncomplete() {
// 			response.AlertJob.RetryCount++
// 			if response.AlertJob.RetryCount < maxRetries {
// 				scheduler.runQueue <- response.AlertJob
// 			} else {
// 				saveState(&AlertResult{
// 					Id:          response.Id,
// 					State:       alertstates.Critical,
// 					Description: fmt.Sprintf("Failed to run check after %d retries", maxRetries),
// 				})
// 			}
// 		} else {
// 			response.AlertJob.RetryCount = 0
// 			saveState(response)
// 		}
// 	}
// }
//
// func (scheduler *Scheduler) measureAndExecute(exec Executor, job *AlertJob) {
// 	now := time.Now()
//
// 	responseChan := make(chan *AlertResult, 1)
// 	go exec.Execute(job, responseChan)
//
// 	select {
// 	case <-time.After(time.Second * 5):
// 		scheduler.responseQueue <- &AlertResult{
// 			Id:       job.Rule.Id,
// 			State:    alertstates.Pending,
// 			Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000),
// 			AlertJob: job,
// 		}
// 	case result := <-responseChan:
// 		result.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)
// 		log.Info("Scheduler: execution took %vms", result.Duration)
// 		scheduler.responseQueue <- result
// 	}
// }