浏览代码

wip

Signed-off-by: wph95 <wph657856467@gmail.com>
wph95 7 年之前
父节点
当前提交
8e7d23cdeb

+ 1 - 0
pkg/cmd/grafana-server/main.go

@@ -21,6 +21,7 @@ import (
 	_ "github.com/grafana/grafana/pkg/services/alerting/conditions"
 	_ "github.com/grafana/grafana/pkg/services/alerting/notifiers"
 	_ "github.com/grafana/grafana/pkg/tsdb/cloudwatch"
+	_ "github.com/grafana/grafana/pkg/tsdb/elasticsearch"
 	_ "github.com/grafana/grafana/pkg/tsdb/graphite"
 	_ "github.com/grafana/grafana/pkg/tsdb/influxdb"
 	_ "github.com/grafana/grafana/pkg/tsdb/mysql"

+ 131 - 0
pkg/tsdb/elasticsearch/elasticsearch.go

@@ -0,0 +1,131 @@
+package elasticsearch
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"github.com/davecgh/go-spew/spew"
+	"github.com/grafana/grafana/pkg/log"
+	"github.com/grafana/grafana/pkg/models"
+	"github.com/grafana/grafana/pkg/setting"
+	"github.com/grafana/grafana/pkg/tsdb"
+	"golang.org/x/net/context/ctxhttp"
+	"net/http"
+	"net/url"
+	"path"
+	"strings"
+	"time"
+)
+
+type ElasticsearchExecutor struct {
+	Transport *http.Transport
+}
+
+var (
+	glog               log.Logger
+	intervalCalculator tsdb.IntervalCalculator
+)
+
+func NewElasticsearchExecutor(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
+	transport, err := dsInfo.GetHttpTransport()
+	if err != nil {
+		return nil, err
+	}
+
+	return &ElasticsearchExecutor{
+		Transport: transport,
+	}, nil
+}
+
+func init() {
+	glog = log.New("tsdb.elasticsearch")
+	tsdb.RegisterTsdbQueryEndpoint("elasticsearch", NewElasticsearchExecutor)
+	intervalCalculator = tsdb.NewIntervalCalculator(&tsdb.IntervalOptions{MinInterval: time.Millisecond * 1})
+}
+
+func (e *ElasticsearchExecutor) Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) {
+	result := &tsdb.Response{}
+	result.Results = make(map[string]*tsdb.QueryResult)
+
+	queryParser := ElasticSearchQueryParser{
+		dsInfo,
+		tsdbQuery.TimeRange,
+		tsdbQuery.Queries,
+		glog,
+	}
+
+	glog.Warn(spew.Sdump(dsInfo))
+	glog.Warn(spew.Sdump(tsdbQuery))
+
+	payload, err := queryParser.Parse()
+	if err != nil {
+		return nil, err
+	}
+
+	if setting.Env == setting.DEV {
+		glog.Debug("Elasticsearch playload", "raw playload", payload)
+	}
+	glog.Info("Elasticsearch playload", "raw playload", payload)
+
+	req, err := e.createRequest(dsInfo, payload)
+	if err != nil {
+		return nil, err
+	}
+
+	httpClient, err := dsInfo.GetHttpClient()
+	if err != nil {
+		return nil, err
+	}
+
+	resp, err := ctxhttp.Do(ctx, httpClient, req)
+	if err != nil {
+		return nil, err
+	}
+
+	if resp.StatusCode/100 != 2 {
+		return nil, fmt.Errorf("elasticsearch returned statuscode invalid status code: %v", resp.Status)
+	}
+
+	var responses Responses
+	dec := json.NewDecoder(resp.Body)
+	defer resp.Body.Close()
+	dec.UseNumber()
+	err = dec.Decode(&responses)
+	if err != nil {
+		return nil, err
+	}
+
+	glog.Warn(spew.Sdump(responses))
+	for _, res := range responses.Responses {
+		if res.Err != nil {
+			return nil, errors.New(res.getErrMsg())
+		}
+
+	}
+
+	return result, nil
+}
+
+func (e *ElasticsearchExecutor) createRequest(dsInfo *models.DataSource, query string) (*http.Request, error) {
+	u, _ := url.Parse(dsInfo.Url)
+	u.Path = path.Join(u.Path, "_msearch")
+	req, err := http.NewRequest(http.MethodPost, u.String(), strings.NewReader(query))
+	if err != nil {
+		return nil, err
+	}
+	req.Header.Set("User-Agent", "Grafana")
+	req.Header.Set("Content-Type", "application/json")
+
+	if dsInfo.BasicAuth {
+		req.SetBasicAuth(dsInfo.BasicAuthUser, dsInfo.BasicAuthPassword)
+	}
+
+	if !dsInfo.BasicAuth && dsInfo.User != "" {
+		req.SetBasicAuth(dsInfo.User, dsInfo.Password)
+	}
+
+	glog.Debug("Elasticsearch request", "url", req.URL.String())
+	glog.Debug("Elasticsearch request", "body", query)
+	return req, nil
+}

