package elasticsearch

import (
	"errors"
	"regexp"
	"sort"
	"strconv"
	"strings"

	"github.com/grafana/grafana/pkg/components/null"
	"github.com/grafana/grafana/pkg/components/simplejson"
	"github.com/grafana/grafana/pkg/tsdb"
	es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
)

const (
	// Metric types
	countType         = "count"
	percentilesType   = "percentiles"
	extendedStatsType = "extended_stats"
	// Bucket types
	dateHistType    = "date_histogram"
	histogramType   = "histogram"
	filtersType     = "filters"
	termsType       = "terms"
	geohashGridType = "geohash_grid"
)
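
// responseParser turns raw Elasticsearch search responses into Grafana
// time series and tables. Responses[i] corresponds to Targets[i].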
type responseParser struct {
	Responses []*es.SearchResponse
	Targets   []*Query
}
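
// newResponseParser is a package-level variable so that the constructor
// can be swapped out, e.g. with a stub in tests.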
var newResponseParser = func(responses []*es.SearchResponse, targets []*Query) *responseParser {
	return &responseParser{
		Responses: responses,
		Targets:   targets,
	}
}
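
// getTimeSeries walks every response/target pair and builds the final
// tsdb.Response, converting Elasticsearch errors into per-query error
// results instead of failing the whole request.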
func (rp *responseParser) getTimeSeries() (*tsdb.Response, error) {
	result := &tsdb.Response{}
	result.Results = make(map[string]*tsdb.QueryResult)

	if rp.Responses == nil {
		return result, nil
	}

	for i, res := range rp.Responses {
		target := rp.Targets[i]

		if res.Error != nil {
			result.Results[target.RefID] = getErrorFromElasticResponse(res)
			continue
		}

		queryRes := tsdb.NewQueryResult()
		props := make(map[string]string)
		table := tsdb.Table{
			Columns: make([]tsdb.TableColumn, 0),
			Rows:    make([]tsdb.RowValues, 0),
		}
		err := rp.processBuckets(res.Aggregations, target, &queryRes.Series, &table, props, 0)
		if err != nil {
			return nil, err
		}
		rp.nameSeries(&queryRes.Series, target)
		rp.trimDatapoints(&queryRes.Series, target)

		if len(table.Rows) > 0 {
			queryRes.Tables = append(queryRes.Tables, &table)
		}

		result.Results[target.RefID] = queryRes
	}
	return result, nil
}
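
// processBuckets recurses through the aggregation tree. At the deepest
// bucket level it emits time series (for date histograms) or table rows
// (for everything else); above that it descends into child buckets,
// carrying each bucket's key along in props.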
func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Query, series *tsdb.TimeSeriesSlice, table *tsdb.Table, props map[string]string, depth int) error {
	var err error
	maxDepth := len(target.BucketAggs) - 1

	aggIDs := make([]string, 0)
	for k := range aggs {
		aggIDs = append(aggIDs, k)
	}
	sort.Strings(aggIDs)
	for _, aggID := range aggIDs {
		v := aggs[aggID]
		aggDef, _ := findAgg(target, aggID)
		esAgg := simplejson.NewFromAny(v)
		if aggDef == nil {
			continue
		}

		if depth == maxDepth {
			if aggDef.Type == dateHistType {
				err = rp.processMetrics(esAgg, target, series, props)
			} else {
				err = rp.processAggregationDocs(esAgg, aggDef, target, table, props)
			}
			if err != nil {
				return err
			}
		} else {
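			// Terms, histogram and date_histogram buckets arrive as a JSON
			// array; record each bucket's key in the props passed down.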
			for _, b := range esAgg.Get("buckets").MustArray() {
				bucket := simplejson.NewFromAny(b)
				newProps := make(map[string]string)

				for k, v := range props {
					newProps[k] = v
				}

				if key, err := bucket.Get("key").String(); err == nil {
					newProps[aggDef.Field] = key
				} else if key, err := bucket.Get("key").Int64(); err == nil {
					newProps[aggDef.Field] = strconv.FormatInt(key, 10)
				}

				if key, err := bucket.Get("key_as_string").String(); err == nil {
					newProps[aggDef.Field] = key
				}
				err = rp.processBuckets(bucket.MustMap(), target, series, table, newProps, depth+1)
				if err != nil {
					return err
				}
			}
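
			// Filters aggregations return keyed buckets as a JSON object
			// rather than an array; walk the keys in sorted order so the
			// output is deterministic.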
			buckets := esAgg.Get("buckets").MustMap()
			bucketKeys := make([]string, 0)
			for k := range buckets {
				bucketKeys = append(bucketKeys, k)
			}
			sort.Strings(bucketKeys)

			for _, bucketKey := range bucketKeys {
				bucket := simplejson.NewFromAny(buckets[bucketKey])
				newProps := make(map[string]string)

				for k, v := range props {
					newProps[k] = v
				}

				newProps["filter"] = bucketKey

				err = rp.processBuckets(bucket.MustMap(), target, series, table, newProps, depth+1)
				if err != nil {
					return err
				}
			}
		}
	}
	return nil
}
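
// processMetrics converts the buckets of a date histogram aggregation into
// one time series per metric (and per percentile or extended stat).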
func (rp *responseParser) processMetrics(esAgg *simplejson.Json, target *Query, series *tsdb.TimeSeriesSlice, props map[string]string) error {
	for _, metric := range target.Metrics {
		if metric.Hide {
			continue
		}

		switch metric.Type {
		case countType:
			newSeries := tsdb.TimeSeries{
				Tags: make(map[string]string),
			}
			for _, v := range esAgg.Get("buckets").MustArray() {
				bucket := simplejson.NewFromAny(v)
				value := castToNullFloat(bucket.Get("doc_count"))
				key := castToNullFloat(bucket.Get("key"))
				newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
			}

			for k, v := range props {
				newSeries.Tags[k] = v
			}
			newSeries.Tags["metric"] = countType
			*series = append(*series, &newSeries)

		case percentilesType:
			buckets := esAgg.Get("buckets").MustArray()
			if len(buckets) == 0 {
				break
			}

			firstBucket := simplejson.NewFromAny(buckets[0])
			percentiles := firstBucket.GetPath(metric.ID, "values").MustMap()

			percentileKeys := make([]string, 0)
			for k := range percentiles {
				percentileKeys = append(percentileKeys, k)
			}
			sort.Strings(percentileKeys)
			for _, percentileName := range percentileKeys {
				newSeries := tsdb.TimeSeries{
					Tags: make(map[string]string),
				}
				for k, v := range props {
					newSeries.Tags[k] = v
				}
				newSeries.Tags["metric"] = "p" + percentileName
				newSeries.Tags["field"] = metric.Field
				for _, v := range buckets {
					bucket := simplejson.NewFromAny(v)
					value := castToNullFloat(bucket.GetPath(metric.ID, "values", percentileName))
					key := castToNullFloat(bucket.Get("key"))
					newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
				}
				*series = append(*series, &newSeries)
			}

		case extendedStatsType:
			buckets := esAgg.Get("buckets").MustArray()

			metaKeys := make([]string, 0)
			meta := metric.Meta.MustMap()
			for k := range meta {
				metaKeys = append(metaKeys, k)
			}
			sort.Strings(metaKeys)
			for _, statName := range metaKeys {
				v := meta[statName]
				if enabled, ok := v.(bool); !ok || !enabled {
					continue
				}

				newSeries := tsdb.TimeSeries{
					Tags: make(map[string]string),
				}
				for k, v := range props {
					newSeries.Tags[k] = v
				}
				newSeries.Tags["metric"] = statName
				newSeries.Tags["field"] = metric.Field

				for _, v := range buckets {
					bucket := simplejson.NewFromAny(v)
					key := castToNullFloat(bucket.Get("key"))
					var value null.Float
					if statName == "std_deviation_bounds_upper" {
						value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
					} else if statName == "std_deviation_bounds_lower" {
						value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
					} else {
						value = castToNullFloat(bucket.GetPath(metric.ID, statName))
					}
					newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
				}
				*series = append(*series, &newSeries)
			}

		default:
			newSeries := tsdb.TimeSeries{
				Tags: make(map[string]string),
			}
			for k, v := range props {
				newSeries.Tags[k] = v
			}
			newSeries.Tags["metric"] = metric.Type
			newSeries.Tags["field"] = metric.Field
			newSeries.Tags["metricId"] = metric.ID
			for _, v := range esAgg.Get("buckets").MustArray() {
				bucket := simplejson.NewFromAny(v)
				key := castToNullFloat(bucket.Get("key"))
				valueObj, err := bucket.Get(metric.ID).Map()
				if err != nil {
					continue
				}
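
				// Some pipeline metrics, e.g. derivative, can report a
				// "normalized_value" next to "value"; prefer it when present.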
				var value null.Float
				if _, ok := valueObj["normalized_value"]; ok {
					value = castToNullFloat(bucket.GetPath(metric.ID, "normalized_value"))
				} else {
					value = castToNullFloat(bucket.GetPath(metric.ID, "value"))
				}
				newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
			}
			*series = append(*series, &newSeries)
		}
	}
	return nil
}
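
// processAggregationDocs renders a non-date-histogram aggregation (terms,
// filters, etc.) as table rows, adding metric columns lazily as they are
// first encountered.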
func (rp *responseParser) processAggregationDocs(esAgg *simplejson.Json, aggDef *BucketAgg, target *Query, table *tsdb.Table, props map[string]string) error {
	propKeys := make([]string, 0)
	for k := range props {
		propKeys = append(propKeys, k)
	}
	sort.Strings(propKeys)

	if len(table.Columns) == 0 {
		for _, propKey := range propKeys {
			table.Columns = append(table.Columns, tsdb.TableColumn{Text: propKey})
		}
		table.Columns = append(table.Columns, tsdb.TableColumn{Text: aggDef.Field})
	}

	addMetricValue := func(values *tsdb.RowValues, metricName string, value null.Float) {
		found := false
		for _, c := range table.Columns {
			if c.Text == metricName {
				found = true
				break
			}
		}
		if !found {
			table.Columns = append(table.Columns, tsdb.TableColumn{Text: metricName})
		}
		*values = append(*values, value)
	}

	for _, v := range esAgg.Get("buckets").MustArray() {
		bucket := simplejson.NewFromAny(v)
		values := make(tsdb.RowValues, 0)

		for _, propKey := range propKeys {
			values = append(values, props[propKey])
		}

		if key, err := bucket.Get("key").String(); err == nil {
			values = append(values, key)
		} else {
			values = append(values, castToNullFloat(bucket.Get("key")))
		}

		for _, metric := range target.Metrics {
			switch metric.Type {
			case countType:
				addMetricValue(&values, rp.getMetricName(metric.Type), castToNullFloat(bucket.Get("doc_count")))
			case extendedStatsType:
				metaKeys := make([]string, 0)
				meta := metric.Meta.MustMap()
				for k := range meta {
					metaKeys = append(metaKeys, k)
				}
				sort.Strings(metaKeys)
				for _, statName := range metaKeys {
					v := meta[statName]
					if enabled, ok := v.(bool); !ok || !enabled {
						continue
					}

					var value null.Float
					if statName == "std_deviation_bounds_upper" {
						value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
					} else if statName == "std_deviation_bounds_lower" {
						value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
					} else {
						value = castToNullFloat(bucket.GetPath(metric.ID, statName))
					}

					addMetricValue(&values, rp.getMetricName(metric.Type), value)
					break
				}
			default:
				metricName := rp.getMetricName(metric.Type)
				otherMetrics := make([]*MetricAgg, 0)

				for _, m := range target.Metrics {
					if m.Type == metric.Type {
						otherMetrics = append(otherMetrics, m)
					}
				}

				if len(otherMetrics) > 1 {
					metricName += " " + metric.Field
				}

				addMetricValue(&values, metricName, castToNullFloat(bucket.GetPath(metric.ID, "value")))
			}
		}

		table.Rows = append(table.Rows, values)
	}

	return nil
}
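
// trimDatapoints drops "trimEdges" data points from both ends of every
// series when the date histogram's settings request it, hiding partial
// buckets at the edges of the queried time range.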
func (rp *responseParser) trimDatapoints(series *tsdb.TimeSeriesSlice, target *Query) {
	var histogram *BucketAgg
	for _, bucketAgg := range target.BucketAggs {
		if bucketAgg.Type == dateHistType {
			histogram = bucketAgg
			break
		}
	}

	if histogram == nil {
		return
	}

	trimEdges, err := histogram.Settings.Get("trimEdges").Int()
	if err != nil {
		return
	}

	for _, s := range *series {
		if len(s.Points) > trimEdges*2 {
			s.Points = s.Points[trimEdges : len(s.Points)-trimEdges]
		}
	}
}
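
// nameSeries assigns a display name to every series; the number of distinct
// metric types decides whether the metric name is appended.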
func (rp *responseParser) nameSeries(seriesList *tsdb.TimeSeriesSlice, target *Query) {
	set := make(map[string]string)
	for _, v := range *seriesList {
		if metricType, exists := v.Tags["metric"]; exists {
			if _, ok := set[metricType]; !ok {
				set[metricType] = ""
			}
		}
	}
	metricTypeCount := len(set)
	for _, series := range *seriesList {
		series.Name = rp.getSeriesName(series, target, metricTypeCount)
	}
}
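
// aliasPatternRegex matches {{...}} placeholders in a target's alias pattern.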
var aliasPatternRegex = regexp.MustCompile(`\{\{([\s\S]+?)\}\}`)
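
// getSeriesName builds a single series name: it resolves alias placeholders
// when an alias is set, and otherwise composes the name from the metric,
// field and remaining tags.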
func (rp *responseParser) getSeriesName(series *tsdb.TimeSeries, target *Query, metricTypeCount int) string {
	metricType := series.Tags["metric"]
	metricName := rp.getMetricName(metricType)
	delete(series.Tags, "metric")

	field := ""
	if v, ok := series.Tags["field"]; ok {
		field = v
		delete(series.Tags, "field")
	}

	if target.Alias != "" {
		seriesName := target.Alias

		subMatches := aliasPatternRegex.FindAllStringSubmatch(target.Alias, -1)
		for _, subMatch := range subMatches {
			group := subMatch[0]

			if len(subMatch) > 1 {
				group = subMatch[1]
			}

			if strings.HasPrefix(group, "term ") {
				seriesName = strings.Replace(seriesName, subMatch[0], series.Tags[group[5:]], 1)
			}
			if v, ok := series.Tags[group]; ok {
				seriesName = strings.Replace(seriesName, subMatch[0], v, 1)
			}
			if group == "metric" {
				seriesName = strings.Replace(seriesName, subMatch[0], metricName, 1)
			}
			if group == "field" {
				seriesName = strings.Replace(seriesName, subMatch[0], field, 1)
			}
		}

		return seriesName
	}
	// todo, if field and pipelineAgg
	if field != "" && isPipelineAgg(metricType) {
		if isPipelineAggWithMultipleBucketPaths(metricType) {
			metricID := ""
			if v, ok := series.Tags["metricId"]; ok {
				metricID = v
			}

			for _, metric := range target.Metrics {
				if metric.ID == metricID {
					metricName = metric.Settings.Get("script").MustString()
					for name, pipelineAgg := range metric.PipelineVariables {
						for _, m := range target.Metrics {
							if m.ID == pipelineAgg {
								metricName = strings.Replace(metricName, "params."+name, describeMetric(m.Type, m.Field), -1)
							}
						}
					}
				}
			}
		} else {
			found := false
			for _, metric := range target.Metrics {
				if metric.ID == field {
					metricName += " " + describeMetric(metric.Type, field)
					found = true
				}
			}
			if !found {
				metricName = "Unset"
			}
		}
	} else if field != "" {
		metricName += " " + field
	}

	delete(series.Tags, "metricId")

	if len(series.Tags) == 0 {
		return metricName
	}

	name := ""
	for _, v := range series.Tags {
		name += v + " "
	}

	if metricTypeCount == 1 {
		return strings.TrimSpace(name)
	}

	return strings.TrimSpace(name) + " " + metricName
}
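
// getMetricName maps a metric type to its display text, falling back to
// the raw type string.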
func (rp *responseParser) getMetricName(metric string) string {
	if text, ok := metricAggType[metric]; ok {
		return text
	}

	if text, ok := extendedStats[metric]; ok {
		return text
	}

	return metric
}
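
// castToNullFloat converts a JSON value to a nullable float, accepting
// numbers and numeric strings and mapping "NaN" (and anything else) to null.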
func castToNullFloat(j *simplejson.Json) null.Float {
	f, err := j.Float64()
	if err == nil {
		return null.FloatFrom(f)
	}

	if s, err := j.String(); err == nil {
		if strings.ToLower(s) == "nan" {
			return null.NewFloat(0, false)
		}

		if v, err := strconv.ParseFloat(s, 64); err == nil {
			return null.FloatFromPtr(&v)
		}
	}

	return null.NewFloat(0, false)
}
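
// findAgg looks up the bucket aggregation with the given ID on the target.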
func findAgg(target *Query, aggID string) (*BucketAgg, error) {
	for _, v := range target.BucketAggs {
		if aggID == v.ID {
			return v, nil
		}
	}
	return nil, errors.New("could not find agg def for agg ID: " + aggID)
}
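
// getErrorFromElasticResponse extracts a human-readable message from an
// Elasticsearch error body, preferring the first root cause's reason.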
func getErrorFromElasticResponse(response *es.SearchResponse) *tsdb.QueryResult {
	result := tsdb.NewQueryResult()
	json := simplejson.NewFromAny(response.Error)
	reason := json.Get("reason").MustString()
	rootCauseReason := json.Get("root_cause").GetIndex(0).Get("reason").MustString()

	if rootCauseReason != "" {
		result.ErrorString = rootCauseReason
	} else if reason != "" {
		result.ErrorString = reason
	} else {
		result.ErrorString = "Unknown elasticsearch error response"
	}

	return result
}