Prechádzať zdrojové kódy

feat(alerting): progress on alerting backend

Torkel Ödegaard 9 rokov pred
rodič
commit
0ce55600bb

+ 97 - 8
pkg/services/alerting/conditions.go

@@ -3,25 +3,97 @@ package alerting
 import (
 import (
 	"encoding/json"
 	"encoding/json"
 	"errors"
 	"errors"
+	"fmt"
 
 
+	"github.com/grafana/grafana/pkg/bus"
 	"github.com/grafana/grafana/pkg/components/simplejson"
 	"github.com/grafana/grafana/pkg/components/simplejson"
+	m "github.com/grafana/grafana/pkg/models"
+	"github.com/grafana/grafana/pkg/tsdb"
 )
 )
 
 
 type QueryCondition struct {
 type QueryCondition struct {
-	Query     AlertQuery
-	Reducer   QueryReducer
-	Evaluator AlertEvaluator
+	Query         AlertQuery
+	Reducer       QueryReducer
+	Evaluator     AlertEvaluator
+	HandleRequest tsdb.HandleRequestFunc
 }
 }
 
 
 func (c *QueryCondition) Eval(context *AlertResultContext) {
 func (c *QueryCondition) Eval(context *AlertResultContext) {
+	seriesList, err := c.executeQuery(context)
+	if err != nil {
+		context.Error = err
+		return
+	}
+
+	for _, series := range seriesList {
+		reducedValue := c.Reducer.Reduce(series)
+		pass := c.Evaluator.Eval(series, reducedValue)
+		if pass {
+			context.Triggered = true
+			break
+		}
+	}
+}
+
+func (c *QueryCondition) executeQuery(context *AlertResultContext) (tsdb.TimeSeriesSlice, error) {
+	getDsInfo := &m.GetDataSourceByIdQuery{
+		Id:    c.Query.DatasourceId,
+		OrgId: context.Rule.OrgId,
+	}
+
+	if err := bus.Dispatch(getDsInfo); err != nil {
+		return nil, fmt.Errorf("Could not find datasource")
+	}
+
+	req := c.getRequestForAlertRule(getDsInfo.Result)
+	result := make(tsdb.TimeSeriesSlice, 0)
+
+	resp, err := c.HandleRequest(req)
+	if err != nil {
+		return nil, fmt.Errorf("Alerting: GetSeries() tsdb.HandleRequest() error %v", err)
+	}
+
+	for _, v := range resp.Results {
+		if v.Error != nil {
+			return nil, fmt.Errorf("Alerting: GetSeries() tsdb.HandleRequest() response error %v", v)
+		}
+
+		result = append(result, v.Series...)
+	}
+
+	return result, nil
+}
+
+func (c *QueryCondition) getRequestForAlertRule(datasource *m.DataSource) *tsdb.Request {
+	req := &tsdb.Request{
+		TimeRange: tsdb.TimeRange{
+			From: c.Query.From,
+			To:   c.Query.To,
+		},
+		Queries: []*tsdb.Query{
+			{
+				RefId: "A",
+				Query: c.Query.Model.Get("target").MustString(),
+				DataSource: &tsdb.DataSourceInfo{
+					Id:       datasource.Id,
+					Name:     datasource.Name,
+					PluginId: datasource.Type,
+					Url:      datasource.Url,
+				},
+			},
+		},
+	}
+
+	return req
 }
 }
 
 
 func NewQueryCondition(model *simplejson.Json) (*QueryCondition, error) {
 func NewQueryCondition(model *simplejson.Json) (*QueryCondition, error) {
 	condition := QueryCondition{}
 	condition := QueryCondition{}
+	condition.HandleRequest = tsdb.HandleRequest
 
 
 	queryJson := model.Get("query")
 	queryJson := model.Get("query")
 
 
-	condition.Query.Query = queryJson.Get("query").MustString()
+	condition.Query.Model = queryJson.Get("model")
 	condition.Query.From = queryJson.Get("params").MustArray()[1].(string)
 	condition.Query.From = queryJson.Get("params").MustArray()[1].(string)
 	condition.Query.To = queryJson.Get("params").MustArray()[2].(string)
 	condition.Query.To = queryJson.Get("params").MustArray()[2].(string)
 	condition.Query.DatasourceId = queryJson.Get("datasourceId").MustInt64()
 	condition.Query.DatasourceId = queryJson.Get("datasourceId").MustInt64()
@@ -43,8 +115,18 @@ type SimpleReducer struct {
 	Type string
 	Type string
 }
 }
 
 
-func (s *SimpleReducer) Reduce() float64 {
-	return 0
+func (s *SimpleReducer) Reduce(series *tsdb.TimeSeries) float64 {
+	var value float64 = 0
+
+	switch s.Type {
+	case "avg":
+		for _, point := range series.Points {
+			value += point[0]
+		}
+		value = value / float64(len(series.Points))
+	}
+
+	return value
 }
 }
 
 
 func NewSimpleReducer(typ string) *SimpleReducer {
 func NewSimpleReducer(typ string) *SimpleReducer {
@@ -56,8 +138,15 @@ type DefaultAlertEvaluator struct {
 	Threshold float64
 	Threshold float64
 }
 }
 
 
-func (e *DefaultAlertEvaluator) Eval() bool {
-	return true
+func (e *DefaultAlertEvaluator) Eval(series *tsdb.TimeSeries, reducedValue float64) bool {
+	switch e.Type {
+	case ">":
+		return reducedValue > e.Threshold
+	case "<":
+		return reducedValue < e.Threshold
+	}
+
+	return false
 }
 }
 
 
 func NewDefaultAlertEvaluator(model *simplejson.Json) (*DefaultAlertEvaluator, error) {
 func NewDefaultAlertEvaluator(model *simplejson.Json) (*DefaultAlertEvaluator, error) {

+ 25 - 1
pkg/services/alerting/conditions_test.go

@@ -3,7 +3,10 @@ package alerting
 import (
 import (
 	"testing"
 	"testing"
 
 
+	"github.com/grafana/grafana/pkg/bus"
 	"github.com/grafana/grafana/pkg/components/simplejson"
 	"github.com/grafana/grafana/pkg/components/simplejson"
+	m "github.com/grafana/grafana/pkg/models"
+	"github.com/grafana/grafana/pkg/tsdb"
 	. "github.com/smartystreets/goconvey/convey"
 	. "github.com/smartystreets/goconvey/convey"
 )
 )
 
 
@@ -11,6 +14,11 @@ func TestQueryCondition(t *testing.T) {
 
 
 	Convey("when evaluating query condition", t, func() {
 	Convey("when evaluating query condition", t, func() {
 
 
+		bus.AddHandler("test", func(query *m.GetDataSourceByIdQuery) error {
+			query.Result = &m.DataSource{Id: 1, Type: "graphite"}
+			return nil
+		})
+
 		Convey("Given avg() and > 100", func() {
 		Convey("Given avg() and > 100", func() {
 
 
 			jsonModel, err := simplejson.NewJson([]byte(`{
 			jsonModel, err := simplejson.NewJson([]byte(`{
@@ -29,9 +37,25 @@ func TestQueryCondition(t *testing.T) {
 			So(err, ShouldBeNil)
 			So(err, ShouldBeNil)
 
 
 			Convey("Should set result to triggered when avg is above 100", func() {
 			Convey("Should set result to triggered when avg is above 100", func() {
-				context := &AlertResultContext{}
+				context := &AlertResultContext{
+					Rule: &AlertRule{},
+				}
+
+				condition.HandleRequest = func(req *tsdb.Request) (*tsdb.Response, error) {
+					return &tsdb.Response{
+						Results: map[string]*tsdb.QueryResult{
+							"A": &tsdb.QueryResult{
+								Series: tsdb.TimeSeriesSlice{
+									tsdb.NewTimeSeries("test1", [][2]float64{{120, 0}}),
+								},
+							},
+						},
+					}, nil
+				}
+
 				condition.Eval(context)
 				condition.Eval(context)
 
 
+				So(context.Error, ShouldBeNil)
 				So(context.Triggered, ShouldBeTrue)
 				So(context.Triggered, ShouldBeTrue)
 			})
 			})
 		})
 		})

+ 7 - 3
pkg/services/alerting/interfaces.go

@@ -1,6 +1,10 @@
 package alerting
 package alerting
 
 
-import "time"
+import (
+	"time"
+
+	"github.com/grafana/grafana/pkg/tsdb"
+)
 
 
 type AlertHandler interface {
 type AlertHandler interface {
 	Execute(rule *AlertRule, resultChan chan *AlertResultContext)
 	Execute(rule *AlertRule, resultChan chan *AlertResultContext)
@@ -20,9 +24,9 @@ type AlertCondition interface {
 }
 }
 
 
 type QueryReducer interface {
 type QueryReducer interface {
-	Reduce() float64
+	Reduce(timeSeries *tsdb.TimeSeries) float64
 }
 }
 
 
 type AlertEvaluator interface {
 type AlertEvaluator interface {
-	Eval() bool
+	Eval(timeSeries *tsdb.TimeSeries, reducedValue float64) bool
 }
 }

+ 6 - 2
pkg/services/alerting/models.go

@@ -1,6 +1,10 @@
 package alerting
 package alerting
 
 
-import "time"
+import (
+	"time"
+
+	"github.com/grafana/grafana/pkg/components/simplejson"
+)
 
 
 type AlertJob struct {
 type AlertJob struct {
 	Offset     int64
 	Offset     int64
@@ -45,7 +49,7 @@ type Level struct {
 }
 }
 
 
 type AlertQuery struct {
 type AlertQuery struct {
-	Query        string
+	Model        *simplejson.Json
 	DatasourceId int64
 	DatasourceId int64
 	From         string
 	From         string
 	To           string
 	To           string

+ 1 - 3
pkg/tsdb/graphite/graphite.go

@@ -38,9 +38,7 @@ func (e *GraphiteExecutor) Execute(queries tsdb.QuerySlice, context *tsdb.QueryC
 	}
 	}
 
 
 	for _, query := range queries {
 	for _, query := range queries {
-		params["target"] = []string{
-			query.Query,
-		}
+		params["target"] = []string{query.Query}
 	}
 	}
 
 
 	client := http.Client{Timeout: time.Duration(10 * time.Second)}
 	client := http.Client{Timeout: time.Duration(10 * time.Second)}

+ 2 - 0
pkg/tsdb/request.go

@@ -1,5 +1,7 @@
 package tsdb
 package tsdb
 
 
+type HandleRequestFunc func(req *Request) (*Response, error)
+
 func HandleRequest(req *Request) (*Response, error) {
 func HandleRequest(req *Request) (*Response, error) {
 	context := NewQueryContext(req.Queries, req.TimeRange)
 	context := NewQueryContext(req.Queries, req.TimeRange)