// query.go — Elasticsearch query builder for alerting.
  1. package elasticsearch
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "strconv"
  8. "strings"
  9. "time"
  10. "github.com/grafana/grafana/pkg/components/simplejson"
  11. "github.com/grafana/grafana/pkg/models"
  12. "github.com/grafana/grafana/pkg/tsdb"
  13. "github.com/leibowitz/moment"
  14. )
  15. var rangeFilterSetting = RangeFilterSetting{Gte: "$timeFrom",
  16. Lte: "$timeTo",
  17. Format: "epoch_millis"}
// Query describes a single Elasticsearch data source query as configured
// in the dashboard/alert JSON model.
type Query struct {
	// TimeField is the document field used for the range filter and
	// date_histogram aggregations.
	TimeField string `json:"timeField"`
	// RawQuery is the user-supplied query string, rendered as a
	// query_string filter.
	RawQuery string `json:"query"`
	// BucketAggs are the bucket aggregations (date_histogram, histogram,
	// filters, terms, geohash_grid) nested in declaration order.
	BucketAggs []*BucketAgg `json:"bucketAggs"`
	// Metrics are the metric aggregations attached to the innermost bucket.
	Metrics []*Metric `json:"metrics"`
	// Alias names the resulting series — presumably a naming template
	// consumed by the response parser; confirm there.
	Alias string `json:"alias"`
	// Interval is the panel interval fed to intervalCalculator for the
	// $__interval substitution.
	Interval time.Duration
}
  26. func (q *Query) Build(queryContext *tsdb.TsdbQuery, dsInfo *models.DataSource) (string, error) {
  27. var req Request
  28. req.Size = 0
  29. q.renderReqQuery(&req)
  30. // handle document query
  31. if q.isRawDocumentQuery() {
  32. return "", errors.New("alert not support Raw_Document")
  33. }
  34. err := q.parseAggs(&req)
  35. if err != nil {
  36. return "", err
  37. }
  38. reqBytes, err := json.Marshal(req)
  39. reqHeader := getRequestHeader(queryContext.TimeRange, dsInfo)
  40. payload := bytes.Buffer{}
  41. payload.WriteString(reqHeader.String() + "\n")
  42. payload.WriteString(string(reqBytes) + "\n")
  43. return q.renderTemplate(payload.String(), queryContext)
  44. }
  45. func (q *Query) isRawDocumentQuery() bool {
  46. if len(q.BucketAggs) == 0 {
  47. if len(q.Metrics) > 0 {
  48. metric := simplejson.NewFromAny(q.Metrics[0])
  49. if metric.Get("type").MustString("") == "raw_document" {
  50. return true
  51. }
  52. }
  53. }
  54. return false
  55. }
  56. func (q *Query) renderReqQuery(req *Request) {
  57. req.Query = make(map[string]interface{})
  58. boolQuery := BoolQuery{}
  59. boolQuery.Filter = append(boolQuery.Filter, newRangeFilter(q.TimeField, rangeFilterSetting))
  60. boolQuery.Filter = append(boolQuery.Filter, newQueryStringFilter(true, q.RawQuery))
  61. req.Query["bool"] = boolQuery
  62. }
  63. func (q *Query) parseAggs(req *Request) error {
  64. aggs := make(Aggs)
  65. nestedAggs := aggs
  66. for _, agg := range q.BucketAggs {
  67. esAggs := make(Aggs)
  68. switch agg.Type {
  69. case "date_histogram":
  70. esAggs["date_histogram"] = q.getDateHistogramAgg(agg)
  71. case "histogram":
  72. esAggs["histogram"] = q.getHistogramAgg(agg)
  73. case "filters":
  74. esAggs["filters"] = q.getFilters(agg)
  75. case "terms":
  76. terms := q.getTerms(agg)
  77. esAggs["terms"] = terms.Terms
  78. esAggs["aggs"] = terms.Aggs
  79. case "geohash_grid":
  80. return errors.New("alert not support Geo_Hash_Grid")
  81. }
  82. if _, ok := nestedAggs["aggs"]; !ok {
  83. nestedAggs["aggs"] = make(Aggs)
  84. }
  85. if aggs, ok := (nestedAggs["aggs"]).(Aggs); ok {
  86. aggs[agg.ID] = esAggs
  87. }
  88. nestedAggs = esAggs
  89. }
  90. nestedAggs["aggs"] = make(Aggs)
  91. for _, metric := range q.Metrics {
  92. subAgg := make(Aggs)
  93. if metric.Type == "count" {
  94. continue
  95. }
  96. settings := metric.Settings.MustMap(make(map[string]interface{}))
  97. if isPipelineAgg(metric.Type) {
  98. if _, err := strconv.Atoi(metric.PipelineAggregate); err == nil {
  99. settings["buckets_path"] = metric.PipelineAggregate
  100. } else {
  101. continue
  102. }
  103. } else {
  104. settings["field"] = metric.Field
  105. }
  106. subAgg[metric.Type] = settings
  107. nestedAggs["aggs"].(Aggs)[metric.ID] = subAgg
  108. }
  109. req.Aggs = aggs["aggs"].(Aggs)
  110. return nil
  111. }
  112. func (q *Query) getDateHistogramAgg(target *BucketAgg) *DateHistogramAgg {
  113. agg := &DateHistogramAgg{}
  114. interval, err := target.Settings.Get("interval").String()
  115. if err == nil {
  116. agg.Interval = interval
  117. }
  118. agg.Field = q.TimeField
  119. agg.MinDocCount = target.Settings.Get("min_doc_count").MustInt(0)
  120. agg.ExtendedBounds = ExtendedBounds{"$timeFrom", "$timeTo"}
  121. agg.Format = "epoch_millis"
  122. if agg.Interval == "auto" {
  123. agg.Interval = "$__interval"
  124. }
  125. missing, err := target.Settings.Get("missing").String()
  126. if err == nil {
  127. agg.Missing = missing
  128. }
  129. return agg
  130. }
  131. func (q *Query) getHistogramAgg(target *BucketAgg) *HistogramAgg {
  132. agg := &HistogramAgg{}
  133. interval, err := target.Settings.Get("interval").String()
  134. if err == nil {
  135. agg.Interval = interval
  136. }
  137. if target.Field != "" {
  138. agg.Field = target.Field
  139. }
  140. agg.MinDocCount = target.Settings.Get("min_doc_count").MustInt(0)
  141. missing, err := target.Settings.Get("missing").String()
  142. if err == nil {
  143. agg.Missing = missing
  144. }
  145. return agg
  146. }
  147. func (q *Query) getFilters(target *BucketAgg) *FiltersAgg {
  148. agg := &FiltersAgg{}
  149. agg.Filters = map[string]interface{}{}
  150. for _, filter := range target.Settings.Get("filters").MustArray() {
  151. filterJson := simplejson.NewFromAny(filter)
  152. query := filterJson.Get("query").MustString("")
  153. label := filterJson.Get("label").MustString("")
  154. if label == "" {
  155. label = query
  156. }
  157. agg.Filters[label] = newQueryStringFilter(true, query)
  158. }
  159. return agg
  160. }
  161. func (q *Query) getTerms(target *BucketAgg) *TermsAggWrap {
  162. agg := &TermsAggWrap{Aggs: make(Aggs)}
  163. agg.Terms.Field = target.Field
  164. if len(target.Settings.MustMap()) == 0 {
  165. return agg
  166. }
  167. sizeStr := target.Settings.Get("size").MustString("")
  168. size, err := strconv.Atoi(sizeStr)
  169. if err != nil {
  170. size = 500
  171. }
  172. agg.Terms.Size = size
  173. orderBy, err := target.Settings.Get("orderBy").String()
  174. if err == nil {
  175. agg.Terms.Order = make(map[string]interface{})
  176. agg.Terms.Order[orderBy] = target.Settings.Get("order").MustString("")
  177. if _, err := strconv.Atoi(orderBy); err != nil {
  178. for _, metricI := range q.Metrics {
  179. metric := simplejson.NewFromAny(metricI)
  180. metricId := metric.Get("id").MustString()
  181. if metricId == orderBy {
  182. subAggs := make(Aggs)
  183. metricField := metric.Get("field").MustString()
  184. metricType := metric.Get("type").MustString()
  185. subAggs[metricType] = map[string]string{"field": metricField}
  186. agg.Aggs = make(Aggs)
  187. agg.Aggs[metricId] = subAggs
  188. break
  189. }
  190. }
  191. }
  192. }
  193. missing, err := target.Settings.Get("missing").String()
  194. if err == nil {
  195. agg.Terms.Missing = missing
  196. }
  197. return agg
  198. }
  199. func (q *Query) renderTemplate(payload string, queryContext *tsdb.TsdbQuery) (string, error) {
  200. timeRange := queryContext.TimeRange
  201. interval := intervalCalculator.Calculate(timeRange, q.Interval)
  202. payload = strings.Replace(payload, "$timeFrom", fmt.Sprintf("%d", timeRange.GetFromAsMsEpoch()), -1)
  203. payload = strings.Replace(payload, "$timeTo", fmt.Sprintf("%d", timeRange.GetToAsMsEpoch()), -1)
  204. payload = strings.Replace(payload, "$interval", interval.Text, -1)
  205. payload = strings.Replace(payload, "$__interval_ms", strconv.FormatInt(interval.Value.Nanoseconds()/int64(time.Millisecond), 10), -1)
  206. payload = strings.Replace(payload, "$__interval", interval.Text, -1)
  207. return payload, nil
  208. }
  209. func getRequestHeader(timeRange *tsdb.TimeRange, dsInfo *models.DataSource) *QueryHeader {
  210. var header QueryHeader
  211. esVersion := dsInfo.JsonData.Get("esVersion").MustInt()
  212. searchType := "query_then_fetch"
  213. if esVersion < 5 {
  214. searchType = "count"
  215. }
  216. header.SearchType = searchType
  217. header.IgnoreUnavailable = true
  218. header.Index = getIndexList(dsInfo.Database, dsInfo.JsonData.Get("interval").MustString(), timeRange)
  219. if esVersion >= 56 {
  220. header.MaxConcurrentShardRequests = dsInfo.JsonData.Get("maxConcurrentShardRequests").MustInt()
  221. }
  222. return &header
  223. }
  224. func getIndexList(pattern string, interval string, timeRange *tsdb.TimeRange) string {
  225. if interval == "" {
  226. return pattern
  227. }
  228. var indexes []string
  229. indexParts := strings.Split(strings.TrimLeft(pattern, "["), "]")
  230. indexBase := indexParts[0]
  231. if len(indexParts) <= 1 {
  232. return pattern
  233. }
  234. indexDateFormat := indexParts[1]
  235. start := moment.NewMoment(timeRange.MustGetFrom())
  236. end := moment.NewMoment(timeRange.MustGetTo())
  237. indexes = append(indexes, fmt.Sprintf("%s%s", indexBase, start.Format(indexDateFormat)))
  238. for start.IsBefore(*end) {
  239. switch interval {
  240. case "Hourly":
  241. start = start.AddHours(1)
  242. case "Daily":
  243. start = start.AddDay()
  244. case "Weekly":
  245. start = start.AddWeeks(1)
  246. case "Monthly":
  247. start = start.AddMonths(1)
  248. case "Yearly":
  249. start = start.AddYears(1)
  250. }
  251. indexes = append(indexes, fmt.Sprintf("%s%s", indexBase, start.Format(indexDateFormat)))
  252. }
  253. return strings.Join(indexes, ",")
  254. }