+ 97 - 0
pkg/tsdb/elasticsearch/model_parser.go

@@ -0,0 +1,97 @@
+package elasticsearch
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"github.com/grafana/grafana/pkg/components/simplejson"
+	"github.com/grafana/grafana/pkg/log"
+	"github.com/grafana/grafana/pkg/models"
+	"github.com/grafana/grafana/pkg/tsdb"
+	"src/github.com/davecgh/go-spew/spew"
+	"strconv"
+	"strings"
+	"time"
+)
+
+type ElasticSearchQueryParser struct {
+	DsInfo    *models.DataSource
+	TimeRange *tsdb.TimeRange
+	Queries   []*tsdb.Query
+	glog      log.Logger
+}
+
+func (qp *ElasticSearchQueryParser) Parse() (string, error) {
+	payload := bytes.Buffer{}
+	queryHeader := qp.getQueryHeader()
+
+	for _, q := range qp.Queries {
+		timeField, err := q.Model.Get("timeField").String()
+		if err != nil {
+			return "", err
+		}
+		rawQuery := q.Model.Get("query").MustString("")
+		bucketAggs := q.Model.Get("bucketAggs").MustArray()
+		metrics := q.Model.Get("metrics").MustArray()
+		alias := q.Model.Get("alias").MustString("")
+		builder := QueryBuilder{timeField, rawQuery, bucketAggs, metrics, alias}
+
+		query, err := builder.Build()
+		if err != nil {
+			return "", err
+		}
+		queryBytes, err := json.Marshal(query)
+		if err != nil {
+			return "", err
+		}
+
+		payload.WriteString(queryHeader.String() + "\n")
+		payload.WriteString(string(queryBytes) + "\n")
+	}
+
+	return qp.payloadReplace(payload.String(), qp.DsInfo.JsonData)
+
+}
+
+func (qp *ElasticSearchQueryParser) getQueryHeader() *QueryHeader {
+	var header QueryHeader
+	esVersion := qp.DsInfo.JsonData.Get("esVersion").MustInt()
+
+	searchType := "query_then_fetch"
+	if esVersion < 5 {
+		searchType = "count"
+	}
+	header.SearchType = searchType
+	header.IgnoreUnavailable = true
+	header.Index = qp.getIndexList()
+
+	if esVersion >= 56 {
+		header.MaxConcurrentShardRequests = qp.DsInfo.JsonData.Get("maxConcurrentShardRequests").MustInt()
+	}
+	return &header
+}
+func (qp *ElasticSearchQueryParser) payloadReplace(payload string, model *simplejson.Json) (string, error) {
+	parsedInterval, err := tsdb.GetIntervalFrom(qp.DsInfo, model, time.Millisecond)
+	if err != nil {
+		return "", nil
+	}
+
+	interval := intervalCalculator.Calculate(qp.TimeRange, parsedInterval)
+	glog.Warn(spew.Sdump(interval))
+	payload = strings.Replace(payload, "$timeFrom", fmt.Sprintf("%d", qp.TimeRange.GetFromAsMsEpoch()), -1)
+	payload = strings.Replace(payload, "$timeTo", fmt.Sprintf("%d", qp.TimeRange.GetToAsMsEpoch()), -1)
+	payload = strings.Replace(payload, "$interval", interval.Text, -1)
+	payload = strings.Replace(payload, "$__interval_ms", strconv.FormatInt(interval.Value.Nanoseconds()/int64(time.Millisecond), 10), -1)
+	payload = strings.Replace(payload, "$__interval", interval.Text, -1)
+
+	return payload, nil
+}
+
+func (qp *ElasticSearchQueryParser) getIndexList() string {
+	_, err := qp.DsInfo.JsonData.Get("interval").String()
+	if err != nil {
+		return qp.DsInfo.Database
+	}
+	// todo: support interval
+	return qp.DsInfo.Database
+}

