
feat(alerting): naive graphite executor

bergquist 9 years ago
Parent
Commit
957cb407c5

+ 13 - 1
pkg/models/alerts_state.go

@@ -14,7 +14,19 @@ type AlertState struct {
 }
 
 var (
-	VALID_STATES = []string{"OK", "WARN", "CRITICAL", "ACKNOWLEDGED"}
+	VALID_STATES = []string{
+		ALERT_STATE_OK,
+		ALERT_STATE_WARN,
+		ALERT_STATE_CRITICAL,
+		ALERT_STATE_ACKNOWLEDGED,
+		ALERT_STATE_MAINTENANCE,
+	}
+
+	ALERT_STATE_OK           = "OK"
+	ALERT_STATE_WARN         = "WARN"
+	ALERT_STATE_CRITICAL     = "CRITICAL"
+	ALERT_STATE_ACKNOWLEDGED = "ACKNOWLEDGED"
+	ALERT_STATE_MAINTENANCE  = "MAINTENANCE"
 )
 
 func (this *UpdateAlertStateCommand) IsValidState() bool {

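The body of IsValidState sits outside this hunk; a minimal sketch of what a membership check against VALID_STATES could look like, assuming the command carries the proposed state in a NewState string field (the field name is an assumption, not confirmed by this diff):

// Hypothetical sketch only; NewState is an assumed field name.
func (this *UpdateAlertStateCommand) IsValidState() bool {
	for _, state := range VALID_STATES {
		if this.NewState == state {
			return true
		}
	}
	return false
}
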
+ 13 - 1
pkg/services/alerting/alert_rule_reader.go

@@ -17,6 +17,18 @@ func (this AlertRuleReader) Fetch() []m.AlertRule {
 		//{Id: 3, Title: "alert rule 3", Interval: "10s", Frequency: 10},
 		//{Id: 4, Title: "alert rule 4", Interval: "10s", Frequency: 5},
 		//{Id: 5, Title: "alert rule 5", Interval: "10s", Frequency: 5},
-		{Id: 6, Title: "alert rule 6", Interval: "10s", Frequency: 1},
+		{
+			Id:           6,
+			OrgId:        1,
+			Title:        "alert rule 6",
+			Interval:     "10s",
+			Frequency:    3,
+			DatasourceId: 1,
+			WarnOperator: ">",
+			WarnLevel:    100,
+			Aggregator:   "avg",
+			Query:        `{"refId":"A","target":"statsd.fakesite.counters.session_start.*.count","textEditor":true}`,
+			QueryRange:   "1h",
+		},
 	}
 }

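The rule literal above implies the shape of m.AlertRule. A reconstruction of the struct from the fields this diff touches, with types inferred from the literals; CritLevel is included because the graphite executor below reads it, and the integer types are assumptions:

// Inferred sketch of m.AlertRule, reconstructed from the fields used in this diff.
type AlertRule struct {
	Id           int64
	OrgId        int64
	Title        string
	Interval     string
	Frequency    int64
	DatasourceId int64
	WarnOperator string
	WarnLevel    int64 // compared via float64(rule.WarnLevel) in the executor
	CritLevel    int64 // read by executeRules in graphite_executor.go
	Aggregator   string
	Query        string // raw panel query JSON; the executor extracts "target" from it
	QueryRange   string // e.g. "1h", sent to Graphite as from=-1h
}
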
+ 11 - 10
pkg/services/alerting/alerting.go

@@ -21,7 +21,7 @@ func Init() {
 
 	scheduler := NewScheduler()
 	go scheduler.Dispatch(&AlertRuleReader{})
-	go scheduler.Executor(&DummieExecutor{})
+	go scheduler.Executor(&GraphiteExecutor{})
 	go scheduler.HandleResponses()
 }
 
@@ -128,9 +128,9 @@ func (this *Scheduler) Executor(executor Executor) {
 
 func (this *Scheduler) HandleResponses() {
 	for response := range this.responseQueue {
-		log.Info("Response: alert %d returned %s", response.id, response.state)
-		if this.jobs[response.id] != nil {
-			this.jobs[response.id].running = false
+		log.Info("Response: alert(%d) status(%s) actual(%v)", response.Id, response.State, response.ActualValue)
+		if this.jobs[response.Id] != nil {
+			this.jobs[response.Id].running = false
 		}
 	}
 }
@@ -143,10 +143,10 @@ func (this *Scheduler) MeasureAndExecute(exec Executor, rule *AlertJob) {
 
 	select {
 	case <-time.After(time.Second * 5):
-		this.responseQueue <- &AlertResult{id: rule.rule.Id, state: "timed out", duration: time.Since(now).Nanoseconds() / 1000000}
+		this.responseQueue <- &AlertResult{Id: rule.rule.Id, State: "timed out", Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000)}
 	case r := <-response:
-		r.duration = time.Since(now).Nanoseconds() / 1000000
-		log.Info("Schedular: exeuction took %v milli seconds", r.duration)
+		r.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)
+		log.Info("Scheduler: execution took %vms", r.Duration)
 		this.responseQueue <- r
 	}
 }
@@ -159,7 +159,8 @@ type AlertJob struct {
 }
 
 type AlertResult struct {
-	id       int64
-	state    string
-	duration int64
+	Id          int64
+	State       string
+	ActualValue float64
+	Duration    float64
 }

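MeasureAndExecute races the executor against a five-second deadline: whichever of time.After and the response channel delivers first wins the select, and the elapsed time is stamped as fractional milliseconds. A self-contained sketch of the same pattern, with names simplified for illustration:

package main

import (
	"fmt"
	"time"
)

// result mirrors the shape of AlertResult for this sketch.
type result struct {
	state    string
	duration float64 // milliseconds
}

// measure runs work concurrently and enforces a five-second deadline,
// mirroring the select in Scheduler.MeasureAndExecute.
func measure(work func(chan *result)) *result {
	response := make(chan *result, 1)
	now := time.Now()

	go work(response)

	select {
	case <-time.After(time.Second * 5):
		// the deadline won the race: report a timeout with the elapsed time
		return &result{state: "timed out", duration: float64(time.Since(now).Nanoseconds()) / float64(1000000)}
	case r := <-response:
		// the executor won the race: stamp the duration in fractional milliseconds
		r.duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)
		return r
	}
}

func main() {
	r := measure(func(out chan *result) {
		time.Sleep(100 * time.Millisecond) // simulated executor work
		out <- &result{state: "OK"}
	})
	fmt.Printf("state=%s duration=%.1fms\n", r.state, r.duration)
}
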
+ 2 - 2
pkg/services/alerting/executor.go → pkg/services/alerting/dummie_executor.go

@@ -12,12 +12,12 @@ type Executor interface {
 
 type DummieExecutor struct{}
 
-func (this DummieExecutor) Execute(rule m.AlertRule, responseQueue chan *AlertResult) {
+func (this *DummieExecutor) Execute(rule m.AlertRule, responseQueue chan *AlertResult) {
 	if rule.Id == 6 {
 		time.Sleep(time.Second * 0)
 	}
 	//time.Sleep(time.Second)
 	log.Info("Finished executing: %d", rule.Id)
 
-	responseQueue <- &AlertResult{state: "OK", id: rule.Id}
+	responseQueue <- &AlertResult{State: "OK", Id: rule.Id}
 }

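The receiver change from DummieExecutor to *DummieExecutor is what lets Init pass &GraphiteExecutor{} (and &DummieExecutor{}) where an Executor is expected: with a pointer receiver, Execute belongs to the method set of the pointer type only. A minimal sketch of that rule, with a trimmed-down interface for illustration:

package main

import "fmt"

// Trimmed-down stand-in for the alerting Executor interface.
type Executor interface {
	Execute()
}

type DummieExecutor struct{}

// Pointer receiver: Execute is in the method set of *DummieExecutor, not DummieExecutor.
func (this *DummieExecutor) Execute() { fmt.Println("executing") }

func main() {
	var e Executor = &DummieExecutor{} // compiles
	// var e Executor = DummieExecutor{} // compile error: Execute has a pointer receiver
	e.Execute()
}
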
+ 107 - 0
pkg/services/alerting/graphite_executor.go

@@ -0,0 +1,107 @@
+package alerting
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/franela/goreq"
+	"github.com/grafana/grafana/pkg/bus"
+	"github.com/grafana/grafana/pkg/components/simplejson"
+	m "github.com/grafana/grafana/pkg/models"
+	"net/http"
+	"net/url"
+	"time"
+)
+
+type GraphiteExecutor struct{}
+
+type Series struct {
+	Datapoints []DataPoint
+	Target     string
+}
+
+type Response []Series
+type DataPoint []json.Number
+
+func (this *GraphiteExecutor) Execute(rule m.AlertRule, responseQueue chan *AlertResult) {
+	response, err := this.getSeries(rule)
+
+	if err != nil {
+		responseQueue <- &AlertResult{State: m.ALERT_STATE_CRITICAL, Id: rule.Id}
+		return // without this return, the nil response would still be evaluated below
+	}
+
+	responseQueue <- this.executeRules(response, rule)
+}
+
+func (this *GraphiteExecutor) executeRules(series []Series, rule m.AlertRule) *AlertResult {
+	for _, v := range series {
+		if len(v.Datapoints) == 0 {
+			continue // guard the average below against division by zero
+		}
+
+		var sum float64
+		for _, dp := range v.Datapoints {
+			i, _ := dp[0].Float64() // null datapoints fail to parse and count as 0 here
+			sum += i
+		}
+
+		avg := sum / float64(len(v.Datapoints))
+
+		if float64(rule.CritLevel) < avg {
+			return &AlertResult{State: m.ALERT_STATE_CRITICAL, Id: rule.Id, ActualValue: avg}
+		}
+
+		if float64(rule.WarnLevel) < avg {
+			return &AlertResult{State: m.ALERT_STATE_WARN, Id: rule.Id, ActualValue: avg}
+		}
+
+		if float64(rule.CritLevel) < sum {
+			return &AlertResult{State: m.ALERT_STATE_CRITICAL, Id: rule.Id, ActualValue: sum}
+		}
+
+		if float64(rule.WarnLevel) < sum {
+			return &AlertResult{State: m.ALERT_STATE_WARN, Id: rule.Id, ActualValue: sum}
+		}
+	}
+
+	return &AlertResult{State: m.ALERT_STATE_OK, Id: rule.Id}
+}
+
+func (this *GraphiteExecutor) getSeries(rule m.AlertRule) (Response, error) {
+	query := &m.GetDataSourceByIdQuery{Id: rule.DatasourceId, OrgId: rule.OrgId}
+	if err := bus.Dispatch(query); err != nil {
+		return nil, err
+	}
+
+	v := url.Values{
+		"format": []string{"json"},
+		"target": []string{getTargetFromQuery(rule)},
+	}
+
+	v.Add("from", "-"+rule.QueryRange)
+	v.Add("until", "now")
+
+	req := goreq.Request{
+		Method:  "POST",
+		Uri:     query.Result.Url + "/render",
+		Body:    v.Encode(),
+		Timeout: 500 * time.Millisecond,
+	}
+
+	res, err := req.Do()
+	if err != nil {
+		return nil, err
+	}
+
+	if res.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("graphite render request failed: status code %v", res.StatusCode)
+	}
+
+	response := Response{}
+	if err := res.Body.FromJsonTo(&response); err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
+func getTargetFromQuery(rule m.AlertRule) string {
+	json, err := simplejson.NewJson([]byte(rule.Query))
+	if err != nil {
+		return "" // a malformed rule query yields an empty target instead of a panic
+	}
+
+	return json.Get("target").MustString()
+}

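Graphite's /render endpoint with format=json answers with an array of series, each holding [value, timestamp] pairs in which the value is null for missing datapoints. A null leaves the json.Number empty, so dp[0].Float64() fails and the executor above counts it as 0, dragging the average down. A self-contained sketch of the decoding that skips nulls instead (the payload values are made up for illustration):

package main

import (
	"encoding/json"
	"fmt"
)

// Same shapes as in graphite_executor.go.
type DataPoint []json.Number

type Series struct {
	Datapoints []DataPoint
	Target     string
}

func main() {
	// Illustrative payload in Graphite's render format: [value, timestamp] pairs, null for gaps.
	payload := `[{"target":"statsd.fakesite.counters.session_start.*.count",
	             "datapoints":[[90,1450754220],[null,1450754280],[110,1450754340]]}]`

	var response []Series
	if err := json.Unmarshal([]byte(payload), &response); err != nil {
		panic(err)
	}

	for _, s := range response {
		var sum float64
		var count int
		for _, dp := range s.Datapoints {
			v, err := dp[0].Float64()
			if err != nil {
				continue // null datapoint: skip it rather than count it as zero
			}
			sum += v
			count++
		}
		if count > 0 {
			fmt.Printf("%s avg=%.1f over %d datapoints\n", s.Target, sum/float64(count), count)
		}
	}
}
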
+ 0 - 1
pkg/services/sqlstore/dashboard_parser_test.go

@@ -406,7 +406,6 @@ func TestAlertModel(t *testing.T) {
 
 			So(alerts[0].DatasourceId, ShouldEqual, 2)
 			So(alerts[1].DatasourceId, ShouldEqual, 1)
-
 		})
 	})
 }

+ 1 - 1
pkg/services/sqlstore/migrator/migrator.go

@@ -115,7 +115,7 @@ func (mg *Migrator) Start() error {
 
 func (mg *Migrator) exec(m Migration) error {
 	if mg.LogLevel <= log.INFO {
-		//log.Info("Migrator: exec migration id: %v", m.Id())
+		log.Info("Migrator: exec migration id: %v", m.Id())
 	}
 
 	err := mg.inTransaction(func(sess *xorm.Session) error {