
feat(alerting): began work on splitting scheduler into scheduler and engine

Torkel Ödegaard 9 years ago
Parent
Commit
0cbf4ae773

+ 11 - 6
pkg/services/alerting/alerting.go

@@ -11,19 +11,24 @@ var (
 	maxRetries = 3
 )
 
+var engine *Engine
+
 func Init() {
 	if !setting.AlertingEnabled {
 		return
 	}
 
-	log.Info("Alerting: Initializing scheduler...")
+	log.Info("Alerting: Initializing alerting engine...")
 
-	scheduler := NewScheduler()
-	reader := NewRuleReader()
+	engine = NewEngine()
+	engine.Start()
 
-	go scheduler.dispatch(reader)
-	go scheduler.executor(&ExecutorImpl{})
-	go scheduler.handleResponses()
+	// scheduler := NewScheduler()
+	// reader := NewRuleReader()
+	//
+	// go scheduler.dispatch(reader)
+	// go scheduler.executor(&ExecutorImpl{})
+	// go scheduler.handleResponses()
 }
 
 func saveState(response *AlertResult) {
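
Note: Engine.Stop (added in engine.go below) is never called from this Init path; the engine runs for the lifetime of the process. A minimal sketch of a shutdown hook, purely hypothetical and not part of this commit:

    // Hypothetical counterpart to Init(), not in this commit.
    func Shutdown() {
        if engine != nil {
            engine.Stop() // closes execQueue, so execDispatch's range loop drains and exits
        }
    }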

+ 52 - 0
pkg/services/alerting/engine.go

@@ -1 +1,53 @@
 package alerting
+
+import (
+	"time"
+
+	"github.com/Unknwon/log"
+	"github.com/benbjohnson/clock"
+)
+
+type Engine struct {
+	execQueue   chan *AlertJob
+	resultQueue chan *AlertResult
+	clock       clock.Clock
+	ticker      *Ticker
+	scheduler   Scheduler
+}
+
+func NewEngine() *Engine {
+	e := &Engine{
+		ticker:      NewTicker(time.Now(), time.Second*0, clock.New()),
+		execQueue:   make(chan *AlertJob, 1000),
+		resultQueue: make(chan *AlertResult, 1000),
+		scheduler:   NewScheduler(),
+	}
+
+	return e
+}
+
+func (e *Engine) Start() {
+	go e.schedulerTick()
+	go e.execDispatch()
+}
+
+func (e *Engine) Stop() {
+	close(e.execQueue)
+}
+
+func (e *Engine) schedulerTick() {
+	for {
+		select {
+		case tick := <-e.ticker.C:
+			e.scheduler.Tick(tick, e.execQueue)
+		}
+	}
+}
+
+func (e *Engine) execDispatch() {
+	for job := range e.execQueue {
+		log.Info("AlertEngine: Dispatching alert job %s", job.Rule.Title)
+		job.Running = true
+		//scheduler.measureAndExecute(executor, job)
+	}
+}
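
Note: nothing in this commit calls scheduler.Update, so the engine ticks over an empty job map; the commented-out lines in alerting.go hint at the wiring still to come. A hypothetical follow-up that feeds rules into the new Scheduler interface, assuming RuleReader.Fetch still returns []AlertRule as in the old dispatch loop:

    // Hypothetical wiring, not part of this commit.
    func (e *Engine) updateLoop(reader RuleReader) {
        for range time.Tick(time.Second * 10) {
            rules := reader.Fetch()
            ptrs := make([]*AlertRule, len(rules))
            for i := range rules {
                ptrs[i] = &rules[i] // Scheduler.Update now takes []*AlertRule
            }
            e.scheduler.Update(ptrs)
        }
    }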

+ 1 - 5
pkg/services/alerting/executor.go

@@ -12,10 +12,6 @@ import (
 	"github.com/grafana/grafana/pkg/tsdb"
 	"github.com/grafana/grafana/pkg/tsdb"
 )
 )
 
 
-type Executor interface {
-	Execute(rule *AlertJob, responseQueue chan *AlertResult)
-}
-
 var (
 var (
 	resultLogFmt   = "%s executor: %s  %1.2f %s %1.2f : %v"
 	resultLogFmt   = "%s executor: %s  %1.2f %s %1.2f : %v"
 	descriptionFmt = "Actual value: %1.2f for %s"
 	descriptionFmt = "Actual value: %1.2f for %s"
@@ -111,7 +107,7 @@ func (executor *ExecutorImpl) GetSeries(job *AlertJob) (tsdb.TimeSeriesSlice, er
 	return nil, fmt.Errorf("Grafana does not support alerts for %s", query.Result.Type)
 	return nil, fmt.Errorf("Grafana does not support alerts for %s", query.Result.Type)
 }
 }
 
 
-func (executor *ExecutorImpl) validateRule(rule AlertRule, series tsdb.TimeSeriesSlice) *AlertResult {
+func (executor *ExecutorImpl) validateRule(rule *AlertRule, series tsdb.TimeSeriesSlice) *AlertResult {
 	for _, serie := range series {
 	for _, serie := range series {
 		if aggregator[rule.Aggregator] == nil {
 		if aggregator[rule.Aggregator] == nil {
 			continue
 			continue

+ 12 - 0
pkg/services/alerting/interfaces.go

@@ -0,0 +1,12 @@
+package alerting
+
+import "time"
+
+type Executor interface {
+	Execute(rule *AlertJob, resultChan chan *AlertResult)
+}
+
+type Scheduler interface {
+	Tick(time time.Time, execQueue chan *AlertJob)
+	Update(rules []*AlertRule)
+}
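
Note: pulling Executor and Scheduler out into interfaces is what makes the engine testable in isolation. A minimal hypothetical fake, not part of this commit:

    // fakeScheduler counts ticks and ignores rule updates; useful in engine tests.
    type fakeScheduler struct {
        ticks int
    }

    func (s *fakeScheduler) Tick(tickTime time.Time, execQueue chan *AlertJob) {
        s.ticks++
    }

    func (s *fakeScheduler) Update(rules []*AlertRule) {}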

+ 1 - 1
pkg/services/alerting/models.go

@@ -7,7 +7,7 @@ type AlertJob struct {
 	Delay      bool
 	Running    bool
 	RetryCount int
-	Rule       AlertRule
+	Rule       *AlertRule
 }
 
 type AlertResult struct {
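
Note: holding Rule by pointer means every AlertJob created from a rule shares one AlertRule value rather than a copy, which is presumably why validateRule in executor.go changes its parameter to *AlertRule in the same commit.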

+ 56 - 87
pkg/services/alerting/scheduler.go

@@ -1,48 +1,25 @@
 package alerting
 
 import (
-	"fmt"
 	"time"
 
-	"github.com/Unknwon/log"
-	"github.com/grafana/grafana/pkg/services/alerting/alertstates"
+	"github.com/grafana/grafana/pkg/log"
 )
 
-type Scheduler struct {
-	jobs          map[int64]*AlertJob
-	runQueue      chan *AlertJob
-	responseQueue chan *AlertResult
+type SchedulerImpl struct {
+	jobs map[int64]*AlertJob
 }
 
-func NewScheduler() *Scheduler {
-	return &Scheduler{
-		jobs:          make(map[int64]*AlertJob, 0),
-		runQueue:      make(chan *AlertJob, 1000),
-		responseQueue: make(chan *AlertResult, 1000),
+func NewScheduler() Scheduler {
+	return &SchedulerImpl{
+		jobs: make(map[int64]*AlertJob, 0),
 	}
 }
 
-func (scheduler *Scheduler) dispatch(reader RuleReader) {
-	reschedule := time.NewTicker(time.Second * 10)
-	secondTicker := time.NewTicker(time.Second)
-
-	scheduler.updateJobs(reader.Fetch)
-
-	for {
-		select {
-		case <-secondTicker.C:
-			scheduler.queueJobs()
-		case <-reschedule.C:
-			scheduler.updateJobs(reader.Fetch)
-		}
-	}
-}
-
-func (scheduler *Scheduler) updateJobs(alertRuleFn func() []AlertRule) {
-	log.Debug("Scheduler: UpdateJobs()")
+func (scheduler *SchedulerImpl) Update(rules []*AlertRule) {
+	log.Debug("Scheduler: Update()")
 
 	jobs := make(map[int64]*AlertJob, 0)
-	rules := alertRuleFn()
 
 	for i, rule := range rules {
 		var job *AlertJob
@@ -65,65 +42,57 @@ func (scheduler *Scheduler) updateJobs(alertRuleFn func() []AlertRule) {
 	scheduler.jobs = jobs
 }
 
-func (scheduler *Scheduler) queueJobs() {
-	now := time.Now().Unix()
+func (scheduler *SchedulerImpl) Tick(tickTime time.Time, execQueue chan *AlertJob) {
+	now := tickTime.Unix()
+
 	for _, job := range scheduler.jobs {
 		if now%job.Rule.Frequency == 0 && job.Running == false {
-			log.Info("Scheduler: Putting job on to run queue: %s", job.Rule.Title)
-			scheduler.runQueue <- job
-		}
-	}
-}
-
-func (scheduler *Scheduler) executor(executor Executor) {
-	for job := range scheduler.runQueue {
-		//log.Info("Executor: queue length %d", len(this.runQueue))
-		log.Info("Executor: executing %s", job.Rule.Title)
-		job.Running = true
-		scheduler.measureAndExecute(executor, job)
-	}
-}
-
-func (scheduler *Scheduler) handleResponses() {
-	for response := range scheduler.responseQueue {
-		log.Info("Response: alert(%d) status(%s) actual(%v) retry(%d)", response.Id, response.State, response.ActualValue, response.AlertJob.RetryCount)
-		response.AlertJob.Running = false
-
-		if response.IsResultIncomplete() {
-			response.AlertJob.RetryCount++
-			if response.AlertJob.RetryCount < maxRetries {
-				scheduler.runQueue <- response.AlertJob
-			} else {
-				saveState(&AlertResult{
-					Id:          response.Id,
-					State:       alertstates.Critical,
-					Description: fmt.Sprintf("Failed to run check after %d retires", maxRetries),
-				})
-			}
-		} else {
-			response.AlertJob.RetryCount = 0
-			saveState(response)
+			log.Trace("Scheduler: Putting job on to exec queue: %s", job.Rule.Title)
+			execQueue <- job
 		}
 	}
 }
 
-func (scheduler *Scheduler) measureAndExecute(exec Executor, job *AlertJob) {
-	now := time.Now()
-
-	responseChan := make(chan *AlertResult, 1)
-	go exec.Execute(job, responseChan)
-
-	select {
-	case <-time.After(time.Second * 5):
-		scheduler.responseQueue <- &AlertResult{
-			Id:       job.Rule.Id,
-			State:    alertstates.Pending,
-			Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000),
-			AlertJob: job,
-		}
-	case result := <-responseChan:
-		result.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)
-		log.Info("Schedular: exeuction took %vms", result.Duration)
-		scheduler.responseQueue <- result
-	}
-}
+// func (scheduler *Scheduler) handleResponses() {
+// 	for response := range scheduler.responseQueue {
+// 		log.Info("Response: alert(%d) status(%s) actual(%v) retry(%d)", response.Id, response.State, response.ActualValue, response.AlertJob.RetryCount)
+// 		response.AlertJob.Running = false
+//
+// 		if response.IsResultIncomplete() {
+// 			response.AlertJob.RetryCount++
+// 			if response.AlertJob.RetryCount < maxRetries {
+// 				scheduler.runQueue <- response.AlertJob
+// 			} else {
+// 				saveState(&AlertResult{
+// 					Id:          response.Id,
+// 					State:       alertstates.Critical,
+// 					Description: fmt.Sprintf("Failed to run check after %d retries", maxRetries),
+// 				})
+// 			}
+// 		} else {
+// 			response.AlertJob.RetryCount = 0
+// 			saveState(response)
+// 		}
+// 	}
+// }
+//
+// func (scheduler *Scheduler) measureAndExecute(exec Executor, job *AlertJob) {
+// 	now := time.Now()
+//
+// 	responseChan := make(chan *AlertResult, 1)
+// 	go exec.Execute(job, responseChan)
+//
+// 	select {
+// 	case <-time.After(time.Second * 5):
+// 		scheduler.responseQueue <- &AlertResult{
+// 			Id:       job.Rule.Id,
+// 			State:    alertstates.Pending,
+// 			Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000),
+// 			AlertJob: job,
+// 		}
+// 	case result := <-responseChan:
+// 		result.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)
+// 		log.Info("Scheduler: execution took %vms", result.Duration)
+// 		scheduler.responseQueue <- result
+// 	}
+// }
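
Note: Tick queues a job whenever the tick's unix timestamp is an exact multiple of the rule's Frequency, so all rules with the same frequency fire on the same aligned second rather than relative to when they were registered. A tiny illustration of the check, not part of the commit:

    // A rule with Frequency = 10 is queued only on unix seconds divisible by 10.
    for _, ts := range []int64{100, 101, 109, 110} {
        fmt.Println(ts, ts%10 == 0) // 100 true, 101 false, 109 false, 110 true
    }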

+ 60 - 0
pkg/services/alerting/ticker.go

@@ -0,0 +1,60 @@
+package alerting
+
+import (
+	"time"
+
+	"github.com/benbjohnson/clock"
+)
+
+// Ticker is a ticker to power the alerting scheduler. It's like a time.Ticker, except:
+// * it doesn't drop ticks for slow receivers; instead it queues them up, so that callers stay in control and can instrument what's going on.
+// * it automatically ticks every second, which is the right thing in our current design
+// * it ticks on second marks or very shortly after. this provides a predictable load pattern
+//   (this shouldn't cause much load contention, because the next steps in the pipeline just process at their own pace)
+// * the timestamps are used to mark "last datapoint to query for" and as such are a configurable number of seconds in the past
+// * because we want to allow:
+//   - a clean "resume where we left off" and "don't yield ticks we already did"
+//   - adjusting the offset over time, to compensate for storage backing up or getting faster and providing lower latency
+//   you specify a lastProcessed timestamp as well as an offset, at creation time or at runtime
+type Ticker struct {
+	C         chan time.Time
+	clock     clock.Clock
+	last      time.Time
+	offset    time.Duration
+	newOffset chan time.Duration
+}
+
+// NewTicker returns a ticker that ticks on second marks or very shortly after, and never drops ticks
+func NewTicker(last time.Time, initialOffset time.Duration, c clock.Clock) *Ticker {
+	t := &Ticker{
+		C:         make(chan time.Time),
+		clock:     c,
+		last:      last,
+		offset:    initialOffset,
+		newOffset: make(chan time.Duration),
+	}
+	go t.run()
+	return t
+}
+
+func (t *Ticker) updateOffset(offset time.Duration) {
+	t.newOffset <- offset
+}
+
+func (t *Ticker) run() {
+	for {
+		next := t.last.Add(time.Duration(1) * time.Second)
+		diff := t.clock.Now().Add(-t.offset).Sub(next)
+		if diff >= 0 {
+			t.C <- next
+			t.last = next
+			continue
+		}
+		// tick is too young. try again when ...
+		select {
+		case <-t.clock.After(-diff): // ...it'll definitely be old enough
+		case offset := <-t.newOffset: // ...it might be old enough
+			t.offset = offset
+		}
+	}
+}
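
Note: because the ticker queues instead of dropping, a consumer whose last timestamp lies in the past receives the whole backlog back-to-back until it catches up to now minus offset, then one tick per second. A hypothetical standalone consumer, with illustrative values:

    // Starts ~5s behind with a 2s offset: expect ~3 immediate catch-up ticks,
    // then one tick per second thereafter.
    t := NewTicker(time.Now().Add(-5*time.Second), 2*time.Second, clock.New())
    for tick := range t.C {
        fmt.Println("interval ending at", tick)
    }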

+ 121 - 0
pkg/services/alerting/ticker_test.go

@@ -0,0 +1,121 @@
+package alerting
+
+import (
+	"testing"
+	"time"
+
+	"github.com/benbjohnson/clock"
+)
+
+func inspectTick(tick time.Time, last time.Time, offset time.Duration, t *testing.T) {
+	if !tick.Equal(last.Add(time.Duration(1) * time.Second)) {
+		t.Fatalf("expected a tick 1 second more than prev, %s. got: %s", last, tick)
+	}
+}
+
+// returns the new last tick seen
+func assertAdvanceUntil(ticker *Ticker, last, desiredLast time.Time, offset, wait time.Duration, t *testing.T) time.Time {
+	for {
+		select {
+		case tick := <-ticker.C:
+			inspectTick(tick, last, offset, t)
+			last = tick
+		case <-time.NewTimer(wait).C:
+			if last.Before(desiredLast) {
+				t.Fatalf("waited %s for ticker to advance to %s, but only went up to %s", wait, desiredLast, last)
+			}
+			if last.After(desiredLast) {
+				t.Fatalf("timer advanced too far. should only have gone up to %s, but it went up to %s", desiredLast, last)
+			}
+			return last
+		}
+	}
+}
+
+func assertNoAdvance(ticker *Ticker, desiredLast time.Time, wait time.Duration, t *testing.T) {
+	for {
+		select {
+		case tick := <-ticker.C:
+			t.Fatalf("timer should have stayed at %s, instead it advanced to %s", desiredLast, tick)
+		case <-time.NewTimer(wait).C:
+			return
+		}
+	}
+}
+
+func TestTickerRetro1Hour(t *testing.T) {
+	offset := time.Duration(10) * time.Second
+	last := time.Unix(0, 0)
+	mock := clock.NewMock()
+	mock.Add(time.Duration(1) * time.Hour)
+	desiredLast := mock.Now().Add(-offset)
+	ticker := NewTicker(last, offset, mock)
+
+	last = assertAdvanceUntil(ticker, last, desiredLast, offset, time.Duration(10)*time.Millisecond, t)
+	assertNoAdvance(ticker, last, time.Duration(500)*time.Millisecond, t)
+
+}
+
+func TestAdvanceWithUpdateOffset(t *testing.T) {
+	offset := time.Duration(10) * time.Second
+	last := time.Unix(0, 0)
+	mock := clock.NewMock()
+	mock.Add(time.Duration(1) * time.Hour)
+	desiredLast := mock.Now().Add(-offset)
+	ticker := NewTicker(last, offset, mock)
+
+	last = assertAdvanceUntil(ticker, last, desiredLast, offset, time.Duration(10)*time.Millisecond, t)
+	assertNoAdvance(ticker, last, time.Duration(500)*time.Millisecond, t)
+
+	// lowering offset should see a few more ticks
+	offset = time.Duration(5) * time.Second
+	ticker.updateOffset(offset)
+	desiredLast = mock.Now().Add(-offset)
+	last = assertAdvanceUntil(ticker, last, desiredLast, offset, time.Duration(9)*time.Millisecond, t)
+	assertNoAdvance(ticker, last, time.Duration(500)*time.Millisecond, t)
+
+	// advancing clock should see even more ticks
+	mock.Add(time.Duration(1) * time.Hour)
+	desiredLast = mock.Now().Add(-offset)
+	last = assertAdvanceUntil(ticker, last, desiredLast, offset, time.Duration(8)*time.Millisecond, t)
+	assertNoAdvance(ticker, last, time.Duration(500)*time.Millisecond, t)
+
+}
+
+func getCase(lastSeconds, offsetSeconds int) (time.Time, time.Duration) {
+	last := time.Unix(int64(lastSeconds), 0)
+	offset := time.Duration(offsetSeconds) * time.Second
+	return last, offset
+}
+
+func TestTickerNoAdvance(t *testing.T) {
+
+	// it's 00:01:00 now. what are some cases where we don't want the ticker to advance?
+	mock := clock.NewMock()
+	mock.Add(time.Duration(60) * time.Second)
+
+	type Case struct {
+		last   int
+		offset int
+	}
+
+	// note that some cases add up to now, others go into the future
+	cases := []Case{
+		{50, 10},
+		{50, 30},
+		{59, 1},
+		{59, 10},
+		{59, 30},
+		{60, 1},
+		{60, 10},
+		{60, 30},
+		{90, 1},
+		{90, 10},
+		{90, 30},
+	}
+	for _, c := range cases {
+		last, offset := getCase(c.last, c.offset)
+		ticker := NewTicker(last, offset, mock)
+		assertNoAdvance(ticker, last, time.Duration(500)*time.Millisecond, t)
+	}
+}