response_parser.go

package elasticsearch

import (
	"errors"
	"regexp"
	"sort"
	"strconv"
	"strings"

	"github.com/grafana/grafana/pkg/components/null"
	"github.com/grafana/grafana/pkg/components/simplejson"
	"github.com/grafana/grafana/pkg/tsdb"
	es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
)

const (
	// Metric types
	countType         = "count"
	percentilesType   = "percentiles"
	extendedStatsType = "extended_stats"
	// Bucket types
	dateHistType    = "date_histogram"
	histogramType   = "histogram"
	filtersType     = "filters"
	termsType       = "terms"
	geohashGridType = "geohash_grid"
)
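
// responseParser converts raw Elasticsearch search responses into Grafana
// time series and tables. Responses and Targets are parallel slices: the
// response at index i is assumed to belong to the query at index i.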
type responseParser struct {
	Responses []*es.SearchResponse
	Targets   []*Query
}
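
// newResponseParser is declared as a var so the constructor can be swapped
// out (e.g. stubbed in tests). Usage sketch, assuming the caller has paired
// slices of search responses and queries:
//
//	rp := newResponseParser(responses, queries)
//	result, err := rp.getTimeSeries()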
var newResponseParser = func(responses []*es.SearchResponse, targets []*Query) *responseParser {
	return &responseParser{
		Responses: responses,
		Targets:   targets,
	}
}
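
// getTimeSeries walks every response/target pair and builds the final
// tsdb.Response. Responses that carry an Elasticsearch error are mapped to
// an error QueryResult; all others are parsed recursively via processBuckets.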
func (rp *responseParser) getTimeSeries() (*tsdb.Response, error) {
	result := &tsdb.Response{}
	result.Results = make(map[string]*tsdb.QueryResult)

	if rp.Responses == nil {
		return result, nil
	}

	for i, res := range rp.Responses {
		target := rp.Targets[i]

		if res.Error != nil {
			result.Results[target.RefID] = getErrorFromElasticResponse(res)
			continue
		}

		queryRes := tsdb.NewQueryResult()
		props := make(map[string]string)
		table := tsdb.Table{
			Columns: make([]tsdb.TableColumn, 0),
			Rows:    make([]tsdb.RowValues, 0),
		}
		err := rp.processBuckets(res.Aggregations, target, &queryRes.Series, &table, props, 0)
		if err != nil {
			return nil, err
		}
		rp.nameSeries(&queryRes.Series, target)
		rp.trimDatapoints(&queryRes.Series, target)

		if len(table.Rows) > 0 {
			queryRes.Tables = append(queryRes.Tables, &table)
		}

		result.Results[target.RefID] = queryRes
	}
	return result, nil
}
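
// processBuckets recurses through the nested bucket aggregations. At the
// deepest level (the last bucket agg of the target) it emits time series for
// date_histogram aggs and table rows for everything else. Above that level
// it copies the accumulated props, records the bucket key (or filter name),
// and descends one level. Bucket aggs can come back either as an array
// (terms, histogram) or as a keyed map (filters), which is why both the
// MustArray and MustMap forms of "buckets" are handled below.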
func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Query, series *tsdb.TimeSeriesSlice, table *tsdb.Table, props map[string]string, depth int) error {
	var err error
	maxDepth := len(target.BucketAggs) - 1

	aggIDs := make([]string, 0)
	for k := range aggs {
		aggIDs = append(aggIDs, k)
	}
	sort.Strings(aggIDs)
	for _, aggID := range aggIDs {
		v := aggs[aggID]
		aggDef, _ := findAgg(target, aggID)
		esAgg := simplejson.NewFromAny(v)
		if aggDef == nil {
			continue
		}

		if depth == maxDepth {
			if aggDef.Type == dateHistType {
				err = rp.processMetrics(esAgg, target, series, props)
			} else {
				err = rp.processAggregationDocs(esAgg, aggDef, target, table, props)
			}
			if err != nil {
				return err
			}
		} else {
			for _, b := range esAgg.Get("buckets").MustArray() {
				bucket := simplejson.NewFromAny(b)
				newProps := make(map[string]string)

				for k, v := range props {
					newProps[k] = v
				}

				if key, err := bucket.Get("key").String(); err == nil {
					newProps[aggDef.Field] = key
				} else if key, err := bucket.Get("key").Int64(); err == nil {
					newProps[aggDef.Field] = strconv.FormatInt(key, 10)
				}

				if key, err := bucket.Get("key_as_string").String(); err == nil {
					newProps[aggDef.Field] = key
				}
				err = rp.processBuckets(bucket.MustMap(), target, series, table, newProps, depth+1)
				if err != nil {
					return err
				}
			}

			buckets := esAgg.Get("buckets").MustMap()
			bucketKeys := make([]string, 0)
			for k := range buckets {
				bucketKeys = append(bucketKeys, k)
			}
			sort.Strings(bucketKeys)

			for _, bucketKey := range bucketKeys {
				bucket := simplejson.NewFromAny(buckets[bucketKey])
				newProps := make(map[string]string)

				for k, v := range props {
					newProps[k] = v
				}

				newProps["filter"] = bucketKey

				err = rp.processBuckets(bucket.MustMap(), target, series, table, newProps, depth+1)
				if err != nil {
					return err
				}
			}
		}
	}
	return nil
}
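
// processMetrics turns the date_histogram buckets of esAgg into one time
// series per (metric, stat/percentile) combination, tagging each series with
// the accumulated bucket props plus "metric" and "field".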
func (rp *responseParser) processMetrics(esAgg *simplejson.Json, target *Query, series *tsdb.TimeSeriesSlice, props map[string]string) error {
	for _, metric := range target.Metrics {
		if metric.Hide {
			continue
		}

		switch metric.Type {
		case countType:
			newSeries := tsdb.TimeSeries{
				Tags: make(map[string]string),
			}

			for _, v := range esAgg.Get("buckets").MustArray() {
				bucket := simplejson.NewFromAny(v)
				value := castToNullFloat(bucket.Get("doc_count"))
				key := castToNullFloat(bucket.Get("key"))
				newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
			}

			for k, v := range props {
				newSeries.Tags[k] = v
			}
			newSeries.Tags["metric"] = countType
			*series = append(*series, &newSeries)

		case percentilesType:
			buckets := esAgg.Get("buckets").MustArray()
			if len(buckets) == 0 {
				break
			}

			firstBucket := simplejson.NewFromAny(buckets[0])
			percentiles := firstBucket.GetPath(metric.ID, "values").MustMap()

			percentileKeys := make([]string, 0)
			for k := range percentiles {
				percentileKeys = append(percentileKeys, k)
			}
			sort.Strings(percentileKeys)
			for _, percentileName := range percentileKeys {
				newSeries := tsdb.TimeSeries{
					Tags: make(map[string]string),
				}
				for k, v := range props {
					newSeries.Tags[k] = v
				}
				newSeries.Tags["metric"] = "p" + percentileName
				newSeries.Tags["field"] = metric.Field
				for _, v := range buckets {
					bucket := simplejson.NewFromAny(v)
					value := castToNullFloat(bucket.GetPath(metric.ID, "values", percentileName))
					key := castToNullFloat(bucket.Get("key"))
					newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
				}
				*series = append(*series, &newSeries)
			}

		case extendedStatsType:
			buckets := esAgg.Get("buckets").MustArray()

			metaKeys := make([]string, 0)
			meta := metric.Meta.MustMap()
			for k := range meta {
				metaKeys = append(metaKeys, k)
			}
			sort.Strings(metaKeys)
			for _, statName := range metaKeys {
				v := meta[statName]
				if enabled, ok := v.(bool); !ok || !enabled {
					continue
				}

				newSeries := tsdb.TimeSeries{
					Tags: make(map[string]string),
				}
				for k, v := range props {
					newSeries.Tags[k] = v
				}
				newSeries.Tags["metric"] = statName
				newSeries.Tags["field"] = metric.Field

				for _, v := range buckets {
					bucket := simplejson.NewFromAny(v)
					key := castToNullFloat(bucket.Get("key"))
					var value null.Float
					if statName == "std_deviation_bounds_upper" {
						value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
					} else if statName == "std_deviation_bounds_lower" {
						value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
					} else {
						value = castToNullFloat(bucket.GetPath(metric.ID, statName))
					}
					newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
				}
				*series = append(*series, &newSeries)
			}

		default:
			newSeries := tsdb.TimeSeries{
				Tags: make(map[string]string),
			}
			for k, v := range props {
				newSeries.Tags[k] = v
			}
			newSeries.Tags["metric"] = metric.Type
			newSeries.Tags["field"] = metric.Field

			for _, v := range esAgg.Get("buckets").MustArray() {
				bucket := simplejson.NewFromAny(v)
				key := castToNullFloat(bucket.Get("key"))
				valueObj, err := bucket.Get(metric.ID).Map()
				if err != nil {
					continue
				}
				var value null.Float
				if _, ok := valueObj["normalized_value"]; ok {
					value = castToNullFloat(bucket.GetPath(metric.ID, "normalized_value"))
				} else {
					value = castToNullFloat(bucket.GetPath(metric.ID, "value"))
				}
				newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
			}
			*series = append(*series, &newSeries)
		}
	}
	return nil
}
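
// processAggregationDocs renders non-date_histogram buckets (e.g. terms) as
// table rows: one column per accumulated prop, one for the bucket key, and
// one per metric. Metric columns are created lazily by addMetricValue the
// first time a metric name is seen.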
func (rp *responseParser) processAggregationDocs(esAgg *simplejson.Json, aggDef *BucketAgg, target *Query, table *tsdb.Table, props map[string]string) error {
	propKeys := make([]string, 0)
	for k := range props {
		propKeys = append(propKeys, k)
	}
	sort.Strings(propKeys)

	if len(table.Columns) == 0 {
		for _, propKey := range propKeys {
			table.Columns = append(table.Columns, tsdb.TableColumn{Text: propKey})
		}
		table.Columns = append(table.Columns, tsdb.TableColumn{Text: aggDef.Field})
	}

	addMetricValue := func(values *tsdb.RowValues, metricName string, value null.Float) {
		found := false
		for _, c := range table.Columns {
			if c.Text == metricName {
				found = true
				break
			}
		}
		if !found {
			table.Columns = append(table.Columns, tsdb.TableColumn{Text: metricName})
		}
		*values = append(*values, value)
	}

	for _, v := range esAgg.Get("buckets").MustArray() {
		bucket := simplejson.NewFromAny(v)
		values := make(tsdb.RowValues, 0)

		for _, propKey := range propKeys {
			values = append(values, props[propKey])
		}

		if key, err := bucket.Get("key").String(); err == nil {
			values = append(values, key)
		} else {
			values = append(values, castToNullFloat(bucket.Get("key")))
		}

		for _, metric := range target.Metrics {
			switch metric.Type {
			case countType:
				addMetricValue(&values, rp.getMetricName(metric.Type), castToNullFloat(bucket.Get("doc_count")))
			case extendedStatsType:
				metaKeys := make([]string, 0)
				meta := metric.Meta.MustMap()
				for k := range meta {
					metaKeys = append(metaKeys, k)
				}
				sort.Strings(metaKeys)
				for _, statName := range metaKeys {
					v := meta[statName]
					if enabled, ok := v.(bool); !ok || !enabled {
						continue
					}

					var value null.Float
					if statName == "std_deviation_bounds_upper" {
						value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
					} else if statName == "std_deviation_bounds_lower" {
						value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
					} else {
						value = castToNullFloat(bucket.GetPath(metric.ID, statName))
					}

					addMetricValue(&values, rp.getMetricName(metric.Type), value)
					break
				}
			default:
				metricName := rp.getMetricName(metric.Type)
				otherMetrics := make([]*MetricAgg, 0)

				for _, m := range target.Metrics {
					if m.Type == metric.Type {
						otherMetrics = append(otherMetrics, m)
					}
				}

				if len(otherMetrics) > 1 {
					metricName += " " + metric.Field
				}

				addMetricValue(&values, metricName, castToNullFloat(bucket.GetPath(metric.ID, "value")))
			}
		}

		table.Rows = append(table.Rows, values)
	}

	return nil
}
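
// trimDatapoints drops trimEdges points from both ends of every series when
// the target's date_histogram agg carries a "trimEdges" setting, hiding the
// partially filled buckets at the edges of the queried interval.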
func (rp *responseParser) trimDatapoints(series *tsdb.TimeSeriesSlice, target *Query) {
	var histogram *BucketAgg
	for _, bucketAgg := range target.BucketAggs {
		if bucketAgg.Type == dateHistType {
			histogram = bucketAgg
			break
		}
	}

	if histogram == nil {
		return
	}

	trimEdges, err := histogram.Settings.Get("trimEdges").Int()
	if err != nil {
		return
	}

	for _, s := range *series {
		if len(s.Points) > trimEdges*2 {
			s.Points = s.Points[trimEdges : len(s.Points)-trimEdges]
		}
	}
}
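
// nameSeries assigns a display name to every series. The number of distinct
// metric types is counted first because getSeriesName omits the metric name
// when only one metric type is present.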
func (rp *responseParser) nameSeries(seriesList *tsdb.TimeSeriesSlice, target *Query) {
	set := make(map[string]string)
	for _, v := range *seriesList {
		if metricType, exists := v.Tags["metric"]; exists {
			if _, ok := set[metricType]; !ok {
				set[metricType] = ""
			}
		}
	}
	metricTypeCount := len(set)
	for _, series := range *seriesList {
		series.Name = rp.getSeriesName(series, target, metricTypeCount)
	}
}
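
// aliasPatternRegex matches {{...}} placeholders in a target's alias, e.g.
// {{metric}}, {{field}} or {{term some_field}}.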
var aliasPatternRegex = regexp.MustCompile(`\{\{([\s\S]+?)\}\}`)
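
// getSeriesName builds a series name from the alias pattern when one is set;
// otherwise it combines the remaining tag values with the metric name (and,
// for pipeline aggregations, a description of the metric they reference).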
func (rp *responseParser) getSeriesName(series *tsdb.TimeSeries, target *Query, metricTypeCount int) string {
	metricType := series.Tags["metric"]
	metricName := rp.getMetricName(metricType)
	delete(series.Tags, "metric")

	field := ""
	if v, ok := series.Tags["field"]; ok {
		field = v
		delete(series.Tags, "field")
	}

	if target.Alias != "" {
		seriesName := target.Alias

		subMatches := aliasPatternRegex.FindAllStringSubmatch(target.Alias, -1)
		for _, subMatch := range subMatches {
			group := subMatch[0]

			if len(subMatch) > 1 {
				group = subMatch[1]
			}

			if strings.HasPrefix(group, "term ") {
				seriesName = strings.Replace(seriesName, subMatch[0], series.Tags[group[5:]], 1)
			}
			if v, ok := series.Tags[group]; ok {
				seriesName = strings.Replace(seriesName, subMatch[0], v, 1)
			}
			if group == "metric" {
				seriesName = strings.Replace(seriesName, subMatch[0], metricName, 1)
			}
			if group == "field" {
				seriesName = strings.Replace(seriesName, subMatch[0], field, 1)
			}
		}

		return seriesName
	}
	// todo, if field and pipelineAgg
	if field != "" && isPipelineAgg(metricType) {
		found := false
		for _, metric := range target.Metrics {
			if metric.ID == field {
				metricName += " " + describeMetric(metric.Type, field)
				found = true
			}
		}
		if !found {
			metricName = "Unset"
		}
	} else if field != "" {
		metricName += " " + field
	}

	if len(series.Tags) == 0 {
		return metricName
	}

	name := ""
	for _, v := range series.Tags {
		name += v + " "
	}

	if metricTypeCount == 1 {
		return strings.TrimSpace(name)
	}

	return strings.TrimSpace(name) + " " + metricName
}
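
// getMetricName maps a metric type to its human-readable label, falling back
// to the raw type when it is neither a known metric agg nor an extended stat.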
func (rp *responseParser) getMetricName(metric string) string {
	if text, ok := metricAggType[metric]; ok {
		return text
	}

	if text, ok := extendedStats[metric]; ok {
		return text
	}

	return metric
}
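
// castToNullFloat converts a JSON value to a nullable float. Numeric strings
// are parsed; the string "NaN" and anything unparseable become null.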
func castToNullFloat(j *simplejson.Json) null.Float {
	f, err := j.Float64()
	if err == nil {
		return null.FloatFrom(f)
	}

	if s, err := j.String(); err == nil {
		if strings.ToLower(s) == "nan" {
			return null.NewFloat(0, false)
		}

		if v, err := strconv.ParseFloat(s, 64); err == nil {
			return null.FloatFromPtr(&v)
		}
	}

	return null.NewFloat(0, false)
}
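
// findAgg looks up a bucket aggregation of the target by its ID.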
func findAgg(target *Query, aggID string) (*BucketAgg, error) {
	for _, v := range target.BucketAggs {
		if aggID == v.ID {
			return v, nil
		}
	}
	return nil, errors.New("can't find aggDef, aggID:" + aggID)
}
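
// getErrorFromElasticResponse extracts a human-readable message from an
// Elasticsearch error body, preferring the first root_cause reason over the
// top-level reason.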
func getErrorFromElasticResponse(response *es.SearchResponse) *tsdb.QueryResult {
	result := tsdb.NewQueryResult()
	json := simplejson.NewFromAny(response.Error)
	reason := json.Get("reason").MustString()
	rootCauseReason := json.Get("root_cause").GetIndex(0).Get("reason").MustString()

	if rootCauseReason != "" {
		result.ErrorString = rootCauseReason
	} else if reason != "" {
		result.ErrorString = reason
	} else {
		result.ErrorString = "Unknown elasticsearch error response"
	}

	return result
}