alerting.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package alerting
  2. import (
  3. "time"
  4. "github.com/grafana/grafana/pkg/log"
  5. m "github.com/grafana/grafana/pkg/models"
  6. "github.com/grafana/grafana/pkg/setting"
  7. )
  8. func Init() {
  9. if !setting.AlertingEnabled {
  10. return
  11. }
  12. log.Info("Alerting: Initializing scheduler...")
  13. scheduler := NewScheduler()
  14. go scheduler.Dispatch()
  15. go scheduler.Executor()
  16. }
  17. type Scheduler struct {
  18. jobs []*AlertJob
  19. runQueue chan *AlertJob
  20. }
  21. func NewScheduler() *Scheduler {
  22. return &Scheduler{
  23. jobs: make([]*AlertJob, 0),
  24. runQueue: make(chan *AlertJob, 1000),
  25. }
  26. }
  27. func (s *Scheduler) Dispatch() {
  28. reschedule := time.NewTicker(time.Second * 10)
  29. secondTicker := time.NewTicker(time.Second)
  30. s.updateJobs()
  31. for {
  32. select {
  33. case <-secondTicker.C:
  34. s.queueJobs()
  35. case <-reschedule.C:
  36. s.updateJobs()
  37. }
  38. }
  39. }
  40. func (s *Scheduler) updateJobs() {
  41. log.Info("Scheduler:updateJobs()")
  42. jobs := make([]*AlertJob, 0)
  43. jobs = append(jobs, &AlertJob{
  44. name: "ID_1_Each 10s",
  45. frequency: 10,
  46. offset: 1,
  47. })
  48. jobs = append(jobs, &AlertJob{
  49. name: "ID_2_Each 10s",
  50. frequency: 10,
  51. offset: 2,
  52. })
  53. jobs = append(jobs, &AlertJob{
  54. name: "ID_3_Each 10s",
  55. frequency: 10,
  56. offset: 3,
  57. })
  58. jobs = append(jobs, &AlertJob{
  59. name: "ID_4_Each 5s",
  60. frequency: 5,
  61. })
  62. s.jobs = jobs
  63. }
  64. func (s *Scheduler) queueJobs() {
  65. log.Info("Scheduler:queueJobs()")
  66. now := time.Now().Unix()
  67. for _, job := range s.jobs {
  68. if now%job.frequency == 0 {
  69. log.Info("Scheduler: Putting job on to run queue: %s", job.name)
  70. s.runQueue <- job
  71. }
  72. }
  73. }
  74. func (s *Scheduler) Executor() {
  75. for job := range s.runQueue {
  76. log.Info("Executor: queue length %d", len(s.runQueue))
  77. log.Info("Executor: executing %s", job.name)
  78. time.Sleep(1000)
  79. }
  80. }
  81. type AlertJob struct {
  82. id int64
  83. name string
  84. frequency int64
  85. offset int64
  86. delay bool
  87. }
  88. type RuleReader interface {
  89. }
  90. type Executor interface {
  91. Execute(rule *m.AlertRule)
  92. }