Browse Source

feat(mqe): add response parser

bergquist 9 năm trước cách đây
mục cha
commit
f1897b7e96
3 tập tin đã thay đổi với 121 bổ sung32 xóa
  1. 34 6
      pkg/tsdb/mqe/mqe.go
  2. 83 2
      pkg/tsdb/mqe/response_parser.go
  3. 4 24
      pkg/tsdb/mqe/response_parser_test.go

+ 34 - 6
pkg/tsdb/mqe/mqe.go

@@ -10,10 +10,14 @@ import (
 
 type MQEExecutor struct {
 	*tsdb.DataSourceInfo
+	QueryParser *MQEQueryParser
 }
 
 func NewMQEExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor {
-	return &MQEExecutor{dsInfo}
+	return &MQEExecutor{
+		DataSourceInfo: dsInfo,
+		QueryParser:    &MQEQueryParser{},
+	}
 }
 
 var (
@@ -29,16 +33,40 @@ func init() {
 }
 
 func (e *MQEExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult {
+	result := &tsdb.BatchResult{}
+
+	availableSeries, err := NewTokenClient().GetTokenData(ctx, e.DataSourceInfo)
+	if err != nil {
+		return result.WithError(err)
+	}
 
-	availableSeries, _ := NewTokenClient().GetTokenData(ctx, e.DataSourceInfo)
 	glog.Info("available series", availableSeries)
 
-  //query, _ := &MQEQueryParser{}.Parse()
+	var mqeQueries []*MQEQuery
+	for _, v := range queries {
+		q, err := e.QueryParser.Parse(v.Model, e.DataSourceInfo)
+		if err != nil {
+			return result.WithError(err)
+		}
+		mqeQueries = append(mqeQueries, q)
+	}
 
+	var rawQueries []string
+	for _, v := range mqeQueries {
+		queries, err := v.Build(availableSeries.Metrics)
+		if err != nil {
+			return result.WithError(err)
+		}
 
+		rawQueries = append(rawQueries, queries...)
+	}
 
-  //fetch all available serienames
-	//expaned parsed model into multiple queries
+	for _, v := range rawQueries {
+		glog.Info("Mqe executor", "query", v)
+		//create request from v
+		//send request
+		//parse request
+	}
 
-	return &tsdb.BatchResult{}
+	return result
 }

+ 83 - 2
pkg/tsdb/mqe/response_parser.go

@@ -1,8 +1,16 @@
 package mqe
 
 import (
+	"encoding/json"
+	"io/ioutil"
 	"net/http"
+	"time"
 
+	null "gopkg.in/guregu/null.v3"
+
+	"fmt"
+
+	"github.com/grafana/grafana/pkg/log"
 	"github.com/grafana/grafana/pkg/tsdb"
 )
 
@@ -11,8 +19,81 @@ import (
 // add app to alias
 // regular alias
 
-type MQEResponseParser struct{}
+func NewResponseParser() *MQEResponseParser {
+	return &MQEResponseParser{
+		log: log.New("tsdb.mqe"),
+	}
+}
+
+type MQEResponse struct {
+	Success bool               `json:"success"`
+	Name    string             `json:"name"`
+	Body    []MQEResponseSerie `json:"body"`
+}
+
+type ResponseTimeRange struct {
+	Start      int64         `json:"start"`
+	End        int64         `json:"end"`
+	Resolution time.Duration `json:"Resolution"`
+}
+
+type MQEResponseSerie struct {
+	Query     string            `json:"query"`
+	Name      string            `json:"name"`
+	Type      string            `json:"type"`
+	Series    []MQESerie        `json:"series"`
+	TimeRange ResponseTimeRange `json:"timerange"`
+}
+
+type MQESerie struct {
+	Values []null.Float      `json:"values"`
+	Tagset map[string]string `json:"tagset"`
+}
+
+type MQEResponseParser struct {
+	log log.Logger
+}
 
 func (parser *MQEResponseParser) Parse(res *http.Response) (*tsdb.QueryResult, error) {
-	return nil, nil
+	body, err := ioutil.ReadAll(res.Body)
+	defer res.Body.Close()
+	if err != nil {
+		return nil, err
+	}
+
+	if res.StatusCode/100 != 2 {
+		parser.log.Error("Request failed", "status code", res.StatusCode, "body", string(body))
+		return nil, fmt.Errorf("Returned invalid statuscode")
+	}
+
+	var data *MQEResponse = &MQEResponse{}
+	err = json.Unmarshal(body, data)
+	if err != nil {
+		parser.log.Info("Failed to unmarshal graphite response", "error", err, "status", res.Status, "body", string(body))
+		return nil, err
+	}
+
+	if !data.Success {
+		return nil, fmt.Errorf("MQE request failed.")
+	}
+
+	var series tsdb.TimeSeriesSlice
+	for _, v := range data.Body {
+		for _, k := range v.Series {
+			serie := &tsdb.TimeSeries{
+				Name: v.Name,
+			}
+
+			startTime := time.Unix(v.TimeRange.Start*1000, 0)
+			for i, l := range k.Values {
+				timestamp := startTime.Add(time.Duration(int64(v.TimeRange.Resolution) * int64(i)))
+				serie.Points = append(serie.Points, tsdb.NewTimePoint(l, float64(timestamp.UnixNano()/int64(time.Millisecond))))
+			}
+
+			series = append(series, serie)
+		}
+
+	}
+
+	return &tsdb.QueryResult{Series: series}, nil
 }

+ 4 - 24
pkg/tsdb/mqe/response_parser_test.go

@@ -3,14 +3,11 @@ package mqe
 import (
 	"testing"
 
-	"time"
-
 	"net/http"
 	"strings"
 
 	"io/ioutil"
 
-	"github.com/grafana/grafana/pkg/components/simplejson"
 	. "github.com/smartystreets/goconvey/convey"
 )
 
@@ -20,38 +17,21 @@ var (
 
 func TestMQEResponseParser(t *testing.T) {
 	Convey("MQE response parser", t, func() {
-		parser := &MQEResponseParser{}
+		parser := NewResponseParser()
 
 		Convey("Can parse response", func() {
 			response := &http.Response{
 				StatusCode: 200,
 				Body:       ioutil.NopCloser(strings.NewReader(dummieJson)),
 			}
-			_, err := parser.Parse(response)
+			res, err := parser.Parse(response)
 			So(err, ShouldBeNil)
+			So(len(res.Series), ShouldEqual, 2)
+			So(len(res.Series[0].Points), ShouldEqual, 11)
 		})
 	})
 }
 
-type MQEResponse struct {
-	Success bool               `json:"success"`
-	Name    string             `json:"name"`
-	Body    []MQEResponseSerie `json:"body"`
-}
-
-type ResponseTimeRange struct {
-	Start      time.Time     `json:"start"`
-	End        time.Time     `json:"end"`
-	Resolution time.Duration `json:"Resolution"`
-}
-
-type MQEResponseSerie struct {
-	Query  string            `json:"query"`
-	Name   string            `json:"name"`
-	Type   string            `json:"type"`
-	Series []simplejson.Json `json:"series"`
-}
-
 func init() {
 	dummieJson = `{
     "success": true,