query.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. package elasticsearch
  2. import (
  3. "errors"
  4. "github.com/grafana/grafana/pkg/components/simplejson"
  5. "strconv"
  6. )
  7. var rangeFilterSetting = RangeFilterSetting{Gte: "$timeFrom",
  8. Lte: "$timeTo",
  9. Format: "epoch_millis"}
  10. type QueryBuilder struct {
  11. TimeField string
  12. RawQuery string
  13. BucketAggs []interface{}
  14. Metrics []interface{}
  15. Alias string
  16. }
  17. func (b *QueryBuilder) Build() (Query, error) {
  18. var err error
  19. var res Query
  20. res.Query = make(map[string]interface{})
  21. res.Size = 0
  22. if err != nil {
  23. return res, err
  24. }
  25. boolQuery := BoolQuery{}
  26. boolQuery.Filter = append(boolQuery.Filter, newRangeFilter(b.TimeField, rangeFilterSetting))
  27. boolQuery.Filter = append(boolQuery.Filter, newQueryStringFilter(true, b.RawQuery))
  28. res.Query["bool"] = boolQuery
  29. // handle document query
  30. if len(b.BucketAggs) == 0 {
  31. if len(b.Metrics) > 0 {
  32. metric := simplejson.NewFromAny(b.Metrics[0])
  33. if metric.Get("type").MustString("") == "raw_document" {
  34. return res, errors.New("alert not support Raw_Document")
  35. }
  36. }
  37. }
  38. aggs, err := b.parseAggs(b.BucketAggs, b.Metrics)
  39. res.Aggs = aggs["aggs"].(Aggs)
  40. return res, err
  41. }
  42. func (b *QueryBuilder) parseAggs(bucketAggs []interface{}, metrics []interface{}) (Aggs, error) {
  43. query := make(Aggs)
  44. nestedAggs := query
  45. for _, aggRaw := range bucketAggs {
  46. esAggs := make(Aggs)
  47. aggJson := simplejson.NewFromAny(aggRaw)
  48. aggType, err := aggJson.Get("type").String()
  49. if err != nil {
  50. return nil, err
  51. }
  52. id, err := aggJson.Get("id").String()
  53. if err != nil {
  54. return nil, err
  55. }
  56. switch aggType {
  57. case "date_histogram":
  58. esAggs["date_histogram"] = b.getDateHistogramAgg(aggJson)
  59. case "histogram":
  60. esAggs["histogram"] = b.getHistogramAgg(aggJson)
  61. case "filters":
  62. esAggs["filters"] = b.getFilters(aggJson)
  63. case "terms":
  64. terms := b.getTerms(aggJson)
  65. esAggs["terms"] = terms.Terms
  66. esAggs["aggs"] = terms.Aggs
  67. case "geohash_grid":
  68. return nil, errors.New("alert not support Geo_Hash_Grid")
  69. }
  70. if _, ok := nestedAggs["aggs"]; !ok {
  71. nestedAggs["aggs"] = make(Aggs)
  72. }
  73. if aggs, ok := (nestedAggs["aggs"]).(Aggs); ok {
  74. aggs[id] = esAggs
  75. }
  76. nestedAggs = esAggs
  77. }
  78. nestedAggs["aggs"] = make(Aggs)
  79. for _, metricRaw := range metrics {
  80. metric := make(Metric)
  81. metricJson := simplejson.NewFromAny(metricRaw)
  82. id, err := metricJson.Get("id").String()
  83. if err != nil {
  84. return nil, err
  85. }
  86. metricType, err := metricJson.Get("type").String()
  87. if err != nil {
  88. return nil, err
  89. }
  90. if metricType == "count" {
  91. continue
  92. }
  93. // todo support pipeline Agg
  94. settings := metricJson.Get("settings").MustMap()
  95. settings["field"] = metricJson.Get("field").MustString()
  96. metric[metricType] = settings
  97. nestedAggs["aggs"].(Aggs)[id] = metric
  98. }
  99. return query, nil
  100. }
  101. func (b *QueryBuilder) getDateHistogramAgg(model *simplejson.Json) DateHistogramAgg {
  102. agg := &DateHistogramAgg{}
  103. settings := simplejson.NewFromAny(model.Get("settings").Interface())
  104. interval, err := settings.Get("interval").String()
  105. if err == nil {
  106. agg.Interval = interval
  107. }
  108. agg.Field = b.TimeField
  109. agg.MinDocCount = settings.Get("min_doc_count").MustInt(0)
  110. agg.ExtendedBounds = ExtendedBounds{"$timeFrom", "$timeTo"}
  111. agg.Format = "epoch_millis"
  112. if agg.Interval == "auto" {
  113. agg.Interval = "$__interval"
  114. }
  115. missing, err := settings.Get("missing").String()
  116. if err == nil {
  117. agg.Missing = missing
  118. }
  119. return *agg
  120. }
  121. func (b *QueryBuilder) getHistogramAgg(model *simplejson.Json) HistogramAgg {
  122. agg := &HistogramAgg{}
  123. settings := simplejson.NewFromAny(model.Get("settings").Interface())
  124. interval, err := settings.Get("interval").String()
  125. if err == nil {
  126. agg.Interval = interval
  127. }
  128. field, err := model.Get("field").String()
  129. if err == nil {
  130. agg.Field = field
  131. }
  132. agg.MinDocCount = settings.Get("min_doc_count").MustInt(0)
  133. missing, err := settings.Get("missing").String()
  134. if err == nil {
  135. agg.Missing = missing
  136. }
  137. return *agg
  138. }
  139. func (b *QueryBuilder) getFilters(model *simplejson.Json) FiltersAgg {
  140. agg := &FiltersAgg{}
  141. settings := simplejson.NewFromAny(model.Get("settings").Interface())
  142. for filter := range settings.Get("filters").MustArray() {
  143. filterJson := simplejson.NewFromAny(filter)
  144. query := filterJson.Get("query").MustString("")
  145. label := filterJson.Get("label").MustString("")
  146. if label == "" {
  147. label = query
  148. }
  149. agg.Filter[label] = newQueryStringFilter(true, query)
  150. }
  151. return *agg
  152. }
  153. func (b *QueryBuilder) getTerms(model *simplejson.Json) TermsAgg {
  154. agg := &TermsAgg{Aggs: make(Aggs)}
  155. settings := simplejson.NewFromAny(model.Get("settings").Interface())
  156. agg.Terms.Field = model.Get("field").MustString()
  157. if settings == nil {
  158. return *agg
  159. }
  160. sizeStr := settings.Get("size").MustString("")
  161. size, err := strconv.Atoi(sizeStr)
  162. if err != nil {
  163. size = 500
  164. }
  165. agg.Terms.Size = size
  166. orderBy := settings.Get("orderBy").MustString("")
  167. if orderBy != "" {
  168. agg.Terms.Order = make(map[string]interface{})
  169. agg.Terms.Order[orderBy] = settings.Get("order").MustString("")
  170. // if orderBy is a int, means this fields is metric result value
  171. // TODO set subAggs
  172. }
  173. minDocCount, err := settings.Get("min_doc_count").Int()
  174. if err == nil {
  175. agg.Terms.MinDocCount = minDocCount
  176. }
  177. missing, err := settings.Get("missing").String()
  178. if err == nil {
  179. agg.Terms.Missing = missing
  180. }
  181. return *agg
  182. }