Просмотр исходного кода

feat(alerting): only start unfinnished jobs

bergquist 9 лет назад
Родитель
Сommit
9f8c67e352

+ 0 - 0
pkg/services/alerting/alert_rule_reader.gi.go → pkg/services/alerting/alert_rule_reader.go


+ 36 - 21
pkg/services/alerting/alerting.go

@@ -8,6 +8,7 @@ import (
 	"github.com/grafana/grafana/pkg/log"
 	m "github.com/grafana/grafana/pkg/models"
 	"github.com/grafana/grafana/pkg/setting"
+	"sync"
 )
 
 func Init() {
@@ -25,6 +26,7 @@ func Init() {
 type Scheduler struct {
 	jobs     []*AlertJob
 	runQueue chan *AlertJob
+	mtx      sync.RWMutex
 
 	alertRuleFetcher RuleReader
 
@@ -41,42 +43,45 @@ func NewScheduler() *Scheduler {
 	}
 }
 
-func (s *Scheduler) heartBeat() {
+func (this *Scheduler) heartBeat() {
 	//write heartBeat to db.
 	//get the modulus position of active servers
 
-	log.Info("Heartbeat: Sending heartbeat from " + s.serverId)
-	s.clusterSize = 1
-	s.serverPosition = 1
+	log.Info("Heartbeat: Sending heartbeat from " + this.serverId)
+	this.clusterSize = 1
+	this.serverPosition = 1
 }
 
-func (s *Scheduler) Dispatch(reader RuleReader) {
+func (this *Scheduler) Dispatch(reader RuleReader) {
 	reschedule := time.NewTicker(time.Second * 10)
 	secondTicker := time.NewTicker(time.Second)
 	heartbeat := time.NewTicker(time.Second * 5)
 
-	s.heartBeat()
-	s.updateJobs(reader.Fetch)
+	this.heartBeat()
+	this.updateJobs(reader.Fetch)
 
 	for {
 		select {
 		case <-secondTicker.C:
-			s.queueJobs()
+			this.queueJobs()
 		case <-reschedule.C:
-			s.updateJobs(reader.Fetch)
+			this.updateJobs(reader.Fetch)
 		case <-heartbeat.C:
-			s.heartBeat()
+			this.heartBeat()
 		}
 	}
 }
 
-func (s *Scheduler) updateJobs(f func() []m.AlertRule) {
+func (this *Scheduler) updateJobs(f func() []m.AlertRule) {
 	log.Debug("Scheduler: UpdateJobs()")
 
 	jobs := make([]*AlertJob, 0)
 	rules := f()
 
-	for i := s.serverPosition - 1; i < len(rules); i += s.clusterSize {
+	this.mtx.Lock()
+	defer this.mtx.Unlock()
+
+	for i := this.serverPosition - 1; i < len(rules); i += this.clusterSize {
 		rule := rules[i]
 		jobs = append(jobs, &AlertJob{
 			id:        rule.Id,
@@ -89,34 +94,44 @@ func (s *Scheduler) updateJobs(f func() []m.AlertRule) {
 
 	log.Debug("Scheduler: Selected %d jobs", len(jobs))
 
-	s.jobs = jobs
+	this.jobs = jobs
 }
 
-func (s *Scheduler) queueJobs() {
+func (this *Scheduler) queueJobs() {
 	now := time.Now().Unix()
 
-	for _, job := range s.jobs {
-		if now%job.frequency == 0 {
+	for _, job := range this.jobs {
+		if now%job.frequency == 0 && job.running == false {
 			log.Info("Scheduler: Putting job on to run queue: %s", job.name)
-			s.runQueue <- job
+			this.runQueue <- job
 		}
 	}
 }
 
-func (s *Scheduler) Executor(executor Executor) {
-	for job := range s.runQueue {
-		log.Info("Executor: queue length %d", len(s.runQueue))
+func (this *Scheduler) Executor(executor Executor) {
+	for job := range this.runQueue {
+		log.Info("Executor: queue length %d", len(this.runQueue))
 		log.Info("Executor: executing %s", job.name)
-		go executor.Execute(job.rule)
+		go Measure(executor, job)
 	}
 }
 
+func Measure(exec Executor, rule *AlertJob) {
+	now := time.Now()
+	rule.running = true
+	exec.Execute(rule.rule)
+	rule.running = true
+	elapsed := time.Since(now)
+	log.Info("Schedular: exeuction took %v milli seconds", elapsed.Nanoseconds()/1000000)
+}
+
 type AlertJob struct {
 	id        int64
 	name      string
 	frequency int64
 	offset    int64
 	delay     bool
+	running   bool
 	rule      m.AlertRule
 }
 

+ 19 - 0
pkg/services/alerting/alerting_test.go

@@ -59,5 +59,24 @@ func TestAlertingScheduler(t *testing.T) {
 			So(len(scheduler.jobs), ShouldEqual, 1)
 			So(scheduler.jobs[0].id, ShouldEqual, 6)
 		})
+
+		Convey("more servers then alerts", func() {
+			mockFn := func() []m.AlertRule {
+				return []m.AlertRule{
+					{Id: 1, Title: "test 1"},
+				}
+			}
+
+			scheduler := &Scheduler{
+				jobs:           make([]*AlertJob, 0),
+				runQueue:       make(chan *AlertJob, 1000),
+				serverId:       "",
+				serverPosition: 3,
+				clusterSize:    3,
+			}
+
+			scheduler.updateJobs(mockFn)
+			So(len(scheduler.jobs), ShouldEqual, 0)
+		})
 	})
 }

+ 1 - 0
pkg/services/alerting/notifier.go

@@ -0,0 +1 @@
+package alerting