package elasticsearch

import (
	"errors"
	"fmt"
	"regexp"
	"strconv"
	"strings"

	"github.com/grafana/grafana/pkg/components/null"
	"github.com/grafana/grafana/pkg/components/simplejson"
	"github.com/grafana/grafana/pkg/tsdb"
)
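
// ElasticsearchResponseParser converts raw Elasticsearch search responses into
// tsdb time series. Responses[i] corresponds to Targets[i].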
type ElasticsearchResponseParser struct {
	Responses []Response
	Targets   []*Query
}
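
// getTimeSeries processes every response/target pair into named time series
// and collects them into a single query result.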
func (rp *ElasticsearchResponseParser) getTimeSeries() *tsdb.QueryResult {
	queryRes := tsdb.NewQueryResult()
	for i, res := range rp.Responses {
		target := rp.Targets[i]
		props := make(map[string]string)
		series := make([]*tsdb.TimeSeries, 0)
		rp.processBuckets(res.Aggregations, target, &series, props, 0)
		rp.nameSeries(&series, target)
		queryRes.Series = append(queryRes.Series, series...)
	}
	return queryRes
}
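
// processBuckets walks the aggregation tree recursively. Intermediate bucket
// aggregations contribute their bucket keys as series properties; the deepest
// level must be a date_histogram, whose buckets are handed to processMetrics.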
func (rp *ElasticsearchResponseParser) processBuckets(aggs map[string]interface{}, target *Query, series *[]*tsdb.TimeSeries, props map[string]string, depth int) error {
	maxDepth := len(target.BucketAggs) - 1
	for aggId, v := range aggs {
		aggDef, _ := findAgg(target, aggId)
		esAgg := simplejson.NewFromAny(v)
		if aggDef == nil {
			continue
		}

		if depth == maxDepth {
			if aggDef.Get("type").MustString() == "date_histogram" {
				if err := rp.processMetrics(esAgg, target, series, props); err != nil {
					return err
				}
			} else {
				return fmt.Errorf("unsupported aggregation type: %s", aggDef.Get("type").MustString())
			}
		} else {
			for i, b := range esAgg.Get("buckets").MustArray() {
				field := aggDef.Get("field").MustString()
				bucket := simplejson.NewFromAny(b)

				// Copy the props so sibling buckets don't overwrite each other's keys.
				newProps := make(map[string]string, len(props)+1)
				for k, v := range props {
					newProps[k] = v
				}

				if key, err := bucket.Get("key").String(); err == nil {
					newProps[field] = key
				} else {
					newProps["filter"] = strconv.Itoa(i)
				}
				if key, err := bucket.Get("key_as_string").String(); err == nil {
					newProps[field] = key
				}

				if err := rp.processBuckets(bucket.MustMap(), target, series, newProps, depth+1); err != nil {
					return err
				}
			}
		}
	}
	return nil
}
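
// processMetrics turns the buckets of a date_histogram aggregation into one
// time series per non-hidden metric (and one per percentile for percentile
// metrics), tagging each series with the accumulated bucket properties.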
func (rp *ElasticsearchResponseParser) processMetrics(esAgg *simplejson.Json, target *Query, series *[]*tsdb.TimeSeries, props map[string]string) error {
	for _, v := range target.Metrics {
		metric := simplejson.NewFromAny(v)
		if metric.Get("hide").MustBool(false) {
			continue
		}

		metricId := metric.Get("id").MustString()
		metricField := metric.Get("field").MustString()
		metricType := metric.Get("type").MustString()

		switch metricType {
		case "count":
			// Give each series its own copy of the props so the tags set below
			// don't leak into series created for other metrics.
			newSeries := tsdb.TimeSeries{Tags: make(map[string]string)}
			for k, p := range props {
				newSeries.Tags[k] = p
			}
			for _, v := range esAgg.Get("buckets").MustArray() {
				bucket := simplejson.NewFromAny(v)
				value := castToNullFloat(bucket.Get("doc_count"))
				key := castToNullFloat(bucket.Get("key"))
				newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
			}
			newSeries.Tags["metric"] = "count"
			*series = append(*series, &newSeries)

		case "percentiles":
			buckets := esAgg.Get("buckets").MustArray()
			if len(buckets) == 0 {
				break
			}

			firstBucket := simplejson.NewFromAny(buckets[0])
			percentiles := firstBucket.GetPath(metricId, "values").MustMap()

			for percentileName := range percentiles {
				newSeries := tsdb.TimeSeries{Tags: make(map[string]string)}
				for k, p := range props {
					newSeries.Tags[k] = p
				}
				newSeries.Tags["metric"] = "p" + percentileName
				newSeries.Tags["field"] = metricField
				for _, v := range buckets {
					bucket := simplejson.NewFromAny(v)
					value := castToNullFloat(bucket.GetPath(metricId, "values", percentileName))
					key := castToNullFloat(bucket.Get("key"))
					newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
				}
				*series = append(*series, &newSeries)
			}

		default:
			newSeries := tsdb.TimeSeries{Tags: make(map[string]string)}
			for k, p := range props {
				newSeries.Tags[k] = p
			}
			newSeries.Tags["metric"] = metricType
			newSeries.Tags["field"] = metricField
			for _, v := range esAgg.Get("buckets").MustArray() {
				bucket := simplejson.NewFromAny(v)
				key := castToNullFloat(bucket.Get("key"))
				valueObj, err := bucket.Get(metricId).Map()
				if err != nil {
					break
				}
				var value null.Float
				if _, ok := valueObj["normalized_value"]; ok {
					// Prefer the normalized value when the metric exposes one.
					value = castToNullFloat(bucket.GetPath(metricId, "normalized_value"))
				} else {
					value = castToNullFloat(bucket.GetPath(metricId, "value"))
				}
				newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
			}
			*series = append(*series, &newSeries)
		}
	}
	return nil
}
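
// nameSeries counts how many distinct metric types appear in the list and
// derives a display name for every series from its tags, the target alias,
// and its metric type.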
func (rp *ElasticsearchResponseParser) nameSeries(seriesList *[]*tsdb.TimeSeries, target *Query) {
	set := make(map[string]string)
	for _, v := range *seriesList {
		if metricType, exists := v.Tags["metric"]; exists {
			if _, ok := set[metricType]; !ok {
				set[metricType] = ""
			}
		}
	}
	metricTypeCount := len(set)
	for _, series := range *seriesList {
		series.Name = rp.getSeriesName(series, target, metricTypeCount)
	}
}
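
// getSeriesName builds the name for a single series. If the target has an
// alias, {{...}} placeholders are substituted from the series tags, metric,
// and field; otherwise the name is composed from the remaining tags plus the
// metric and field names.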
func (rp *ElasticsearchResponseParser) getSeriesName(series *tsdb.TimeSeries, target *Query, metricTypeCount int) string {
	metricType := series.Tags["metric"]
	metricName := rp.getMetricName(metricType)
	delete(series.Tags, "metric")

	field := ""
	if v, ok := series.Tags["field"]; ok {
		field = v
		delete(series.Tags, "field")
	}

	if target.Alias != "" {
		alias := target.Alias
		re := regexp.MustCompile(`{{([\s\S]+?)}}`)
		for _, match := range re.FindAllString(target.Alias, -1) {
			group := match[2 : len(match)-2]

			if strings.HasPrefix(group, "term ") {
				if term, ok := series.Tags[strings.TrimPrefix(group, "term ")]; ok {
					alias = strings.Replace(alias, match, term, 1)
				}
			}
			if v, ok := series.Tags[group]; ok {
				alias = strings.Replace(alias, match, v, 1)
			}
			switch group {
			case "metric":
				alias = strings.Replace(alias, match, metricName, 1)
			case "field":
				alias = strings.Replace(alias, match, field, 1)
			}
		}
		return alias
	}

	// Pipeline aggregations reference another metric by id in their field;
	// describe that metric instead of using the raw field name.
	if field != "" && isPipelineAgg(metricType) {
		found := false
		for _, targetMetricI := range target.Metrics {
			targetMetric := simplejson.NewFromAny(targetMetricI)
			if targetMetric.Get("id").MustString() == field {
				metricName += " " + describeMetric(targetMetric.Get("type").MustString(), field)
				found = true
			}
		}
		if !found {
			metricName = "Unset"
		}
	} else if field != "" {
		metricName += " " + field
	}

	if len(series.Tags) == 0 {
		return metricName
	}

	name := ""
	for _, v := range series.Tags {
		name += v + " "
	}

	if metricTypeCount == 1 {
		return strings.TrimSpace(name)
	}
	return strings.TrimSpace(name) + " " + metricName
}
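
// getMetricName translates a metric type id into its human readable label,
// falling back to the raw id when it is neither a known aggregation type nor
// an extended stat.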
func (rp *ElasticsearchResponseParser) getMetricName(metric string) string {
	if text, ok := metricAggType[metric]; ok {
		return text
	}
	if text, ok := extendedStats[metric]; ok {
		return text
	}
	return metric
}
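
// castToNullFloat converts a JSON value (number or numeric string) into a
// nullable float, returning a null value when it cannot be parsed.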
func castToNullFloat(j *simplejson.Json) null.Float {
	f, err := j.Float64()
	if err == nil {
		return null.FloatFrom(f)
	}
	if s, err := j.String(); err == nil {
		if v, parseErr := strconv.ParseFloat(s, 64); parseErr == nil {
			return null.FloatFromPtr(&v)
		}
	}
	return null.NewFloat(0, false)
}
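
// findAgg looks up the bucket aggregation definition on the target that
// matches the given aggregation id from the response.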
func findAgg(target *Query, aggId string) (*simplejson.Json, error) {
	for _, v := range target.BucketAggs {
		aggDef := simplejson.NewFromAny(v)
		if aggId == aggDef.Get("id").MustString() {
			return aggDef, nil
		}
	}
	return nil, errors.New("could not find agg def for aggId: " + aggId)
}