+ 131 - 0
pkg/tsdb/elasticsearch/models.go

@@ -0,0 +1,131 @@
+package elasticsearch
+
+import (
+	"github.com/grafana/grafana/pkg/components/simplejson"
+	"bytes"
+	"fmt"
+	"encoding/json"
+)
+
+type QueryHeader struct {
+	SearchType                 string      `json:"search_type"`
+	IgnoreUnavailable          bool        `json:"ignore_unavailable"`
+	Index                      interface{} `json:"index"`
+	MaxConcurrentShardRequests int         `json:"max_concurrent_shard_requests"`
+}
+
+func (q *QueryHeader) String() (string) {
+	r, _ := json.Marshal(q)
+	return string(r)
+}
+
+type Query struct {
+	Query map[string]interface{} `json:"query"`
+	Aggs  Aggs                   `json:"aggs"`
+	Size  int                    `json:"size"`
+}
+
+type Aggs map[string]interface{}
+
+type HistogramAgg struct {
+	Interval    string `json:"interval,omitempty"`
+	Field       string `json:"field"`
+	MinDocCount int    `json:"min_doc_count"`
+	Missing     string `json:"missing,omitempty"`
+}
+
+type DateHistogramAgg struct {
+	HistogramAgg
+	ExtendedBounds ExtendedBounds `json:"extended_bounds"`
+	Format         string         `json:"format"`
+}
+
+type FiltersAgg struct {
+	Filter map[string]interface{} `json:"filter"`
+}
+
+type TermsAggSetting struct {
+	Field       string                 `json:"field"`
+	Size        int                    `json:"size"`
+	Order       map[string]interface{} `json:"order"`
+	MinDocCount int                    `json:"min_doc_count"`
+	Missing     string                 `json:"missing"`
+}
+
+type TermsAgg struct {
+	Terms TermsAggSetting `json:"terms"`
+	Aggs  Aggs            `json:"aggs"`
+}
+
+type ExtendedBounds struct {
+	Min string `json:"min"`
+	Max string `json:"max"`
+}
+
+type RangeFilter struct {
+	Range map[string]RangeFilterSetting `json:"range"`
+}
+type RangeFilterSetting struct {
+	Gte    string `json:"gte"`
+	Lte    string `json:"lte"`
+	Format string `json:"format"`
+}
+
+func newRangeFilter(field string, rangeFilterSetting RangeFilterSetting) *RangeFilter {
+	return &RangeFilter{
+		map[string]RangeFilterSetting{field: rangeFilterSetting}}
+}
+
+type QueryStringFilter struct {
+	QueryString QueryStringFilterSetting `json:"query_string"`
+}
+type QueryStringFilterSetting struct {
+	AnalyzeWildcard bool   `json:"analyze_wildcard"`
+	Query           string `json:"query"`
+}
+
+func newQueryStringFilter(analyzeWildcard bool, query string) *QueryStringFilter {
+	return &QueryStringFilter{QueryStringFilterSetting{AnalyzeWildcard: analyzeWildcard, Query: query}}
+}
+
+type BoolQuery struct {
+	Filter []interface{} `json:"filter"`
+}
+
+type Metric map[string]interface{}
+
+type Responses struct {
+	Responses []Response `json:"responses"`
+}
+
+type Response struct {
+	Status       int                    `json:"status"`
+	Err          map[string]interface{} `json:"error"`
+	Aggregations map[string]interface{} `json:"aggregations"`
+}
+
+func (r *Response) getErrMsg() (string) {
+	var msg bytes.Buffer
+	errJson := simplejson.NewFromAny(r.Err)
+	errType, err := errJson.Get("type").String()
+	if err == nil {
+		msg.WriteString(fmt.Sprintf("type:%s", errType))
+	}
+
+	reason, err := errJson.Get("type").String()
+	if err == nil {
+		msg.WriteString(fmt.Sprintf("reason:%s", reason))
+	}
+	return msg.String()
+}
+
+type PercentilesResult struct {
+	Buckets struct {
+		map[string]struct {
+			Values map[string]string `json:"values"`
+		}
+		KeyAsString string `json:"key_as_string"`
+		Key         int64  `json:"key"`
+		DocCount    int    `json:"doc_count"`
+	} `json:"buckets"`
+}

