package elasticsearch

import (
	"fmt"
	"strings"
	"time"

	"github.com/grafana/grafana/pkg/components/simplejson"
	"github.com/grafana/grafana/pkg/models"
	"github.com/grafana/grafana/pkg/tsdb"
	"github.com/leibowitz/moment"
)

// ElasticSearchQueryParser parses a panel's query JSON model into a Query.
type ElasticSearchQueryParser struct{}

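// Parse reads the time field, raw query, bucket aggregations, metrics, alias and
// interval from the panel's JSON model and assembles them into a Query.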
func (qp *ElasticSearchQueryParser) Parse(model *simplejson.Json, dsInfo *models.DataSource) (*Query, error) {
	//payload := bytes.Buffer{}
	//queryHeader := qp.getQueryHeader()
	timeField, err := model.Get("timeField").String()
	if err != nil {
		return nil, err
	}
	rawQuery := model.Get("query").MustString()
	bucketAggs, err := qp.parseBucketAggs(model)
	if err != nil {
		return nil, err
	}
	metrics, err := qp.parseMetrics(model)
	if err != nil {
		return nil, err
	}
	alias := model.Get("alias").MustString("")
	parsedInterval, err := tsdb.GetIntervalFrom(dsInfo, model, time.Millisecond)
	if err != nil {
		return nil, err
	}

	return &Query{
		timeField,
		rawQuery,
		bucketAggs,
		metrics,
		alias,
		parsedInterval,
	}, nil
}

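// parseBucketAggs extracts the bucket aggregations defined on the panel; type and
// id are required, field and settings fall back to empty defaults.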
func (qp *ElasticSearchQueryParser) parseBucketAggs(model *simplejson.Json) ([]*BucketAgg, error) {
	var err error
	var result []*BucketAgg
	for _, t := range model.Get("bucketAggs").MustArray() {
		aggJson := simplejson.NewFromAny(t)
		agg := &BucketAgg{}

		agg.Type, err = aggJson.Get("type").String()
		if err != nil {
			return nil, err
		}

		agg.ID, err = aggJson.Get("id").String()
		if err != nil {
			return nil, err
		}

		agg.Field = aggJson.Get("field").MustString()
		agg.Settings = simplejson.NewFromAny(aggJson.Get("settings").MustMap())

		result = append(result, agg)
	}
	return result, nil
}

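// parseMetrics extracts the metric aggregations defined on the panel; type and id
// are required, the remaining properties fall back to empty defaults.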
func (qp *ElasticSearchQueryParser) parseMetrics(model *simplejson.Json) ([]*Metric, error) {
	var err error
	var result []*Metric
	for _, t := range model.Get("metrics").MustArray() {
		metricJson := simplejson.NewFromAny(t)
		metric := &Metric{}

		metric.Field = metricJson.Get("field").MustString()
		metric.Hide = metricJson.Get("hide").MustBool(false)

		metric.ID, err = metricJson.Get("id").String()
		if err != nil {
			return nil, err
		}

		metric.PipelineAggregate = metricJson.Get("pipelineAgg").MustString()
		metric.Settings = simplejson.NewFromAny(metricJson.Get("settings").MustMap())

		metric.Type, err = metricJson.Get("type").String()
		if err != nil {
			return nil, err
		}

		result = append(result, metric)
	}
	return result, nil
}

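// getRequestHeader builds the header object for a search request, selecting the
// search type and target index list from the data source's Elasticsearch version
// and index interval settings.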
func getRequestHeader(timeRange *tsdb.TimeRange, dsInfo *models.DataSource) *QueryHeader {
	var header QueryHeader
	esVersion := dsInfo.JsonData.Get("esVersion").MustInt()

	// The "count" search type was removed in Elasticsearch 5.0.
	searchType := "query_then_fetch"
	if esVersion < 5 {
		searchType = "count"
	}
	header.SearchType = searchType
	header.IgnoreUnavailable = true
	header.Index = getIndexList(dsInfo.Database, dsInfo.JsonData.Get("interval").MustString(), timeRange)

	// The data source encodes version 5.6 as 56; limiting concurrent shard requests
	// is only supported from 5.6 onwards.
	if esVersion >= 56 {
		header.MaxConcurrentShardRequests = dsInfo.JsonData.Get("maxConcurrentShardRequests").MustInt()
	}
	return &header
}

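// getIndexList expands a dynamic index pattern such as "[logstash-]YYYY.MM.DD" into a
// comma-separated list of concrete index names (logstash-2018.05.01,logstash-2018.05.02,...)
// covering the query's time range. Patterns without an interval or without a "[prefix]"
// part are returned unchanged.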
func getIndexList(pattern string, interval string, timeRange *tsdb.TimeRange) string {
	if interval == "" {
		return pattern
	}

	indexParts := strings.Split(strings.TrimLeft(pattern, "["), "]")
	if len(indexParts) <= 1 {
		return pattern
	}
	indexBase := indexParts[0]
	indexDateFormat := indexParts[1]

	start := moment.NewMoment(timeRange.MustGetFrom())
	end := moment.NewMoment(timeRange.MustGetTo())

	var indexes []string
	indexes = append(indexes, fmt.Sprintf("%s%s", indexBase, start.Format(indexDateFormat)))
	for start.IsBefore(*end) {
		switch interval {
		case "Hourly":
			start = start.AddHours(1)
		case "Daily":
			start = start.AddDay()
		case "Weekly":
			start = start.AddWeeks(1)
		case "Monthly":
			start = start.AddMonths(1)
		case "Yearly":
			start = start.AddYears(1)
		default:
			// Unknown interval; avoid looping forever and fall back to the raw pattern.
			return pattern
		}
		indexes = append(indexes, fmt.Sprintf("%s%s", indexBase, start.Format(indexDateFormat)))
	}
	return strings.Join(indexes, ",")
}