time_series_query.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package elasticsearch
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "time"
  9. "github.com/grafana/grafana/pkg/components/simplejson"
  10. "github.com/grafana/grafana/pkg/models"
  11. "github.com/grafana/grafana/pkg/setting"
  12. "github.com/grafana/grafana/pkg/tsdb"
  13. "golang.org/x/net/context/ctxhttp"
  14. )
  15. type timeSeriesQuery struct {
  16. queries []*Query
  17. }
  18. func (e *ElasticsearchExecutor) executeTimeSeriesQuery(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) {
  19. result := &tsdb.Response{}
  20. result.Results = make(map[string]*tsdb.QueryResult)
  21. tsQueryParser := newTimeSeriesQueryParser(dsInfo)
  22. query, err := tsQueryParser.parse(tsdbQuery)
  23. if err != nil {
  24. return nil, err
  25. }
  26. buff := bytes.Buffer{}
  27. for _, q := range query.queries {
  28. s, err := q.Build(tsdbQuery, dsInfo)
  29. if err != nil {
  30. return nil, err
  31. }
  32. buff.WriteString(s)
  33. }
  34. payload := buff.String()
  35. if setting.Env == setting.DEV {
  36. glog.Debug("Elasticsearch playload", "raw playload", payload)
  37. }
  38. glog.Info("Elasticsearch playload", "raw playload", payload)
  39. req, err := e.createRequest(dsInfo, payload)
  40. if err != nil {
  41. return nil, err
  42. }
  43. httpClient, err := dsInfo.GetHttpClient()
  44. if err != nil {
  45. return nil, err
  46. }
  47. resp, err := ctxhttp.Do(ctx, httpClient, req)
  48. if err != nil {
  49. return nil, err
  50. }
  51. if resp.StatusCode/100 != 2 {
  52. return nil, fmt.Errorf("elasticsearch returned statuscode invalid status code: %v", resp.Status)
  53. }
  54. var responses Responses
  55. defer resp.Body.Close()
  56. dec := json.NewDecoder(resp.Body)
  57. dec.UseNumber()
  58. err = dec.Decode(&responses)
  59. if err != nil {
  60. return nil, err
  61. }
  62. for _, res := range responses.Responses {
  63. if res.Err != nil {
  64. return nil, errors.New(res.getErrMsg())
  65. }
  66. }
  67. responseParser := ElasticsearchResponseParser{responses.Responses, query.queries}
  68. queryRes := responseParser.getTimeSeries()
  69. result.Results["A"] = queryRes
  70. return result, nil
  71. }
  72. type timeSeriesQueryParser struct {
  73. ds *models.DataSource
  74. }
  75. func newTimeSeriesQueryParser(ds *models.DataSource) *timeSeriesQueryParser {
  76. return &timeSeriesQueryParser{
  77. ds: ds,
  78. }
  79. }
  80. func (p *timeSeriesQueryParser) parse(tsdbQuery *tsdb.TsdbQuery) (*timeSeriesQuery, error) {
  81. queries := make([]*Query, 0)
  82. for _, q := range tsdbQuery.Queries {
  83. model := q.Model
  84. timeField, err := model.Get("timeField").String()
  85. if err != nil {
  86. return nil, err
  87. }
  88. rawQuery := model.Get("query").MustString()
  89. bucketAggs, err := p.parseBucketAggs(model)
  90. if err != nil {
  91. return nil, err
  92. }
  93. metrics, err := p.parseMetrics(model)
  94. if err != nil {
  95. return nil, err
  96. }
  97. alias := model.Get("alias").MustString("")
  98. parsedInterval, err := tsdb.GetIntervalFrom(p.ds, model, time.Millisecond)
  99. if err != nil {
  100. return nil, err
  101. }
  102. queries = append(queries, &Query{
  103. TimeField: timeField,
  104. RawQuery: rawQuery,
  105. BucketAggs: bucketAggs,
  106. Metrics: metrics,
  107. Alias: alias,
  108. Interval: parsedInterval,
  109. })
  110. }
  111. return &timeSeriesQuery{queries: queries}, nil
  112. }
  113. func (p *timeSeriesQueryParser) parseBucketAggs(model *simplejson.Json) ([]*BucketAgg, error) {
  114. var err error
  115. var result []*BucketAgg
  116. for _, t := range model.Get("bucketAggs").MustArray() {
  117. aggJson := simplejson.NewFromAny(t)
  118. agg := &BucketAgg{}
  119. agg.Type, err = aggJson.Get("type").String()
  120. if err != nil {
  121. return nil, err
  122. }
  123. agg.ID, err = aggJson.Get("id").String()
  124. if err != nil {
  125. return nil, err
  126. }
  127. agg.Field = aggJson.Get("field").MustString()
  128. agg.Settings = simplejson.NewFromAny(aggJson.Get("settings").MustMap())
  129. result = append(result, agg)
  130. }
  131. return result, nil
  132. }
  133. func (p *timeSeriesQueryParser) parseMetrics(model *simplejson.Json) ([]*Metric, error) {
  134. var err error
  135. var result []*Metric
  136. for _, t := range model.Get("metrics").MustArray() {
  137. metricJSON := simplejson.NewFromAny(t)
  138. metric := &Metric{}
  139. metric.Field = metricJSON.Get("field").MustString()
  140. metric.Hide = metricJSON.Get("hide").MustBool(false)
  141. metric.ID, err = metricJSON.Get("id").String()
  142. if err != nil {
  143. return nil, err
  144. }
  145. metric.PipelineAggregate = metricJSON.Get("pipelineAgg").MustString()
  146. metric.Settings = simplejson.NewFromAny(metricJSON.Get("settings").MustMap())
  147. metric.Type, err = metricJSON.Get("type").String()
  148. if err != nil {
  149. return nil, err
  150. }
  151. result = append(result, metric)
  152. }
  153. return result, nil
  154. }