
feat(alerting): minor progress

Torkel Ödegaard, 9 years ago
parent
commit d1acfb4494

+ 4 - 2
pkg/services/alerting/engine.go

@@ -35,7 +35,7 @@ func NewEngine() *Engine {
 func (e *Engine) Start() {
 	log.Info("Alerting: Engine.Start()")
 
-	go e.schedulerTick()
+	go e.alertingTicker()
 	go e.execDispatch()
 	go e.resultHandler()
 }
@@ -45,7 +45,7 @@ func (e *Engine) Stop() {
 	close(e.resultQueue)
 }
 
-func (e *Engine) schedulerTick() {
+func (e *Engine) alertingTicker() {
 	tickIndex := 0
 
 	for {
@@ -57,6 +57,8 @@ func (e *Engine) schedulerTick() {
 			}
 
 			e.scheduler.Tick(tick, e.execQueue)
+
+			tickIndex++
 		}
 	}
 }
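
Note: the hunk above only shows a fragment of the renamed loop. A minimal sketch of how alertingTicker could drive the scheduler, assuming the Engine holds a ticker channel (e.ticker.C), a ruleReader, and a ten-tick rule-refresh cadence; none of these details are shown in this diff:

func (e *Engine) alertingTicker() {
	tickIndex := 0

	for {
		select {
		case tick := <-e.ticker.C:
			// Assumption: refresh the rule set every ten ticks before scheduling.
			if tickIndex%10 == 0 {
				e.scheduler.Update(e.ruleReader.Fetch())
			}

			e.scheduler.Tick(tick, e.execQueue)

			tickIndex++
		}
	}
}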

+ 6 - 6
pkg/services/alerting/executor.go

@@ -13,7 +13,7 @@ import (
 )
 
 var (
-	resultLogFmt   = "%s executor: %s  %1.2f %s %1.2f : %v"
+	resultLogFmt   = "Alerting: executor %s  %1.2f %s %1.2f : %v"
 	descriptionFmt = "Actual value: %1.2f for %s"
 )
 
@@ -77,19 +77,19 @@ var aggregator = map[string]aggregationFn{
 	},
 }
 
-func (executor *ExecutorImpl) Execute(job *AlertJob, resultQueue chan *AlertResult) {
-	response, err := executor.GetSeries(job)
+func (e *ExecutorImpl) Execute(job *AlertJob, resultQueue chan *AlertResult) {
+	response, err := e.GetSeries(job)
 
 	if err != nil {
 		resultQueue <- &AlertResult{State: alertstates.Pending, Id: job.Rule.Id, AlertJob: job}
 	}
 
-	result := executor.validateRule(job.Rule, response)
+	result := e.validateRule(job.Rule, response)
 	result.AlertJob = job
 	resultQueue <- result
 }
 
-func (executor *ExecutorImpl) GetSeries(job *AlertJob) (tsdb.TimeSeriesSlice, error) {
+func (e *ExecutorImpl) GetSeries(job *AlertJob) (tsdb.TimeSeriesSlice, error) {
 	query := &m.GetDataSourceByIdQuery{
 		Id:    job.Rule.DatasourceId,
 		OrgId: job.Rule.OrgId,
@@ -108,7 +108,7 @@ func (executor *ExecutorImpl) GetSeries(job *AlertJob) (tsdb.TimeSeriesSlice, er
 	return nil, fmt.Errorf("Grafana does not support alerts for %s", query.Result.Type)
 }
 
-func (executor *ExecutorImpl) validateRule(rule *AlertRule, series tsdb.TimeSeriesSlice) *AlertResult {
+func (e *ExecutorImpl) validateRule(rule *AlertRule, series tsdb.TimeSeriesSlice) *AlertResult {
 	for _, serie := range series {
 		if aggregator[rule.Aggregator] == nil {
 			continue

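Note: as written above, Execute pushes a Pending result when GetSeries fails and then still falls through to validateRule with the failed response. A hedged variant with an early return, suggested here rather than taken from the commit:

func (e *ExecutorImpl) Execute(job *AlertJob, resultQueue chan *AlertResult) {
	response, err := e.GetSeries(job)

	if err != nil {
		// Query failed: report a pending state and stop, instead of also
		// running validation against an empty response.
		resultQueue <- &AlertResult{State: alertstates.Pending, Id: job.Rule.Id, AlertJob: job}
		return
	}

	result := e.validateRule(job.Rule, response)
	result.AlertJob = job
	resultQueue <- result
}
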
+ 1 - 1
pkg/services/alerting/interfaces.go

@@ -8,5 +8,5 @@ type Executor interface {
 
 type Scheduler interface {
 	Tick(time time.Time, execQueue chan *AlertJob)
-	Update(rules []AlertRule)
+	Update(rules []*AlertRule)
 }
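
Note: Update now takes []*AlertRule instead of []AlertRule. Together with the scheduler.go change below (job.Rule = rule instead of job.Rule = &rule), this sidesteps the classic Go range-variable aliasing pitfall, where taking the address of the loop variable makes every job point at the same, repeatedly overwritten value. A small illustrative sketch of that pitfall (buildJobs is a hypothetical helper, not part of this commit):

func buildJobs(rules []AlertRule) map[int64]*AlertJob {
	jobs := map[int64]*AlertJob{}
	for _, rule := range rules {
		// In Go versions before 1.22, &rule is the address of a single reused
		// loop variable, so every job ends up pointing at the last rule in the slice.
		jobs[rule.Id] = &AlertJob{Rule: &rule}
	}
	return jobs
}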

+ 91 - 0
pkg/services/alerting/rule_reader.go

@@ -0,0 +1,91 @@
+package alerting
+
+import (
+	"sync"
+	"time"
+
+	"github.com/grafana/grafana/pkg/bus"
+	"github.com/grafana/grafana/pkg/log"
+	m "github.com/grafana/grafana/pkg/models"
+)
+
+type RuleReader interface {
+	Fetch() []*AlertRule
+}
+
+type AlertRuleReader struct {
+	sync.RWMutex
+	serverID       string
+	serverPosition int
+	clusterSize    int
+}
+
+func NewRuleReader() *AlertRuleReader {
+	ruleReader := &AlertRuleReader{}
+
+	go ruleReader.initReader()
+	return ruleReader
+}
+
+func (arr *AlertRuleReader) initReader() {
+	heartbeat := time.NewTicker(time.Second * 10)
+
+	for {
+		select {
+		case <-heartbeat.C:
+			arr.heartbeat()
+		}
+	}
+}
+
+func (arr *AlertRuleReader) Fetch() []*AlertRule {
+	cmd := &m.GetAllAlertsQuery{}
+	err := bus.Dispatch(cmd)
+
+	if err != nil {
+		log.Error(1, "Alerting: ruleReader.fetch(): Could not load alerts", err)
+		return []*AlertRule{}
+	}
+
+	res := make([]*AlertRule, len(cmd.Result))
+	for i, ruleDef := range cmd.Result {
+		model := &AlertRule{}
+		model.Id = ruleDef.Id
+		model.OrgId = ruleDef.OrgId
+		model.DatasourceId = ruleDef.DatasourceId
+		model.Query = ruleDef.Query
+		model.QueryRefId = ruleDef.QueryRefId
+		model.WarnLevel = ruleDef.WarnLevel
+		model.WarnOperator = ruleDef.WarnOperator
+		model.CritLevel = ruleDef.CritLevel
+		model.CritOperator = ruleDef.CritOperator
+		model.Frequency = ruleDef.Frequency
+		model.Title = ruleDef.Title
+		model.Description = ruleDef.Description
+		model.Aggregator = ruleDef.Aggregator
+		model.State = ruleDef.State
+		res[i] = model
+	}
+
+	return res
+}
+
+func (arr *AlertRuleReader) heartbeat() {
+
+	//Lets cheat on this until we focus on clustering
+	//log.Info("Heartbeat: Sending heartbeat from " + this.serverId)
+	arr.clusterSize = 1
+	arr.serverPosition = 1
+
+	/*
+		cmd := &m.HeartBeatCommand{ServerId: this.serverId}
+		err := bus.Dispatch(cmd)
+
+		if err != nil {
+			log.Error(1, "Failed to send heartbeat.")
+		} else {
+			this.clusterSize = cmd.Result.ClusterSize
+			this.serverPosition = cmd.Result.UptimePosition
+		}
+	*/
+}

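Note: rule_reader.go is new in this commit. Fetch dispatches m.GetAllAlertsQuery on the bus and copies each stored definition into an *AlertRule, while heartbeat is stubbed out until clustering is addressed. A minimal sketch of how the engine might wire the reader into the scheduler; the field names (ruleReader, scheduler) and the refresh call site are assumptions, not shown in this diff:

// Hypothetical helper: pull the current rule set and hand it to the scheduler.
func (e *Engine) refreshRules() {
	rules := e.ruleReader.Fetch() // []*AlertRule built from GetAllAlertsQuery
	e.scheduler.Update(rules)     // matches the new Update(rules []*AlertRule) signature
}
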
+ 2 - 4
pkg/services/alerting/scheduler.go

@@ -16,7 +16,7 @@ func NewScheduler() Scheduler {
 	}
 }
 
-func (s *SchedulerImpl) Update(rules []AlertRule) {
+func (s *SchedulerImpl) Update(rules []*AlertRule) {
 	log.Debug("Scheduler: Update()")
 
 	jobs := make(map[int64]*AlertJob, 0)
@@ -32,7 +32,7 @@ func (s *SchedulerImpl) Update(rules []AlertRule) {
 			}
 		}
 
-		job.Rule = &rule
+		job.Rule = rule
 		job.Offset = int64(i)
 
 		jobs[rule.Id] = job
@@ -45,8 +45,6 @@ func (s *SchedulerImpl) Update(rules []AlertRule) {
 func (s *SchedulerImpl) Tick(tickTime time.Time, execQueue chan *AlertJob) {
 	now := tickTime.Unix()
 
-	log.Info("Alerting: Scheduler.Tick() %v", len(s.jobs))
-
 	for _, job := range s.jobs {
 		if now%job.Rule.Frequency == 0 && job.Running == false {
 			log.Trace("Scheduler: Putting job on to exec queue: %s", job.Rule.Title)