query.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. package elasticsearch
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "github.com/grafana/grafana/pkg/components/simplejson"
  8. "github.com/grafana/grafana/pkg/models"
  9. "github.com/grafana/grafana/pkg/tsdb"
  10. "strconv"
  11. "strings"
  12. "time"
  13. )
  14. var rangeFilterSetting = RangeFilterSetting{Gte: "$timeFrom",
  15. Lte: "$timeTo",
  16. Format: "epoch_millis"}
  17. type Query struct {
  18. TimeField string `json:"timeField"`
  19. RawQuery string `json:"query"`
  20. BucketAggs []interface{} `json:"bucketAggs"`
  21. Metrics []interface{} `json:"metrics"`
  22. Alias string `json:"Alias"`
  23. Interval time.Duration
  24. }
  25. func (q *Query) Build(queryContext *tsdb.TsdbQuery, dsInfo *models.DataSource) (string, error) {
  26. var req Request
  27. payload := bytes.Buffer{}
  28. req.Size = 0
  29. q.renderReqQuery(&req)
  30. // handle document query
  31. if q.isRawDocumentQuery() {
  32. return "", errors.New("alert not support Raw_Document")
  33. }
  34. err := q.parseAggs(&req)
  35. if err != nil {
  36. return "", err
  37. }
  38. reqBytes, err := json.Marshal(req)
  39. reqHeader := getRequestHeader(queryContext.TimeRange, dsInfo)
  40. payload.WriteString(reqHeader.String() + "\n")
  41. payload.WriteString(string(reqBytes) + "\n")
  42. return q.renderTemplate(payload.String(), queryContext)
  43. }
  44. func (q *Query) isRawDocumentQuery() bool {
  45. if len(q.BucketAggs) == 0 {
  46. if len(q.Metrics) > 0 {
  47. metric := simplejson.NewFromAny(q.Metrics[0])
  48. if metric.Get("type").MustString("") == "raw_document" {
  49. return true
  50. }
  51. }
  52. }
  53. return false
  54. }
  55. func (q *Query) renderReqQuery(req *Request) {
  56. req.Query = make(map[string]interface{})
  57. boolQuery := BoolQuery{}
  58. boolQuery.Filter = append(boolQuery.Filter, newRangeFilter(q.TimeField, rangeFilterSetting))
  59. boolQuery.Filter = append(boolQuery.Filter, newQueryStringFilter(true, q.RawQuery))
  60. req.Query["bool"] = boolQuery
  61. }
  62. func (q *Query) parseAggs(req *Request) error {
  63. aggs := make(Aggs)
  64. nestedAggs := aggs
  65. for _, aggRaw := range q.BucketAggs {
  66. esAggs := make(Aggs)
  67. aggJson := simplejson.NewFromAny(aggRaw)
  68. aggType, err := aggJson.Get("type").String()
  69. if err != nil {
  70. return err
  71. }
  72. id, err := aggJson.Get("id").String()
  73. if err != nil {
  74. return err
  75. }
  76. switch aggType {
  77. case "date_histogram":
  78. esAggs["date_histogram"] = q.getDateHistogramAgg(aggJson)
  79. case "histogram":
  80. esAggs["histogram"] = q.getHistogramAgg(aggJson)
  81. case "filters":
  82. esAggs["filters"] = q.getFilters(aggJson)
  83. case "terms":
  84. terms := q.getTerms(aggJson)
  85. esAggs["terms"] = terms.Terms
  86. esAggs["aggs"] = terms.Aggs
  87. case "geohash_grid":
  88. return errors.New("alert not support Geo_Hash_Grid")
  89. }
  90. if _, ok := nestedAggs["aggs"]; !ok {
  91. nestedAggs["aggs"] = make(Aggs)
  92. }
  93. if aggs, ok := (nestedAggs["aggs"]).(Aggs); ok {
  94. aggs[id] = esAggs
  95. }
  96. nestedAggs = esAggs
  97. }
  98. nestedAggs["aggs"] = make(Aggs)
  99. for _, metricRaw := range q.Metrics {
  100. metric := make(Metric)
  101. metricJson := simplejson.NewFromAny(metricRaw)
  102. id, err := metricJson.Get("id").String()
  103. if err != nil {
  104. return err
  105. }
  106. metricType, err := metricJson.Get("type").String()
  107. if err != nil {
  108. return err
  109. }
  110. if metricType == "count" {
  111. continue
  112. }
  113. settings := metricJson.Get("settings").MustMap(map[string]interface{}{})
  114. if isPipelineAgg(metricType) {
  115. pipelineAgg := metricJson.Get("pipelineAgg").MustString("")
  116. if _, err := strconv.Atoi(pipelineAgg); err == nil {
  117. settings["buckets_path"] = pipelineAgg
  118. } else {
  119. continue
  120. }
  121. } else {
  122. settings["field"] = metricJson.Get("field").MustString()
  123. }
  124. metric[metricType] = settings
  125. nestedAggs["aggs"].(Aggs)[id] = metric
  126. }
  127. req.Aggs = aggs["aggs"].(Aggs)
  128. return nil
  129. }
  130. func (q *Query) getDateHistogramAgg(model *simplejson.Json) *DateHistogramAgg {
  131. agg := &DateHistogramAgg{}
  132. settings := simplejson.NewFromAny(model.Get("settings").Interface())
  133. interval, err := settings.Get("interval").String()
  134. if err == nil {
  135. agg.Interval = interval
  136. }
  137. agg.Field = q.TimeField
  138. agg.MinDocCount = settings.Get("min_doc_count").MustInt(0)
  139. agg.ExtendedBounds = ExtendedBounds{"$timeFrom", "$timeTo"}
  140. agg.Format = "epoch_millis"
  141. if agg.Interval == "auto" {
  142. agg.Interval = "$__interval"
  143. }
  144. missing, err := settings.Get("missing").String()
  145. if err == nil {
  146. agg.Missing = missing
  147. }
  148. return agg
  149. }
  150. func (q *Query) getHistogramAgg(model *simplejson.Json) *HistogramAgg {
  151. agg := &HistogramAgg{}
  152. settings := simplejson.NewFromAny(model.Get("settings").Interface())
  153. interval, err := settings.Get("interval").String()
  154. if err == nil {
  155. agg.Interval = interval
  156. }
  157. field, err := model.Get("field").String()
  158. if err == nil {
  159. agg.Field = field
  160. }
  161. agg.MinDocCount = settings.Get("min_doc_count").MustInt(0)
  162. missing, err := settings.Get("missing").String()
  163. if err == nil {
  164. agg.Missing = missing
  165. }
  166. return agg
  167. }
  168. func (q *Query) getFilters(model *simplejson.Json) *FiltersAgg {
  169. agg := &FiltersAgg{}
  170. settings := simplejson.NewFromAny(model.Get("settings").Interface())
  171. for filter := range settings.Get("filters").MustArray() {
  172. filterJson := simplejson.NewFromAny(filter)
  173. query := filterJson.Get("query").MustString("")
  174. label := filterJson.Get("label").MustString("")
  175. if label == "" {
  176. label = query
  177. }
  178. agg.Filter[label] = newQueryStringFilter(true, query)
  179. }
  180. return agg
  181. }
  182. func (q *Query) getTerms(model *simplejson.Json) *TermsAgg {
  183. agg := &TermsAgg{Aggs: make(Aggs)}
  184. settings := simplejson.NewFromAny(model.Get("settings").Interface())
  185. agg.Terms.Field = model.Get("field").MustString()
  186. if settings == nil {
  187. return agg
  188. }
  189. sizeStr := settings.Get("size").MustString("")
  190. size, err := strconv.Atoi(sizeStr)
  191. if err != nil {
  192. size = 500
  193. }
  194. agg.Terms.Size = size
  195. orderBy, err := settings.Get("orderBy").String()
  196. if err == nil {
  197. agg.Terms.Order = make(map[string]interface{})
  198. agg.Terms.Order[orderBy] = settings.Get("order").MustString("")
  199. if _, err := strconv.Atoi(orderBy); err != nil {
  200. for _, metricI := range q.Metrics {
  201. metric := simplejson.NewFromAny(metricI)
  202. metricId := metric.Get("id").MustString()
  203. if metricId == orderBy {
  204. subAggs := make(Aggs)
  205. metricField := metric.Get("field").MustString()
  206. metricType := metric.Get("type").MustString()
  207. subAggs[metricType] = map[string]string{"field": metricField}
  208. agg.Aggs = make(Aggs)
  209. agg.Aggs[metricId] = subAggs
  210. break
  211. }
  212. }
  213. }
  214. }
  215. missing, err := settings.Get("missing").String()
  216. if err == nil {
  217. agg.Terms.Missing = missing
  218. }
  219. return agg
  220. }
  221. func (q *Query) renderTemplate(payload string, queryContext *tsdb.TsdbQuery) (string, error) {
  222. timeRange := queryContext.TimeRange
  223. interval := intervalCalculator.Calculate(timeRange, q.Interval)
  224. payload = strings.Replace(payload, "$timeFrom", fmt.Sprintf("%d", timeRange.GetFromAsMsEpoch()), -1)
  225. payload = strings.Replace(payload, "$timeTo", fmt.Sprintf("%d", timeRange.GetToAsMsEpoch()), -1)
  226. payload = strings.Replace(payload, "$interval", interval.Text, -1)
  227. payload = strings.Replace(payload, "$__interval_ms", strconv.FormatInt(interval.Value.Nanoseconds()/int64(time.Millisecond), 10), -1)
  228. payload = strings.Replace(payload, "$__interval", interval.Text, -1)
  229. return payload, nil
  230. }