| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- package alerting
- import (
- "math/rand"
- "strconv"
- "time"
- "github.com/grafana/grafana/pkg/log"
- m "github.com/grafana/grafana/pkg/models"
- "github.com/grafana/grafana/pkg/setting"
- )
- func Init() {
- if !setting.AlertingEnabled {
- return
- }
- log.Info("Alerting: Initializing scheduler...")
- scheduler := NewScheduler()
- go scheduler.Dispatch(&AlertRuleReader{})
- go scheduler.Executor(&DummieExecutor{})
- }
- type Scheduler struct {
- jobs []*AlertJob
- runQueue chan *AlertJob
- alertRuleFetcher RuleReader
- serverId string
- serverPosition int
- clusterSize int
- }
- func NewScheduler() *Scheduler {
- return &Scheduler{
- jobs: make([]*AlertJob, 0),
- runQueue: make(chan *AlertJob, 1000),
- serverId: strconv.Itoa(rand.Intn(1000)),
- }
- }
- func (s *Scheduler) heartBeat() {
- //write heartBeat to db.
- //get the modulus position of active servers
- log.Info("Heartbeat: Sending heartbeat from " + s.serverId)
- s.clusterSize = 1
- s.serverPosition = 1
- }
- func (s *Scheduler) Dispatch(reader RuleReader) {
- reschedule := time.NewTicker(time.Second * 10)
- secondTicker := time.NewTicker(time.Second)
- ticker := time.NewTicker(time.Second * 5)
- s.heartBeat()
- s.updateJobs(reader)
- for {
- select {
- case <-secondTicker.C:
- s.queueJobs()
- case <-reschedule.C:
- s.updateJobs(reader)
- case <-ticker.C:
- s.heartBeat()
- }
- }
- }
- func (s *Scheduler) updateJobs(reader RuleReader) {
- log.Debug("Scheduler: UpdateJobs()")
- jobs := make([]*AlertJob, 0)
- rules := reader.Fetch()
- for i := s.serverPosition - 1; i < len(rules); i = i + s.clusterSize {
- rule := rules[i]
- jobs = append(jobs, &AlertJob{
- name: rule.Title,
- frequency: rule.Frequency,
- rule: rule,
- offset: int64(len(jobs)),
- })
- }
- log.Debug("Scheduler: Selected %d jobs", len(jobs))
- s.jobs = jobs
- }
- func (s *Scheduler) queueJobs() {
- now := time.Now().Unix()
- for _, job := range s.jobs {
- if now%job.frequency == 0 {
- log.Info("Scheduler: Putting job on to run queue: %s", job.name)
- s.runQueue <- job
- }
- }
- }
- func (s *Scheduler) Executor(executor Executor) {
- for job := range s.runQueue {
- log.Info("Executor: queue length %d", len(s.runQueue))
- log.Info("Executor: executing %s", job.name)
- go executor.Execute(job.rule)
- }
- }
- type AlertJob struct {
- id int64
- name string
- frequency int64
- offset int64
- delay bool
- rule m.AlertRule
- }
- type AlertResult struct {
- id int64
- state string
- duration time.Time
- }
- type RuleReader interface {
- Fetch() []m.AlertRule
- }
- type AlertRuleReader struct{}
- func (this AlertRuleReader) Fetch() []m.AlertRule {
- return []m.AlertRule{
- {Id: 1, Title: "alert rule 1", Interval: "10s", Frequency: 10},
- {Id: 2, Title: "alert rule 2", Interval: "10s", Frequency: 10},
- {Id: 3, Title: "alert rule 3", Interval: "10s", Frequency: 10},
- {Id: 4, Title: "alert rule 4", Interval: "10s", Frequency: 5},
- {Id: 5, Title: "alert rule 5", Interval: "10s", Frequency: 5},
- {Id: 6, Title: "alert rule 6", Interval: "10s", Frequency: 1},
- }
- }
- type Executor interface {
- Execute(rule m.AlertRule) (err error, result AlertResult)
- }
- type DummieExecutor struct{}
- func (this DummieExecutor) Execute(rule m.AlertRule) (err error, result AlertResult) {
- time.Sleep(1000)
- return nil, AlertResult{state: "OK", id: rule.Id}
- }
|