// response_parser.go
  1. package elasticsearch
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/grafana/grafana/pkg/components/null"
  6. "github.com/grafana/grafana/pkg/components/simplejson"
  7. "github.com/grafana/grafana/pkg/tsdb"
  8. "strconv"
  9. )
// ElasticsearchResponseParser pairs raw Elasticsearch search responses with
// the query targets that produced them, so each response can be parsed into
// time series using its target's bucket/metric configuration.
type ElasticsearchResponseParser struct {
	Responses []Response     // one entry per executed search request
	Targets   []QueryBuilder // Targets[i] built the query that produced Responses[i] (see getTimeSeries)
}
  14. func (rp *ElasticsearchResponseParser) getTimeSeries() []interface{} {
  15. for i, res := range rp.Responses {
  16. var series []interface{}
  17. target := rp.Targets[i]
  18. props := make(map[string]interface{})
  19. rp.processBuckets(res.Aggregations, target, &series, props, 0)
  20. }
  21. }
  22. func findAgg(target QueryBuilder, aggId string) (*simplejson.Json, error) {
  23. for _, v := range target.BucketAggs {
  24. aggDef := simplejson.NewFromAny(v)
  25. if aggId == aggDef.Get("id").MustString() {
  26. return aggDef, nil
  27. }
  28. }
  29. return nil, errors.New("can't found aggDef, aggID:" + aggId)
  30. }
  31. func (rp *ElasticsearchResponseParser) processBuckets(aggs map[string]interface{}, target QueryBuilder, series *[]interface{}, props map[string]interface{}, depth int) error {
  32. maxDepth := len(target.BucketAggs) - 1
  33. for aggId, v := range aggs {
  34. aggDef, _ := findAgg(target, aggId)
  35. esAgg := simplejson.NewFromAny(v)
  36. if aggDef == nil {
  37. continue
  38. }
  39. if depth == maxDepth {
  40. if aggDef.Get("type").MustString() == "date_histogram" {
  41. rp.processMetrics(esAgg, target, series, props)
  42. }
  43. }
  44. }
  45. }
  46. func mapCopy(originalMap, newMap *map[string]string) {
  47. for k, v := range originalMap {
  48. newMap[k] = v
  49. }
  50. }
  51. func (rp *ElasticsearchResponseParser) processMetrics(esAgg *simplejson.Json, target QueryBuilder, props map[string]string) ([]*tsdb.TimeSeries, error) {
  52. var series []*tsdb.TimeSeries
  53. for _, v := range target.Metrics {
  54. metric := simplejson.NewFromAny(v)
  55. if metric.Get("hide").MustBool(false) {
  56. continue
  57. }
  58. metricId := fmt.Sprintf("%d", metric.Get("id").MustInt())
  59. metricField := metric.Get("field").MustString()
  60. switch metric.Get("type").MustString() {
  61. case "count":
  62. newSeries := tsdb.TimeSeries{}
  63. for _, v := range esAgg.Get("buckets").MustMap() {
  64. bucket := simplejson.NewFromAny(v)
  65. value := bucket.Get("doc_count").MustFloat64()
  66. key := bucket.Get("key").MustFloat64()
  67. newSeries.Points = append(newSeries.Points, tsdb.TimePoint{null.FloatFromPtr(&value), null.FloatFromPtr(&key)})
  68. }
  69. newSeries.Tags = props
  70. newSeries.Tags["metric"] = "count"
  71. series = append(series, &newSeries)
  72. case "percentiles":
  73. buckets := esAgg.Get("buckets").MustArray()
  74. if len(buckets) == 0 {
  75. break
  76. }
  77. firstBucket := simplejson.NewFromAny(buckets[0])
  78. percentiles := firstBucket.GetPath(metricId, "values").MustMap()
  79. for percentileName := range percentiles {
  80. newSeries := tsdb.TimeSeries{}
  81. newSeries.Tags = props
  82. newSeries.Tags["metric"] = "p" + percentileName
  83. newSeries.Tags["field"] = metricField
  84. for _, v := range buckets {
  85. bucket := simplejson.NewFromAny(v)
  86. valueStr := bucket.GetPath(metricId, "values", percentileName).MustString()
  87. value, _ := strconv.ParseFloat(valueStr, 64)
  88. key := bucket.Get("key").MustFloat64()
  89. newSeries.Points = append(newSeries.Points, tsdb.TimePoint{null.FloatFromPtr(&value), null.FloatFromPtr(&key)})
  90. }
  91. series = append(series, &newSeries)
  92. }
  93. }
  94. }
  95. return series
  96. }