| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- package elasticsearch
- import (
- "bytes"
- "encoding/json"
- "fmt"
- "github.com/grafana/grafana/pkg/components/simplejson"
- "github.com/grafana/grafana/pkg/models"
- "github.com/grafana/grafana/pkg/tsdb"
- "src/github.com/davecgh/go-spew/spew"
- "strconv"
- "strings"
- "time"
- )
- type ElasticSearchQueryParser struct {
- DsInfo *models.DataSource
- TimeRange *tsdb.TimeRange
- Queries []*tsdb.Query
- }
- func (qp *ElasticSearchQueryParser) Parse() (string, []*QueryBuilder, error) {
- payload := bytes.Buffer{}
- queryHeader := qp.getQueryHeader()
- targets := make([]*QueryBuilder, 0)
- for _, q := range qp.Queries {
- timeField, err := q.Model.Get("timeField").String()
- if err != nil {
- return "", nil, err
- }
- rawQuery := q.Model.Get("query").MustString("")
- bucketAggs := q.Model.Get("bucketAggs").MustArray()
- metrics := q.Model.Get("metrics").MustArray()
- alias := q.Model.Get("alias").MustString("")
- builder := QueryBuilder{timeField, rawQuery, bucketAggs, metrics, alias}
- targets = append(targets, &builder)
- query, err := builder.Build()
- if err != nil {
- return "", nil, err
- }
- queryBytes, err := json.Marshal(query)
- if err != nil {
- return "", nil, err
- }
- payload.WriteString(queryHeader.String() + "\n")
- payload.WriteString(string(queryBytes) + "\n")
- }
- p, err := qp.payloadReplace(payload.String(), qp.DsInfo.JsonData)
- return p, targets, err
- }
- func (qp *ElasticSearchQueryParser) getQueryHeader() *QueryHeader {
- var header QueryHeader
- esVersion := qp.DsInfo.JsonData.Get("esVersion").MustInt()
- searchType := "query_then_fetch"
- if esVersion < 5 {
- searchType = "count"
- }
- header.SearchType = searchType
- header.IgnoreUnavailable = true
- header.Index = qp.getIndexList()
- if esVersion >= 56 {
- header.MaxConcurrentShardRequests = qp.DsInfo.JsonData.Get("maxConcurrentShardRequests").MustInt()
- }
- return &header
- }
- func (qp *ElasticSearchQueryParser) payloadReplace(payload string, model *simplejson.Json) (string, error) {
- parsedInterval, err := tsdb.GetIntervalFrom(qp.DsInfo, model, time.Millisecond)
- if err != nil {
- return "", nil
- }
- interval := intervalCalculator.Calculate(qp.TimeRange, parsedInterval)
- glog.Warn(spew.Sdump(interval))
- payload = strings.Replace(payload, "$timeFrom", fmt.Sprintf("%d", qp.TimeRange.GetFromAsMsEpoch()), -1)
- payload = strings.Replace(payload, "$timeTo", fmt.Sprintf("%d", qp.TimeRange.GetToAsMsEpoch()), -1)
- payload = strings.Replace(payload, "$interval", interval.Text, -1)
- payload = strings.Replace(payload, "$__interval_ms", strconv.FormatInt(interval.Value.Nanoseconds()/int64(time.Millisecond), 10), -1)
- payload = strings.Replace(payload, "$__interval", interval.Text, -1)
- return payload, nil
- }
- func (qp *ElasticSearchQueryParser) getIndexList() string {
- _, err := qp.DsInfo.JsonData.Get("interval").String()
- if err != nil {
- return qp.DsInfo.Database
- }
- // todo: support interval
- return qp.DsInfo.Database
- }
|