فهرست منبع

chore(alerting): move alert result to models

bergquist 9 سال پیش
والد
کامیت
b75631e021
4فایلهای تغییر یافته به همراه25 افزوده شده و 23 حذف شده
  1. 7 0
      pkg/models/alerts.go
  2. 5 11
      pkg/services/alerting/alerting.go
  3. 6 5
      pkg/services/alerting/dummie_executor.go
  4. 7 7
      pkg/services/alerting/executor.go

+ 7 - 0
pkg/models/alerts.go

@@ -116,3 +116,10 @@ type AlertJob struct {
 	Rule       AlertRule
 	Datasource DataSource
 }
+
+type AlertResult struct {
+	Id          int64
+	State       string
+	ActualValue float64
+	Duration    float64
+}

+ 5 - 11
pkg/services/alerting/alerting.go

@@ -28,7 +28,7 @@ func Init() {
 type Scheduler struct {
 	jobs          map[int64]*m.AlertJob
 	runQueue      chan *m.AlertJob
-	responseQueue chan *AlertResult
+	responseQueue chan *m.AlertResult
 
 	alertRuleFetcher RuleReader
 }
@@ -37,7 +37,7 @@ func NewScheduler() *Scheduler {
 	return &Scheduler{
 		jobs:          make(map[int64]*m.AlertJob, 0),
 		runQueue:      make(chan *m.AlertJob, 1000),
-		responseQueue: make(chan *AlertResult, 1000),
+		responseQueue: make(chan *m.AlertResult, 1000),
 	}
 }
 
@@ -66,6 +66,7 @@ func (this *Scheduler) updateJobs(f func() []m.AlertJob) {
 	for i := 0; i < len(rules); i++ {
 		rule := rules[i]
 		//jobs[rule.Rule.Id] = &m.AlertJob{Rule: rule, Offset: int64(len(jobs))}
+		rule.Offset = int64(len(jobs))
 		jobs[rule.Rule.Id] = &rule
 	}
 
@@ -115,22 +116,15 @@ func (this *Scheduler) HandleResponses() {
 func (this *Scheduler) MeasureAndExecute(exec Executor, rule *m.AlertJob) {
 	now := time.Now()
 
-	response := make(chan *AlertResult, 1)
+	response := make(chan *m.AlertResult, 1)
 	go exec.Execute(rule, response)
 
 	select {
 	case <-time.After(time.Second * 5):
-		this.responseQueue <- &AlertResult{Id: rule.Rule.Id, State: "timed out", Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000)}
+		this.responseQueue <- &m.AlertResult{Id: rule.Rule.Id, State: "timed out", Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000)}
 	case r := <-response:
 		r.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)
 		log.Info("Schedular: exeuction took %vms", r.Duration)
 		this.responseQueue <- r
 	}
 }
-
-type AlertResult struct {
-	Id          int64
-	State       string
-	ActualValue float64
-	Duration    float64
-}

+ 6 - 5
pkg/services/alerting/dummie_executor.go

@@ -8,12 +8,13 @@ import (
 
 type DummieExecutor struct{}
 
-func (this *DummieExecutor) Execute(rule m.AlertRule, responseQueue chan *AlertResult) {
-	if rule.Id == 6 {
-		time.Sleep(time.Second * 0)
+func (this *DummieExecutor) Execute(rule m.AlertRule, responseQueue chan *m.AlertResult) {
+	if rule.Id%3 == 0 {
+		time.Sleep(time.Second * 1)
 	}
-	//time.Sleep(time.Second)
+
+	time.Sleep(time.Second)
 	log.Info("Finnished executing: %d", rule.Id)
 
-	responseQueue <- &AlertResult{State: "OK", Id: rule.Id}
+	responseQueue <- &m.AlertResult{State: "OK", Id: rule.Id}
 }

+ 7 - 7
pkg/services/alerting/executor.go

@@ -6,7 +6,7 @@ import (
 )
 
 type Executor interface {
-	Execute(rule *m.AlertJob, responseQueue chan *AlertResult)
+	Execute(rule *m.AlertJob, responseQueue chan *m.AlertResult)
 }
 
 type ExecutorImpl struct{}
@@ -30,17 +30,17 @@ var aggregator map[string]aggregationFn = map[string]aggregationFn{
 	"mean": func(series *m.TimeSeries) float64 { return series.Mean },
 }
 
-func (this *ExecutorImpl) Execute(rule *m.AlertJob, responseQueue chan *AlertResult) {
+func (this *ExecutorImpl) Execute(rule *m.AlertJob, responseQueue chan *m.AlertResult) {
 	response, err := graphite.GraphiteClient{}.GetSeries(rule)
 
 	if err != nil {
-		responseQueue <- &AlertResult{State: "PENDING", Id: rule.Rule.Id}
+		responseQueue <- &m.AlertResult{State: "PENDING", Id: rule.Rule.Id}
 	}
 
 	responseQueue <- this.ValidateRule(rule.Rule, response)
 }
 
-func (this *ExecutorImpl) ValidateRule(rule m.AlertRule, series m.TimeSeriesSlice) *AlertResult {
+func (this *ExecutorImpl) ValidateRule(rule m.AlertRule, series m.TimeSeriesSlice) *m.AlertResult {
 	for _, serie := range series {
 		if aggregator[rule.Aggregator] == nil {
 			continue
@@ -49,13 +49,13 @@ func (this *ExecutorImpl) ValidateRule(rule m.AlertRule, series m.TimeSeriesSlic
 		var aggValue = aggregator[rule.Aggregator](serie)
 
 		if operators[rule.CritOperator](aggValue, float64(rule.CritLevel)) {
-			return &AlertResult{State: m.AlertStateCritical, Id: rule.Id, ActualValue: aggValue}
+			return &m.AlertResult{State: m.AlertStateCritical, Id: rule.Id, ActualValue: aggValue}
 		}
 
 		if operators[rule.WarnOperator](aggValue, float64(rule.WarnLevel)) {
-			return &AlertResult{State: m.AlertStateWarn, Id: rule.Id, ActualValue: aggValue}
+			return &m.AlertResult{State: m.AlertStateWarn, Id: rule.Id, ActualValue: aggValue}
 		}
 	}
 
-	return &AlertResult{State: m.AlertStateOk, Id: rule.Id}
+	return &m.AlertResult{State: m.AlertStateOk, Id: rule.Id}
 }