query.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. package elasticsearch
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "github.com/grafana/grafana/pkg/components/simplejson"
  8. "github.com/grafana/grafana/pkg/models"
  9. "github.com/grafana/grafana/pkg/tsdb"
  10. "strconv"
  11. "strings"
  12. "time"
  13. )
  14. var rangeFilterSetting = RangeFilterSetting{Gte: "$timeFrom",
  15. Lte: "$timeTo",
  16. Format: "epoch_millis"}
  17. type Query struct {
  18. TimeField string `json:"timeField"`
  19. RawQuery string `json:"query"`
  20. BucketAggs []*BucketAgg `json:"bucketAggs"`
  21. Metrics []*Metric `json:"metrics"`
  22. Alias string `json:"Alias"`
  23. Interval time.Duration
  24. }
  25. func (q *Query) Build(queryContext *tsdb.TsdbQuery, dsInfo *models.DataSource) (string, error) {
  26. var req Request
  27. payload := bytes.Buffer{}
  28. req.Size = 0
  29. q.renderReqQuery(&req)
  30. // handle document query
  31. if q.isRawDocumentQuery() {
  32. return "", errors.New("alert not support Raw_Document")
  33. }
  34. err := q.parseAggs(&req)
  35. if err != nil {
  36. return "", err
  37. }
  38. reqBytes, err := json.Marshal(req)
  39. reqHeader := getRequestHeader(queryContext.TimeRange, dsInfo)
  40. payload.WriteString(reqHeader.String() + "\n")
  41. payload.WriteString(string(reqBytes) + "\n")
  42. return q.renderTemplate(payload.String(), queryContext)
  43. }
  44. func (q *Query) isRawDocumentQuery() bool {
  45. if len(q.BucketAggs) == 0 {
  46. if len(q.Metrics) > 0 {
  47. metric := simplejson.NewFromAny(q.Metrics[0])
  48. if metric.Get("type").MustString("") == "raw_document" {
  49. return true
  50. }
  51. }
  52. }
  53. return false
  54. }
  55. func (q *Query) renderReqQuery(req *Request) {
  56. req.Query = make(map[string]interface{})
  57. boolQuery := BoolQuery{}
  58. boolQuery.Filter = append(boolQuery.Filter, newRangeFilter(q.TimeField, rangeFilterSetting))
  59. boolQuery.Filter = append(boolQuery.Filter, newQueryStringFilter(true, q.RawQuery))
  60. req.Query["bool"] = boolQuery
  61. }
  62. func (q *Query) parseAggs(req *Request) error {
  63. aggs := make(Aggs)
  64. nestedAggs := aggs
  65. for _, agg := range q.BucketAggs {
  66. esAggs := make(Aggs)
  67. switch agg.Type {
  68. case "date_histogram":
  69. esAggs["date_histogram"] = q.getDateHistogramAgg(agg)
  70. case "histogram":
  71. esAggs["histogram"] = q.getHistogramAgg(agg)
  72. case "filters":
  73. esAggs["filters"] = q.getFilters(agg)
  74. case "terms":
  75. terms := q.getTerms(agg)
  76. esAggs["terms"] = terms.Terms
  77. esAggs["aggs"] = terms.Aggs
  78. case "geohash_grid":
  79. return errors.New("alert not support Geo_Hash_Grid")
  80. }
  81. if _, ok := nestedAggs["aggs"]; !ok {
  82. nestedAggs["aggs"] = make(Aggs)
  83. }
  84. if aggs, ok := (nestedAggs["aggs"]).(Aggs); ok {
  85. aggs[agg.ID] = esAggs
  86. }
  87. nestedAggs = esAggs
  88. }
  89. nestedAggs["aggs"] = make(Aggs)
  90. for _, metric := range q.Metrics {
  91. subAgg := make(Aggs)
  92. if metric.Type == "count" {
  93. continue
  94. }
  95. settings := metric.Settings.MustMap(make(map[string]interface{}))
  96. if isPipelineAgg(metric.Type) {
  97. if _, err := strconv.Atoi(metric.PipelineAggregate); err == nil {
  98. settings["buckets_path"] = metric.PipelineAggregate
  99. } else {
  100. continue
  101. }
  102. } else {
  103. settings["field"] = metric.Field
  104. }
  105. subAgg[metric.Type] = settings
  106. nestedAggs["aggs"].(Aggs)[metric.ID] = subAgg
  107. }
  108. req.Aggs = aggs["aggs"].(Aggs)
  109. return nil
  110. }
  111. func (q *Query) getDateHistogramAgg(target *BucketAgg) *DateHistogramAgg {
  112. agg := &DateHistogramAgg{}
  113. interval, err := target.Settings.Get("interval").String()
  114. if err == nil {
  115. agg.Interval = interval
  116. }
  117. agg.Field = q.TimeField
  118. agg.MinDocCount = target.Settings.Get("min_doc_count").MustInt(0)
  119. agg.ExtendedBounds = ExtendedBounds{"$timeFrom", "$timeTo"}
  120. agg.Format = "epoch_millis"
  121. if agg.Interval == "auto" {
  122. agg.Interval = "$__interval"
  123. }
  124. missing, err := target.Settings.Get("missing").String()
  125. if err == nil {
  126. agg.Missing = missing
  127. }
  128. return agg
  129. }
  130. func (q *Query) getHistogramAgg(target *BucketAgg) *HistogramAgg {
  131. agg := &HistogramAgg{}
  132. interval, err := target.Settings.Get("interval").String()
  133. if err == nil {
  134. agg.Interval = interval
  135. }
  136. if target.Field != "" {
  137. agg.Field = target.Field
  138. }
  139. agg.MinDocCount = target.Settings.Get("min_doc_count").MustInt(0)
  140. missing, err := target.Settings.Get("missing").String()
  141. if err == nil {
  142. agg.Missing = missing
  143. }
  144. return agg
  145. }
  146. func (q *Query) getFilters(target *BucketAgg) *FiltersAgg {
  147. agg := &FiltersAgg{}
  148. agg.Filters = map[string]interface{}{}
  149. for _, filter := range target.Settings.Get("filters").MustArray() {
  150. filterJson := simplejson.NewFromAny(filter)
  151. query := filterJson.Get("query").MustString("")
  152. label := filterJson.Get("label").MustString("")
  153. if label == "" {
  154. label = query
  155. }
  156. agg.Filters[label] = newQueryStringFilter(true, query)
  157. }
  158. return agg
  159. }
  160. func (q *Query) getTerms(target *BucketAgg) *TermsAggWrap {
  161. agg := &TermsAggWrap{Aggs: make(Aggs)}
  162. agg.Terms.Field = target.Field
  163. if len(target.Settings.MustMap()) == 0 {
  164. return agg
  165. }
  166. sizeStr := target.Settings.Get("size").MustString("")
  167. size, err := strconv.Atoi(sizeStr)
  168. if err != nil {
  169. size = 500
  170. }
  171. agg.Terms.Size = size
  172. orderBy, err := target.Settings.Get("orderBy").String()
  173. if err == nil {
  174. agg.Terms.Order = make(map[string]interface{})
  175. agg.Terms.Order[orderBy] = target.Settings.Get("order").MustString("")
  176. if _, err := strconv.Atoi(orderBy); err != nil {
  177. for _, metricI := range q.Metrics {
  178. metric := simplejson.NewFromAny(metricI)
  179. metricId := metric.Get("id").MustString()
  180. if metricId == orderBy {
  181. subAggs := make(Aggs)
  182. metricField := metric.Get("field").MustString()
  183. metricType := metric.Get("type").MustString()
  184. subAggs[metricType] = map[string]string{"field": metricField}
  185. agg.Aggs = make(Aggs)
  186. agg.Aggs[metricId] = subAggs
  187. break
  188. }
  189. }
  190. }
  191. }
  192. missing, err := target.Settings.Get("missing").String()
  193. if err == nil {
  194. agg.Terms.Missing = missing
  195. }
  196. return agg
  197. }
  198. func (q *Query) renderTemplate(payload string, queryContext *tsdb.TsdbQuery) (string, error) {
  199. timeRange := queryContext.TimeRange
  200. interval := intervalCalculator.Calculate(timeRange, q.Interval)
  201. payload = strings.Replace(payload, "$timeFrom", fmt.Sprintf("%d", timeRange.GetFromAsMsEpoch()), -1)
  202. payload = strings.Replace(payload, "$timeTo", fmt.Sprintf("%d", timeRange.GetToAsMsEpoch()), -1)
  203. payload = strings.Replace(payload, "$interval", interval.Text, -1)
  204. payload = strings.Replace(payload, "$__interval_ms", strconv.FormatInt(interval.Value.Nanoseconds()/int64(time.Millisecond), 10), -1)
  205. payload = strings.Replace(payload, "$__interval", interval.Text, -1)
  206. return payload, nil
  207. }