Browse Source

feat(influxdb): send request and parse response

bergquist 9 years ago
parent
commit
ab8751767c

+ 135 - 8
pkg/tsdb/influxdb/influxdb.go

@@ -3,9 +3,17 @@ package influxdb
 import (
 	"context"
 	"crypto/tls"
+	"encoding/json"
+	"fmt"
 	"net/http"
+	"net/url"
+	"path"
 	"time"
 
+	"gopkg.in/guregu/null.v3"
+
+	"golang.org/x/net/context/ctxhttp"
+
 	"github.com/grafana/grafana/pkg/log"
 	"github.com/grafana/grafana/pkg/tsdb"
 )
@@ -43,22 +51,141 @@ func init() {
 	}
 }
 
-func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult {
-	result := &tsdb.BatchResult{}
+func (e *InfluxDBExecutor) getQuery(queries tsdb.QuerySlice, context *tsdb.QueryContext) (string, error) {
 	for _, v := range queries {
-
 		query, err := e.QueryParser.Parse(v.Model)
 		if err != nil {
-			result.Error = err
-			return result
+			return "", err
 		}
 
-		glog.Info("Influxdb executor", "query", query)
+		rawQuery, err := e.QueryBuilder.Build(query, context)
+		if err != nil {
+			return "", err
+		}
+
+		return rawQuery, nil
+	}
+
+	return "", fmt.Errorf("Tsdb request contains no queries")
+}
+
+func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult {
+	result := &tsdb.BatchResult{}
+
+	query, err := e.getQuery(queries, context)
+	if err != nil {
+		result.Error = err
+		return result
+	}
+
+	glog.Info("Influxdb", "query", query)
+
+	u, _ := url.Parse(e.Url)
+	u.Path = path.Join(u.Path, "query")
+
+	req, err := http.NewRequest(http.MethodGet, u.String(), nil)
+	if err != nil {
+		result.Error = err
+		return result
+	}
+
+	params := req.URL.Query()
+	params.Set("q", query)
+	params.Set("db", e.Database)
+	params.Set("epoch", "s")
+
+	req.URL.RawQuery = params.Encode()
 
-		rawQuery, err := e.QueryBuilder.Build(query)
+	req.Header.Set("Content-Type", "")
+	req.Header.Set("User-Agent", "Grafana")
+	if e.BasicAuth {
+		req.SetBasicAuth(e.BasicAuthUser, e.BasicAuthPassword)
+	}
+
+	glog.Info("influxdb request", "url", req.URL.String())
+	resp, err := ctxhttp.Do(ctx, HttpClient, req)
+	if err != nil {
+		result.Error = err
+		return result
+	}
+
+	if resp.StatusCode/100 != 2 {
+		result.Error = fmt.Errorf("Influxdb returned statuscode %v body %v", resp.Status)
+		return result
+	}
+
+	var response Response
+	dec := json.NewDecoder(resp.Body)
+	dec.UseNumber()
+	err = dec.Decode(&response)
+	if err != nil {
+		glog.Error("Influxdb decode failed", "err", err)
+		result.Error = err
+		return result
+	}
+
+	result.QueryResults = make(map[string]*tsdb.QueryResult)
+	queryRes := tsdb.NewQueryResult()
+
+	for _, v := range response.Results {
+		for _, r := range v.Series {
+			serie := tsdb.TimeSeries{Name: r.Name}
+			var points tsdb.TimeSeriesPoints
+
+			for _, k := range r.Values {
+				var value null.Float
+				var err error
+				num, ok := k[1].(json.Number)
+				if !ok {
+					value = null.FloatFromPtr(nil)
+				} else {
+					fvalue, err := num.Float64()
+					if err == nil {
+						value = null.FloatFrom(fvalue)
+					}
+				}
+
+				pos0, ok := k[0].(json.Number)
+				timestamp, err := pos0.Float64()
+				if err == nil && ok {
+					points = append(points, tsdb.NewTimePoint(value, timestamp))
+				} else {
+					glog.Error("Failed to convert response", "err1", err, "ok", ok, "timestamp", timestamp, "value", value.Float64)
+				}
+				serie.Points = points
+			}
+			queryRes.Series = append(queryRes.Series, &serie)
+		}
+	}
 
-		glog.Info("Influxdb", "error", err, "rawQuery", rawQuery)
+	for _, v := range queryRes.Series {
+		glog.Info("result", "name", v.Name, "points", v.Points)
 	}
 
+	result.QueryResults["A"] = queryRes
+
 	return result
 }
+
+type Response struct {
+	Results []Result
+	Err     error
+}
+
+type Result struct {
+	Series   []Row
+	Messages []*Message
+	Err      error
+}
+
+type Message struct {
+	Level string `json:"level,omitempty"`
+	Text  string `json:"text,omitempty"`
+}
+
+type Row struct {
+	Name    string            `json:"name,omitempty"`
+	Tags    map[string]string `json:"tags,omitempty"`
+	Columns []string          `json:"columns,omitempty"`
+	Values  [][]interface{}   `json:"values,omitempty"`
+}

+ 2 - 0
pkg/tsdb/influxdb/models.go

@@ -7,6 +7,8 @@ type Query struct {
 	Tags         []*Tag
 	GroupBy      []*QueryPart
 	Selects      []*Select
+
+	Interval string
 }
 
 type Tag struct {

+ 8 - 3
pkg/tsdb/influxdb/query_builder.go

@@ -3,6 +3,8 @@ package influxdb
 import (
 	"fmt"
 	"strings"
+
+	"github.com/grafana/grafana/pkg/tsdb"
 )
 
 type QueryBuild struct{}
@@ -27,7 +29,7 @@ func renderTags(query *Query) []string {
 	return res
 }
 
-func (*QueryBuild) Build(query *Query) (string, error) {
+func (*QueryBuild) Build(query *Query, queryContext *tsdb.QueryContext) (string, error) {
 	res := "SELECT "
 
 	var selectors []string
@@ -42,7 +44,9 @@ func (*QueryBuild) Build(query *Query) (string, error) {
 	res += strings.Join(selectors, ", ")
 
 	policy := ""
-	if query.Policy != "" {
+	if query.Policy == "" || query.Policy == "default" {
+		policy = ""
+	} else {
 		policy = `"` + query.Policy + `".`
 	}
 	res += fmt.Sprintf(` FROM %s"%s"`, policy, query.Measurement)
@@ -54,7 +58,8 @@ func (*QueryBuild) Build(query *Query) (string, error) {
 		res += " AND "
 	}
 
-	res += "$timeFilter"
+	//res += "$timeFilter"
+	res += "time > " + strings.Replace(queryContext.TimeRange.From, "now", "now()", 1)
 
 	var groupBy []string
 	for _, group := range query.GroupBy {

+ 11 - 4
pkg/tsdb/influxdb/query_builder_test.go

@@ -3,6 +3,7 @@ package influxdb
 import (
 	"testing"
 
+	"github.com/grafana/grafana/pkg/tsdb"
 	. "github.com/smartystreets/goconvey/convey"
 )
 
@@ -19,17 +20,22 @@ func TestInfluxdbQueryBuilder(t *testing.T) {
 		tag1 := &Tag{Key: "hostname", Value: "server1", Operator: "="}
 		tag2 := &Tag{Key: "hostname", Value: "server2", Operator: "=", Condition: "OR"}
 
+		queryContext := &tsdb.QueryContext{
+			TimeRange: tsdb.NewTimeRange("now-5h", "now"),
+		}
+
 		Convey("can build query", func() {
 			query := &Query{
 				Selects:     []*Select{{*qp1, *qp2}},
 				Measurement: "cpu",
 				Policy:      "policy",
 				GroupBy:     []*QueryPart{groupBy1, groupBy2},
+				Interval:    "10s",
 			}
 
-			rawQuery, err := builder.Build(query)
+			rawQuery, err := builder.Build(query, queryContext)
 			So(err, ShouldBeNil)
-			So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "policy"."cpu" WHERE $timeFilter GROUP BY time($interval) fill(null)`)
+			So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "policy"."cpu" WHERE time > now()-5h GROUP BY time(10s) fill(null)`)
 		})
 
 		Convey("can asd query", func() {
@@ -38,11 +44,12 @@ func TestInfluxdbQueryBuilder(t *testing.T) {
 				Measurement: "cpu",
 				GroupBy:     []*QueryPart{groupBy1},
 				Tags:        []*Tag{tag1, tag2},
+				Interval:    "5s",
 			}
 
-			rawQuery, err := builder.Build(query)
+			rawQuery, err := builder.Build(query, queryContext)
 			So(err, ShouldBeNil)
-			So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE "hostname" = 'server1' OR "hostname" = 'server2' AND $timeFilter GROUP BY time($interval)`)
+			So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE "hostname" = 'server1' OR "hostname" = 'server2' AND time > now()-5h GROUP BY time(10s)`)
 		})
 	})
 }

+ 6 - 0
pkg/tsdb/influxdb/query_part.go

@@ -91,6 +91,12 @@ func fieldRenderer(part *QueryPart, innerExpr string) string {
 }
 
 func functionRenderer(part *QueryPart, innerExpr string) string {
+	for i, v := range part.Params {
+		if v == "$interval" {
+			part.Params[i] = "10s"
+		}
+	}
+
 	if innerExpr != "" {
 		part.Params = append([]string{innerExpr}, part.Params...)
 	}

+ 8 - 0
pkg/tsdb/influxdb/query_part_test.go

@@ -33,6 +33,14 @@ func TestInfluxdbQueryPart(t *testing.T) {
 			So(res, ShouldEqual, "bottom(value, 3)")
 		})
 
+		Convey("time", func() {
+			part, err := NewQueryPart("time", []string{"$interval"})
+			So(err, ShouldBeNil)
+
+			res := part.Render("")
+			So(res, ShouldEqual, "time(10s)")
+		})
+
 		Convey("should nest spread function", func() {
 			part, err := NewQueryPart("spread", []string{})
 			So(err, ShouldBeNil)

+ 3 - 3
pkg/tsdb/models.go

@@ -73,15 +73,15 @@ func NewQueryResult() *QueryResult {
 	}
 }
 
-func NewTimePoint(value float64, timestamp float64) TimePoint {
-	return TimePoint{null.FloatFrom(value), null.FloatFrom(timestamp)}
+func NewTimePoint(value null.Float, timestamp float64) TimePoint {
+	return TimePoint{value, null.FloatFrom(timestamp)}
 }
 
 func NewTimeSeriesPointsFromArgs(values ...float64) TimeSeriesPoints {
 	points := make(TimeSeriesPoints, 0)
 
 	for i := 0; i < len(values); i += 2 {
-		points = append(points, NewTimePoint(values[i], values[i+1]))
+		points = append(points, NewTimePoint(null.FloatFrom(values[i]), values[i+1]))
 	}
 
 	return points

+ 3 - 1
pkg/tsdb/prometheus/prometheus.go

@@ -8,6 +8,8 @@ import (
 	"strings"
 	"time"
 
+	"gopkg.in/guregu/null.v3"
+
 	"github.com/grafana/grafana/pkg/log"
 	"github.com/grafana/grafana/pkg/tsdb"
 	"github.com/prometheus/client_golang/api/prometheus"
@@ -145,7 +147,7 @@ func parseResponse(value pmodel.Value, query *PrometheusQuery) (map[string]*tsdb
 		}
 
 		for _, k := range v.Values {
-			series.Points = append(series.Points, tsdb.NewTimePoint(float64(k.Value), float64(k.Timestamp.Unix()*1000)))
+			series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(float64(k.Value)), float64(k.Timestamp.Unix()*1000)))
 		}
 
 		queryRes.Series = append(queryRes.Series, &series)

+ 5 - 3
pkg/tsdb/testdata/scenarios.go

@@ -6,6 +6,8 @@ import (
 	"strings"
 	"time"
 
+	"gopkg.in/guregu/null.v3"
+
 	"github.com/grafana/grafana/pkg/log"
 	"github.com/grafana/grafana/pkg/tsdb"
 )
@@ -42,7 +44,7 @@ func init() {
 			walker := rand.Float64() * 100
 
 			for i := int64(0); i < 10000 && timeWalkerMs < to; i++ {
-				points = append(points, tsdb.NewTimePoint(walker, float64(timeWalkerMs)))
+				points = append(points, tsdb.NewTimePoint(null.FloatFrom(walker), float64(timeWalkerMs)))
 
 				walker += rand.Float64() - 0.5
 				timeWalkerMs += query.IntervalMs
@@ -73,7 +75,7 @@ func init() {
 			series := newSeriesForQuery(query)
 			outsideTime := context.TimeRange.MustGetFrom().Add(-1*time.Hour).Unix() * 1000
 
-			series.Points = append(series.Points, tsdb.NewTimePoint(10, float64(outsideTime)))
+			series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(10), float64(outsideTime)))
 			queryRes.Series = append(queryRes.Series, series)
 
 			return queryRes
@@ -105,7 +107,7 @@ func init() {
 			step := (endTime - startTime) / int64(len(values)-1)
 
 			for _, val := range values {
-				series.Points = append(series.Points, tsdb.NewTimePoint(val, float64(startTime)))
+				series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(val), float64(startTime)))
 				startTime += step
 			}