+ 204 - 0
pkg/tsdb/elasticsearch/query.go

@@ -0,0 +1,204 @@
+package elasticsearch
+
+import (
+	"errors"
+	"github.com/grafana/grafana/pkg/components/simplejson"
+)
+
+var rangeFilterSetting = RangeFilterSetting{Gte: "$timeFrom",
+	Lte:    "$timeTo",
+	Format: "epoch_millis"}
+
+type QueryBuilder struct {
+	TimeField  string
+	RawQuery   string
+	BucketAggs []interface{}
+	Metrics    []interface{}
+	Alias      string
+}
+
+func (b *QueryBuilder) Build() (Query, error) {
+	var err error
+	var res Query
+	res.Query = make(map[string]interface{})
+	res.Size = 0
+
+	if err != nil {
+		return res, err
+	}
+
+	boolQuery := BoolQuery{}
+	boolQuery.Filter = append(boolQuery.Filter, newRangeFilter(b.TimeField, rangeFilterSetting))
+	boolQuery.Filter = append(boolQuery.Filter, newQueryStringFilter(true, b.RawQuery))
+	res.Query["bool"] = boolQuery
+
+	// handle document query
+	if len(b.BucketAggs) == 0 {
+		if len(b.Metrics) > 0 {
+			metric := simplejson.NewFromAny(b.Metrics[0])
+			if metric.Get("type").MustString("") == "raw_document" {
+				return res, errors.New("alert not support Raw_Document")
+			}
+		}
+	}
+	aggs, err := b.parseAggs(b.BucketAggs, b.Metrics)
+	res.Aggs = aggs["aggs"].(Aggs)
+
+	return res, err
+}
+
+func (b *QueryBuilder) parseAggs(bucketAggs []interface{}, metrics []interface{}) (Aggs, error) {
+	query := make(Aggs)
+	nestedAggs := query
+	for _, aggRaw := range bucketAggs {
+		esAggs := make(Aggs)
+		aggJson := simplejson.NewFromAny(aggRaw)
+		aggType, err := aggJson.Get("type").String()
+		if err != nil {
+			return nil, err
+		}
+		id, err := aggJson.Get("id").String()
+		if err != nil {
+			return nil, err
+		}
+
+		switch aggType {
+		case "date_histogram":
+			esAggs["date_histogram"] = b.getDateHistogramAgg(aggJson)
+		case "histogram":
+			esAggs["histogram"] = b.getHistogramAgg(aggJson)
+		case "filters":
+			esAggs["filters"] = b.getFilters(aggJson)
+		case "terms":
+			terms := b.getTerms(aggJson)
+			esAggs["terms"] = terms.Terms
+			esAggs["aggs"] = terms.Aggs
+		case "geohash_grid":
+			return nil, errors.New("alert not support Geo_Hash_Grid")
+		}
+
+		if _, ok := nestedAggs["aggs"]; !ok {
+			nestedAggs["aggs"] = make(Aggs)
+		}
+
+		if aggs, ok := (nestedAggs["aggs"]).(Aggs); ok {
+			aggs[id] = esAggs
+		}
+		nestedAggs = esAggs
+
+	}
+	nestedAggs["aggs"] = make(Aggs)
+
+	for _, metricRaw := range metrics {
+		metric := make(Metric)
+		metricJson := simplejson.NewFromAny(metricRaw)
+
+		id, err := metricJson.Get("id").String()
+		if err != nil {
+			return nil, err
+		}
+		metricType, err := metricJson.Get("type").String()
+		if err != nil {
+			return nil, err
+		}
+		if metricType == "count" {
+			continue
+		}
+
+		// todo support pipeline Agg
+
+		settings := metricJson.Get("settings").MustMap()
+		settings["field"] = metricJson.Get("field").MustString()
+		metric[metricType] = settings
+		nestedAggs["aggs"].(Aggs)[id] = metric
+	}
+	return query, nil
+}
+
+func (b *QueryBuilder) getDateHistogramAgg(model *simplejson.Json) DateHistogramAgg {
+	agg := &DateHistogramAgg{}
+	settings := simplejson.NewFromAny(model.Get("settings").Interface())
+	interval, err := settings.Get("interval").String()
+	if err == nil {
+		agg.Interval = interval
+	}
+	agg.Field = b.TimeField
+	agg.MinDocCount = settings.Get("min_doc_count").MustInt(0)
+	agg.ExtendedBounds = ExtendedBounds{"$timeFrom", "$timeTo"}
+	agg.Format = "epoch_millis"
+
+	if agg.Interval == "auto" {
+		agg.Interval = "$__interval"
+	}
+
+	missing, err := settings.Get("missing").String()
+	if err == nil {
+		agg.Missing = missing
+	}
+	return *agg
+}
+
+func (b *QueryBuilder) getHistogramAgg(model *simplejson.Json) HistogramAgg {
+	agg := &HistogramAgg{}
+	settings := simplejson.NewFromAny(model.Get("settings").Interface())
+	interval, err := settings.Get("interval").String()
+	if err == nil {
+		agg.Interval = interval
+	}
+	field, err := model.Get("field").String()
+	if err == nil {
+		agg.Field = field
+	}
+	agg.MinDocCount = settings.Get("min_doc_count").MustInt(0)
+	missing, err := settings.Get("missing").String()
+	if err == nil {
+		agg.Missing = missing
+	}
+	return *agg
+}
+
+func (b *QueryBuilder) getFilters(model *simplejson.Json) FiltersAgg {
+	agg := &FiltersAgg{}
+	settings := simplejson.NewFromAny(model.Get("settings").Interface())
+	for filter := range settings.Get("filters").MustArray() {
+		filterJson := simplejson.NewFromAny(filter)
+		query := filterJson.Get("query").MustString("")
+		label := filterJson.Get("label").MustString("")
+		if label == "" {
+			label = query
+		}
+		agg.Filter[label] = newQueryStringFilter(true, query)
+	}
+	return *agg
+}
+
+func (b *QueryBuilder) getTerms(model *simplejson.Json) TermsAgg {
+	agg := &TermsAgg{}
+	settings := simplejson.NewFromAny(model.Get("settings").Interface())
+	agg.Terms.Field = model.Get("field").MustString()
+	if settings == nil {
+		return *agg
+	}
+	agg.Terms.Size = settings.Get("size").MustInt(0)
+	if agg.Terms.Size == 0 {
+		agg.Terms.Size = 500
+	}
+	orderBy := settings.Get("orderBy").MustString("")
+	if orderBy != "" {
+		agg.Terms.Order[orderBy] = settings.Get("order").MustString("")
+		//	 if orderBy is a int, means this fields is metric result value
+		//	 TODO set subAggs
+	}
+
+	minDocCount, err := settings.Get("min_doc_count").Int()
+	if err == nil {
+		agg.Terms.MinDocCount = minDocCount
+	}
+
+	missing, err := settings.Get("missing").String()
+	if err == nil {
+		agg.Terms.Missing = missing
+	}
+
+	return *agg
+}

