| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258 |
- package elasticsearch
- import (
- "errors"
- "fmt"
- "github.com/grafana/grafana/pkg/components/null"
- "github.com/grafana/grafana/pkg/components/simplejson"
- "github.com/grafana/grafana/pkg/tsdb"
- "regexp"
- "strconv"
- "strings"
- )
- type ElasticsearchResponseParser struct {
- Responses []Response
- Targets []*Query
- }
- func (rp *ElasticsearchResponseParser) getTimeSeries() *tsdb.QueryResult {
- queryRes := tsdb.NewQueryResult()
- for i, res := range rp.Responses {
- target := rp.Targets[i]
- props := make(map[string]string)
- series := make([]*tsdb.TimeSeries, 0)
- rp.processBuckets(res.Aggregations, target, &series, props, 0)
- rp.nameSeries(&series, target)
- queryRes.Series = append(queryRes.Series, series...)
- }
- return queryRes
- }
- func (rp *ElasticsearchResponseParser) processBuckets(aggs map[string]interface{}, target *Query, series *[]*tsdb.TimeSeries, props map[string]string, depth int) error {
- var err error
- maxDepth := len(target.BucketAggs) - 1
- for aggId, v := range aggs {
- aggDef, _ := findAgg(target, aggId)
- esAgg := simplejson.NewFromAny(v)
- if aggDef == nil {
- continue
- }
- if depth == maxDepth {
- if aggDef.Type == "date_histogram" {
- err = rp.processMetrics(esAgg, target, series, props)
- if err != nil {
- return err
- }
- } else {
- return fmt.Errorf("not support type:%s", aggDef.Type)
- }
- } else {
- for i, b := range esAgg.Get("buckets").MustArray() {
- bucket := simplejson.NewFromAny(b)
- newProps := props
- if key, err := bucket.Get("key").String(); err == nil {
- newProps[aggDef.Field] = key
- } else {
- props["filter"] = strconv.Itoa(i)
- }
- if key, err := bucket.Get("key_as_string").String(); err == nil {
- props[aggDef.Field] = key
- }
- rp.processBuckets(bucket.MustMap(), target, series, newProps, depth+1)
- }
- }
- }
- return nil
- }
- func (rp *ElasticsearchResponseParser) processMetrics(esAgg *simplejson.Json, target *Query, series *[]*tsdb.TimeSeries, props map[string]string) error {
- for _, metric := range target.Metrics {
- if metric.Hide {
- continue
- }
- switch metric.Type {
- case "count":
- newSeries := tsdb.TimeSeries{}
- for _, v := range esAgg.Get("buckets").MustArray() {
- bucket := simplejson.NewFromAny(v)
- value := castToNullFloat(bucket.Get("doc_count"))
- key := castToNullFloat(bucket.Get("key"))
- newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
- }
- newSeries.Tags = props
- newSeries.Tags["metric"] = "count"
- *series = append(*series, &newSeries)
- case "percentiles":
- buckets := esAgg.Get("buckets").MustArray()
- if len(buckets) == 0 {
- break
- }
- firstBucket := simplejson.NewFromAny(buckets[0])
- percentiles := firstBucket.GetPath(metric.ID, "values").MustMap()
- for percentileName := range percentiles {
- newSeries := tsdb.TimeSeries{}
- newSeries.Tags = props
- newSeries.Tags["metric"] = "p" + percentileName
- newSeries.Tags["field"] = metric.Field
- for _, v := range buckets {
- bucket := simplejson.NewFromAny(v)
- value := castToNullFloat(bucket.GetPath(metric.ID, "values", percentileName))
- key := castToNullFloat(bucket.Get("key"))
- newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
- }
- *series = append(*series, &newSeries)
- }
- default:
- newSeries := tsdb.TimeSeries{}
- newSeries.Tags = props
- newSeries.Tags["metric"] = metric.Type
- newSeries.Tags["field"] = metric.Field
- for _, v := range esAgg.Get("buckets").MustArray() {
- bucket := simplejson.NewFromAny(v)
- key := castToNullFloat(bucket.Get("key"))
- valueObj, err := bucket.Get(metric.ID).Map()
- if err != nil {
- break
- }
- var value null.Float
- if _, ok := valueObj["normalized_value"]; ok {
- value = castToNullFloat(bucket.GetPath(metric.ID, "normalized_value"))
- } else {
- value = castToNullFloat(bucket.GetPath(metric.ID, "value"))
- }
- newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
- }
- *series = append(*series, &newSeries)
- }
- }
- return nil
- }
- func (rp *ElasticsearchResponseParser) nameSeries(seriesList *[]*tsdb.TimeSeries, target *Query) {
- set := make(map[string]string)
- for _, v := range *seriesList {
- if metricType, exists := v.Tags["metric"]; exists {
- if _, ok := set[metricType]; !ok {
- set[metricType] = ""
- }
- }
- }
- metricTypeCount := len(set)
- for _, series := range *seriesList {
- series.Name = rp.getSeriesName(series, target, metricTypeCount)
- }
- }
- func (rp *ElasticsearchResponseParser) getSeriesName(series *tsdb.TimeSeries, target *Query, metricTypeCount int) string {
- metricType := series.Tags["metric"]
- metricName := rp.getMetricName(metricType)
- delete(series.Tags, "metric")
- field := ""
- if v, ok := series.Tags["field"]; ok {
- field = v
- delete(series.Tags, "field")
- }
- if target.Alias != "" {
- var re = regexp.MustCompile(`{{([\s\S]+?)}}`)
- for _, match := range re.FindAllString(target.Alias, -1) {
- group := match[2 : len(match)-2]
- if strings.HasPrefix(group, "term ") {
- if term, ok := series.Tags["term "]; ok {
- strings.Replace(target.Alias, match, term, 1)
- }
- }
- if v, ok := series.Tags[group]; ok {
- strings.Replace(target.Alias, match, v, 1)
- }
- switch group {
- case "metric":
- strings.Replace(target.Alias, match, metricName, 1)
- case "field":
- strings.Replace(target.Alias, match, field, 1)
- }
- }
- }
- // todo, if field and pipelineAgg
- if field != "" && isPipelineAgg(metricType) {
- found := false
- for _, metric := range target.Metrics {
- if metric.ID == field {
- metricName += " " + describeMetric(metric.Type, field)
- found = true
- }
- }
- if !found {
- metricName = "Unset"
- }
- } else if field != "" {
- metricName += " " + field
- }
- if len(series.Tags) == 0 {
- return metricName
- }
- name := ""
- for _, v := range series.Tags {
- name += v + " "
- }
- if metricTypeCount == 1 {
- return strings.TrimSpace(name)
- }
- return strings.TrimSpace(name) + " " + metricName
- }
- func (rp *ElasticsearchResponseParser) getMetricName(metric string) string {
- if text, ok := metricAggType[metric]; ok {
- return text
- }
- if text, ok := extendedStats[metric]; ok {
- return text
- }
- return metric
- }
- func castToNullFloat(j *simplejson.Json) null.Float {
- f, err := j.Float64()
- if err == nil {
- return null.FloatFrom(f)
- }
- s, err := j.String()
- if err == nil {
- v, _ := strconv.ParseFloat(s, 64)
- return null.FloatFromPtr(&v)
- }
- return null.NewFloat(0, false)
- }
- func findAgg(target *Query, aggId string) (*BucketAgg, error) {
- for _, v := range target.BucketAggs {
- if aggId == v.ID {
- return v, nil
- }
- }
- return nil, errors.New("can't found aggDef, aggID:" + aggId)
- }
|