package elasticsearch

import (
	"fmt"
	"strconv"

	"github.com/grafana/grafana/pkg/components/simplejson"
	"github.com/grafana/grafana/pkg/tsdb"
	es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
)
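
// timeSeriesQuery translates Grafana time series queries into a single
// Elasticsearch multisearch request and executes it via the given client.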
type timeSeriesQuery struct {
	client             es.Client
	tsdbQuery          *tsdb.TsdbQuery
	intervalCalculator tsdb.IntervalCalculator
}
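
// newTimeSeriesQuery is declared as a package-level variable rather than a
// plain function, presumably so it can be swapped out for a stub in tests.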
var newTimeSeriesQuery = func(client es.Client, tsdbQuery *tsdb.TsdbQuery, intervalCalculator tsdb.IntervalCalculator) *timeSeriesQuery {
	return &timeSeriesQuery{
		client:             client,
		tsdbQuery:          tsdbQuery,
		intervalCalculator: intervalCalculator,
	}
}
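
// execute parses the raw queries, builds one multisearch request covering all
// of them, runs it against Elasticsearch and parses the responses into time
// series results keyed by query ref ID.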
func (e *timeSeriesQuery) execute() (*tsdb.Response, error) {
	result := &tsdb.Response{
		Results: make(map[string]*tsdb.QueryResult),
	}

	tsQueryParser := newTimeSeriesQueryParser()
	queries, err := tsQueryParser.parse(e.tsdbQuery)
	if err != nil {
		return nil, err
	}

	ms := e.client.MultiSearch()

	from := fmt.Sprintf("%d", e.tsdbQuery.TimeRange.GetFromAsMsEpoch())
	to := fmt.Sprintf("%d", e.tsdbQuery.TimeRange.GetToAsMsEpoch())

	for _, q := range queries {
		minInterval, err := e.client.GetMinInterval(q.Interval)
		if err != nil {
			return nil, err
		}
		interval := e.intervalCalculator.Calculate(e.tsdbQuery.TimeRange, minInterval)

		b := ms.Search(interval)
		b.Size(0)
		filters := b.Query().Bool().Filter()
		filters.AddDateRangeFilter(e.client.GetTimeField(), to, from, es.DateFormatEpochMS)

		if q.RawQuery != "" {
			filters.AddQueryStringFilter(q.RawQuery, true)
		}

		if len(q.BucketAggs) == 0 {
			// Without bucket aggregations only a raw document query makes sense.
			if len(q.Metrics) == 0 || q.Metrics[0].Type != "raw_document" {
				result.Results[q.RefID] = &tsdb.QueryResult{
					RefId:       q.RefID,
					Error:       fmt.Errorf("invalid query, missing metrics and aggregations"),
					ErrorString: "invalid query, missing metrics and aggregations",
				}
				continue
			}
			metric := q.Metrics[0]
			b.Size(metric.Settings.Get("size").MustInt(500))
			b.SortDesc("@timestamp", "boolean")
			b.AddDocValueField("@timestamp")
			continue
		}

		aggBuilder := b.Agg()

		// Each helper returns the nested child builder, so iterating the
		// bucket aggs in document order nests every aggregation inside the
		// previous one.
		for _, bucketAgg := range q.BucketAggs {
			switch bucketAgg.Type {
			case dateHistType:
				aggBuilder = addDateHistogramAgg(aggBuilder, bucketAgg, from, to)
			case histogramType:
				aggBuilder = addHistogramAgg(aggBuilder, bucketAgg)
			case filtersType:
				aggBuilder = addFiltersAgg(aggBuilder, bucketAgg)
			case termsType:
				aggBuilder = addTermsAgg(aggBuilder, bucketAgg, q.Metrics)
			case geohashGridType:
				aggBuilder = addGeoHashGridAgg(aggBuilder, bucketAgg)
			}
		}

		for _, m := range q.Metrics {
			// count needs no aggregation of its own; it is read from the
			// doc_count of the enclosing buckets.
			if m.Type == countType {
				continue
			}

			if isPipelineAgg(m.Type) {
				if isPipelineAggWithMultipleBucketPaths(m.Type) {
					if len(m.PipelineVariables) > 0 {
						bucketPaths := map[string]interface{}{}
						for name, pipelineAgg := range m.PipelineVariables {
							if _, err := strconv.Atoi(pipelineAgg); err == nil {
								var appliedAgg *MetricAgg
								for _, pipelineMetric := range q.Metrics {
									if pipelineMetric.ID == pipelineAgg {
										appliedAgg = pipelineMetric
										break
									}
								}
								if appliedAgg != nil {
									if appliedAgg.Type == countType {
										bucketPaths[name] = "_count"
									} else {
										bucketPaths[name] = pipelineAgg
									}
								}
							}
						}

						aggBuilder.Pipeline(m.ID, m.Type, bucketPaths, func(a *es.PipelineAggregation) {
							a.Settings = m.Settings.MustMap()
						})
					} else {
						continue
					}
				} else {
					if _, err := strconv.Atoi(m.PipelineAggregate); err == nil {
						var appliedAgg *MetricAgg
						for _, pipelineMetric := range q.Metrics {
							if pipelineMetric.ID == m.PipelineAggregate {
								appliedAgg = pipelineMetric
								break
							}
						}
						if appliedAgg != nil {
							bucketPath := m.PipelineAggregate
							if appliedAgg.Type == countType {
								// Elasticsearch exposes the bucket document count as "_count".
								bucketPath = "_count"
							}
							aggBuilder.Pipeline(m.ID, m.Type, bucketPath, func(a *es.PipelineAggregation) {
								a.Settings = m.Settings.MustMap()
							})
						}
					} else {
						continue
					}
				}
			} else {
				aggBuilder.Metric(m.ID, m.Type, m.Field, func(a *es.MetricAggregation) {
					a.Settings = m.Settings.MustMap()
				})
			}
		}
	}

	req, err := ms.Build()
	if err != nil {
		return nil, err
	}

	res, err := e.client.ExecuteMultisearch(req)
	if err != nil {
		return nil, err
	}

	rp := newResponseParser(res.Responses, queries)
	return rp.getTimeSeries()
}
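
// addDateHistogramAgg nests a date_histogram aggregation under the current
// builder and returns the child builder for further nesting. The resulting
// request fragment looks roughly like this (illustrative sketch with the
// default settings, not the exact serialized output):
//
//	"aggs": {
//	  "<bucketAgg.ID>": {
//	    "date_histogram": {
//	      "field": "<bucketAgg.Field>",
//	      "interval": "$__interval",
//	      "min_doc_count": 0,
//	      "extended_bounds": { "min": "<timeFrom>", "max": "<timeTo>" },
//	      "format": "epoch_millis"
//	    }
//	  }
//	}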
func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo string) es.AggBuilder {
	aggBuilder.DateHistogram(bucketAgg.ID, bucketAgg.Field, func(a *es.DateHistogramAgg, b es.AggBuilder) {
		a.Interval = bucketAgg.Settings.Get("interval").MustString("auto")
		a.MinDocCount = bucketAgg.Settings.Get("min_doc_count").MustInt(0)
		a.ExtendedBounds = &es.ExtendedBounds{Min: timeFrom, Max: timeTo}
		a.Format = bucketAgg.Settings.Get("format").MustString(es.DateFormatEpochMS)

		if a.Interval == "auto" {
			a.Interval = "$__interval"
		}

		if offset, err := bucketAgg.Settings.Get("offset").String(); err == nil {
			a.Offset = offset
		}

		if missing, err := bucketAgg.Settings.Get("missing").String(); err == nil {
			a.Missing = &missing
		}

		aggBuilder = b
	})

	return aggBuilder
}
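
// addHistogramAgg nests a numeric histogram aggregation under the current
// builder, defaulting to an interval of 1000 and a min_doc_count of 0.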
func addHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
	aggBuilder.Histogram(bucketAgg.ID, bucketAgg.Field, func(a *es.HistogramAgg, b es.AggBuilder) {
		a.Interval = bucketAgg.Settings.Get("interval").MustInt(1000)
		a.MinDocCount = bucketAgg.Settings.Get("min_doc_count").MustInt(0)

		if missing, err := bucketAgg.Settings.Get("missing").Int(); err == nil {
			a.Missing = &missing
		}

		aggBuilder = b
	})

	return aggBuilder
}
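
// addTermsAgg nests a terms aggregation under the current builder. The size
// setting may arrive as either a number or a string and falls back to 500 in
// either case; when the terms are ordered by a metric (orderBy holds a
// numeric metric ID), that metric is added at the same level so
// Elasticsearch can sort on it.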
func addTermsAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, metrics []*MetricAgg) es.AggBuilder {
	aggBuilder.Terms(bucketAgg.ID, bucketAgg.Field, func(a *es.TermsAggregation, b es.AggBuilder) {
		// The size setting can be sent either as a number or as a string.
		if size, err := bucketAgg.Settings.Get("size").Int(); err == nil {
			a.Size = size
		} else if size, err := bucketAgg.Settings.Get("size").String(); err == nil {
			a.Size, err = strconv.Atoi(size)
			if err != nil {
				a.Size = 500
			}
		} else {
			a.Size = 500
		}
		if a.Size == 0 {
			a.Size = 500
		}

		if minDocCount, err := bucketAgg.Settings.Get("min_doc_count").Int(); err == nil {
			a.MinDocCount = &minDocCount
		}
		if missing, err := bucketAgg.Settings.Get("missing").String(); err == nil {
			a.Missing = &missing
		}

		if orderBy, err := bucketAgg.Settings.Get("orderBy").String(); err == nil {
			a.Order[orderBy] = bucketAgg.Settings.Get("order").MustString("desc")
			// A numeric orderBy refers to a metric by ID; that metric must
			// exist at this level for Elasticsearch to order by it.
			if _, err := strconv.Atoi(orderBy); err == nil {
				for _, m := range metrics {
					if m.ID == orderBy {
						b.Metric(m.ID, m.Type, m.Field, nil)
						break
					}
				}
			}
		}

		aggBuilder = b
	})

	return aggBuilder
}
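
// addFiltersAgg nests a filters aggregation with one query_string filter per
// configured filter, keyed by label (or by the query itself when no label is
// set). If no filters are configured, the builder is returned unchanged.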
func addFiltersAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
	filters := make(map[string]interface{})
	for _, filter := range bucketAgg.Settings.Get("filters").MustArray() {
		json := simplejson.NewFromAny(filter)
		query := json.Get("query").MustString()
		label := json.Get("label").MustString()
		if label == "" {
			label = query
		}
		filters[label] = &es.QueryStringFilter{Query: query, AnalyzeWildcard: true}
	}

	if len(filters) > 0 {
		aggBuilder.Filters(bucketAgg.ID, func(a *es.FiltersAggregation, b es.AggBuilder) {
			a.Filters = filters
			aggBuilder = b
		})
	}

	return aggBuilder
}
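
// addGeoHashGridAgg nests a geohash_grid aggregation under the current
// builder with a default precision of 3.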
func addGeoHashGridAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
	aggBuilder.GeoHashGrid(bucketAgg.ID, bucketAgg.Field, func(a *es.GeoHashGridAggregation, b es.AggBuilder) {
		a.Precision = bucketAgg.Settings.Get("precision").MustInt(3)
		aggBuilder = b
	})

	return aggBuilder
}
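
// timeSeriesQueryParser parses the raw JSON query models of a TsdbQuery into
// typed Query values.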
type timeSeriesQueryParser struct{}

func newTimeSeriesQueryParser() *timeSeriesQueryParser {
	return &timeSeriesQueryParser{}
}
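
// parse converts each raw query model into a Query. A model is expected to
// look roughly like the following (illustrative sketch, not an exhaustive
// schema):
//
//	{
//	  "timeField": "@timestamp",
//	  "query": "lucene query string",
//	  "bucketAggs": [{ "type": "date_histogram", "id": "2", "field": "@timestamp", "settings": {} }],
//	  "metrics": [{ "type": "avg", "id": "1", "field": "value", "settings": {} }],
//	  "alias": "{{term field}}"
//	}
//
// timeField is required; everything else has a usable default.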
func (p *timeSeriesQueryParser) parse(tsdbQuery *tsdb.TsdbQuery) ([]*Query, error) {
	queries := make([]*Query, 0)
	for _, q := range tsdbQuery.Queries {
		model := q.Model
		timeField, err := model.Get("timeField").String()
		if err != nil {
			return nil, err
		}
		rawQuery := model.Get("query").MustString()

		bucketAggs, err := p.parseBucketAggs(model)
		if err != nil {
			return nil, err
		}
		metrics, err := p.parseMetrics(model)
		if err != nil {
			return nil, err
		}

		alias := model.Get("alias").MustString("")
		interval := strconv.FormatInt(q.IntervalMs, 10) + "ms"

		queries = append(queries, &Query{
			TimeField:  timeField,
			RawQuery:   rawQuery,
			BucketAggs: bucketAggs,
			Metrics:    metrics,
			Alias:      alias,
			Interval:   interval,
			RefID:      q.RefId,
		})
	}

	return queries, nil
}
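
// parseBucketAggs parses the bucketAggs array of a query model. The type and
// id of each entry are required; field and settings are optional.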
func (p *timeSeriesQueryParser) parseBucketAggs(model *simplejson.Json) ([]*BucketAgg, error) {
	var err error
	var result []*BucketAgg
	for _, t := range model.Get("bucketAggs").MustArray() {
		aggJSON := simplejson.NewFromAny(t)
		agg := &BucketAgg{}

		agg.Type, err = aggJSON.Get("type").String()
		if err != nil {
			return nil, err
		}

		agg.ID, err = aggJSON.Get("id").String()
		if err != nil {
			return nil, err
		}

		agg.Field = aggJSON.Get("field").MustString()
		agg.Settings = simplejson.NewFromAny(aggJSON.Get("settings").MustMap())

		result = append(result, agg)
	}
	return result, nil
}
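
// parseMetrics parses the metrics array of a query model. Only the type is
// required; for pipeline aggregations with multiple bucket paths, the
// pipelineVariables array is flattened into a name -> pipelineAgg map.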
func (p *timeSeriesQueryParser) parseMetrics(model *simplejson.Json) ([]*MetricAgg, error) {
	var err error
	var result []*MetricAgg
	for _, t := range model.Get("metrics").MustArray() {
		metricJSON := simplejson.NewFromAny(t)
		metric := &MetricAgg{}

		metric.Field = metricJSON.Get("field").MustString()
		metric.Hide = metricJSON.Get("hide").MustBool(false)
		metric.ID = metricJSON.Get("id").MustString()
		metric.PipelineAggregate = metricJSON.Get("pipelineAgg").MustString()
		metric.Settings = simplejson.NewFromAny(metricJSON.Get("settings").MustMap())
		metric.Meta = simplejson.NewFromAny(metricJSON.Get("meta").MustMap())

		metric.Type, err = metricJSON.Get("type").String()
		if err != nil {
			return nil, err
		}

		if isPipelineAggWithMultipleBucketPaths(metric.Type) {
			metric.PipelineVariables = map[string]string{}
			pvArr := metricJSON.Get("pipelineVariables").MustArray()
			for _, v := range pvArr {
				// Guard the type assertions so a malformed entry cannot panic.
				kv, ok := v.(map[string]interface{})
				if !ok {
					continue
				}
				name, _ := kv["name"].(string)
				pipelineAgg, _ := kv["pipelineAgg"].(string)
				metric.PipelineVariables[name] = pipelineAgg
			}
		}

		result = append(result, metric)
	}
	return result, nil
}