+ 111 - 0
pkg/tsdb/elasticsearch/response_parser.go

@@ -0,0 +1,111 @@
+package elasticsearch
+
+import (
+	"errors"
+	"fmt"
+	"github.com/grafana/grafana/pkg/components/null"
+	"github.com/grafana/grafana/pkg/components/simplejson"
+	"github.com/grafana/grafana/pkg/tsdb"
+	"strconv"
+)
+
+type ElasticsearchResponseParser struct {
+	Responses []Response
+	Targets   []QueryBuilder
+}
+
+func (rp *ElasticsearchResponseParser) getTimeSeries() []interface{} {
+	for i, res := range rp.Responses {
+		var series []interface{}
+		target := rp.Targets[i]
+		props := make(map[string]interface{})
+		rp.processBuckets(res.Aggregations, target, &series, props, 0)
+	}
+}
+
+func findAgg(target QueryBuilder, aggId string) (*simplejson.Json, error) {
+	for _, v := range target.BucketAggs {
+		aggDef := simplejson.NewFromAny(v)
+		if aggId == aggDef.Get("id").MustString() {
+			return aggDef, nil
+		}
+	}
+	return nil, errors.New("can't found aggDef, aggID:" + aggId)
+}
+
+func (rp *ElasticsearchResponseParser) processBuckets(aggs map[string]interface{}, target QueryBuilder, series *[]interface{}, props map[string]interface{}, depth int) error {
+	maxDepth := len(target.BucketAggs) - 1
+	for aggId, v := range aggs {
+		aggDef, _ := findAgg(target, aggId)
+		esAgg := simplejson.NewFromAny(v)
+		if aggDef == nil {
+			continue
+		}
+
+		if depth == maxDepth {
+			if aggDef.Get("type").MustString() == "date_histogram" {
+				rp.processMetrics(esAgg, target, series, props)
+			}
+		}
+
+	}
+
+}
+
+func mapCopy(originalMap, newMap *map[string]string) {
+	for k, v := range originalMap {
+		newMap[k] = v
+	}
+
+}
+
+func (rp *ElasticsearchResponseParser) processMetrics(esAgg *simplejson.Json, target QueryBuilder, props map[string]string) ([]*tsdb.TimeSeries, error) {
+	var series []*tsdb.TimeSeries
+	for _, v := range target.Metrics {
+		metric := simplejson.NewFromAny(v)
+		if metric.Get("hide").MustBool(false) {
+			continue
+		}
+		metricId := fmt.Sprintf("%d", metric.Get("id").MustInt())
+		metricField := metric.Get("field").MustString()
+
+		switch metric.Get("type").MustString() {
+		case "count":
+			newSeries := tsdb.TimeSeries{}
+			for _, v := range esAgg.Get("buckets").MustMap() {
+				bucket := simplejson.NewFromAny(v)
+				value := bucket.Get("doc_count").MustFloat64()
+				key := bucket.Get("key").MustFloat64()
+				newSeries.Points = append(newSeries.Points, tsdb.TimePoint{null.FloatFromPtr(&value), null.FloatFromPtr(&key)})
+			}
+			newSeries.Tags = props
+			newSeries.Tags["metric"] = "count"
+			series = append(series, &newSeries)
+
+		case "percentiles":
+			buckets := esAgg.Get("buckets").MustArray()
+			if len(buckets) == 0 {
+				break
+			}
+
+			firstBucket := simplejson.NewFromAny(buckets[0])
+			percentiles := firstBucket.GetPath(metricId, "values").MustMap()
+
+			for percentileName := range percentiles {
+				newSeries := tsdb.TimeSeries{}
+				newSeries.Tags = props
+				newSeries.Tags["metric"] = "p" + percentileName
+				newSeries.Tags["field"] = metricField
+				for _, v := range buckets {
+					bucket := simplejson.NewFromAny(v)
+					valueStr := bucket.GetPath(metricId, "values", percentileName).MustString()
+					value, _ := strconv.ParseFloat(valueStr, 64)
+					key := bucket.Get("key").MustFloat64()
+					newSeries.Points = append(newSeries.Points, tsdb.TimePoint{null.FloatFromPtr(&value), null.FloatFromPtr(&key)})
+				}
+				series = append(series, &newSeries)
+			}
+		}
+	}
+	return series
+}

+ 1 - 0
public/app/plugins/datasource/elasticsearch/plugin.json

@@ -20,6 +20,7 @@
     "version": "5.0.0"
   },
 
+  "alerting": true,
   "annotations": true,
   "metrics": true,