response_parser.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580
  1. package elasticsearch
  2. import (
  3. "errors"
  4. "regexp"
  5. "sort"
  6. "strconv"
  7. "strings"
  8. "github.com/grafana/grafana/pkg/components/null"
  9. "github.com/grafana/grafana/pkg/components/simplejson"
  10. "github.com/grafana/grafana/pkg/tsdb"
  11. "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
  12. )
  13. const (
  14. // Metric types
  15. countType = "count"
  16. percentilesType = "percentiles"
  17. extendedStatsType = "extended_stats"
  18. // Bucket types
  19. dateHistType = "date_histogram"
  20. histogramType = "histogram"
  21. filtersType = "filters"
  22. termsType = "terms"
  23. geohashGridType = "geohash_grid"
  24. )
// responseParser converts raw Elasticsearch search responses into
// tsdb time series and tables, one response per query target.
type responseParser struct {
	Responses []*es.SearchResponse // one response per entry in Targets, same order
	Targets   []*Query             // the queries that produced Responses
	DebugInfo *es.SearchDebugInfo  // optional request/response debug payload, attached to the first result only
}
  30. var newResponseParser = func(responses []*es.SearchResponse, targets []*Query, debugInfo *es.SearchDebugInfo) *responseParser {
  31. return &responseParser{
  32. Responses: responses,
  33. Targets: targets,
  34. DebugInfo: debugInfo,
  35. }
  36. }
  37. func (rp *responseParser) getTimeSeries() (*tsdb.Response, error) {
  38. result := &tsdb.Response{}
  39. result.Results = make(map[string]*tsdb.QueryResult)
  40. if rp.Responses == nil {
  41. return result, nil
  42. }
  43. for i, res := range rp.Responses {
  44. target := rp.Targets[i]
  45. var debugInfo *simplejson.Json
  46. if rp.DebugInfo != nil && i == 0 {
  47. debugInfo = simplejson.NewFromAny(rp.DebugInfo)
  48. }
  49. if res.Error != nil {
  50. result.Results[target.RefID] = getErrorFromElasticResponse(res)
  51. result.Results[target.RefID].Meta = debugInfo
  52. continue
  53. }
  54. queryRes := tsdb.NewQueryResult()
  55. queryRes.Meta = debugInfo
  56. props := make(map[string]string)
  57. table := tsdb.Table{
  58. Columns: make([]tsdb.TableColumn, 0),
  59. Rows: make([]tsdb.RowValues, 0),
  60. }
  61. err := rp.processBuckets(res.Aggregations, target, &queryRes.Series, &table, props, 0)
  62. if err != nil {
  63. return nil, err
  64. }
  65. rp.nameSeries(&queryRes.Series, target)
  66. rp.trimDatapoints(&queryRes.Series, target)
  67. if len(table.Rows) > 0 {
  68. queryRes.Tables = append(queryRes.Tables, &table)
  69. }
  70. result.Results[target.RefID] = queryRes
  71. }
  72. return result, nil
  73. }
// processBuckets walks the aggregation tree of a response depth-first,
// following the nesting order of target.BucketAggs. At the deepest level the
// buckets hold metric values (series for date histograms, table rows for
// everything else); at intermediate levels each bucket's key is recorded in
// props and recursion continues one level down.
func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Query, series *tsdb.TimeSeriesSlice, table *tsdb.Table, props map[string]string, depth int) error {
	var err error
	maxDepth := len(target.BucketAggs) - 1

	// Iterate aggregation IDs in sorted order for deterministic output.
	aggIDs := make([]string, 0)
	for k := range aggs {
		aggIDs = append(aggIDs, k)
	}
	sort.Strings(aggIDs)
	for _, aggID := range aggIDs {
		v := aggs[aggID]
		aggDef, _ := findAgg(target, aggID)
		esAgg := simplejson.NewFromAny(v)
		if aggDef == nil {
			// Key does not correspond to one of the target's bucket aggs
			// (e.g. it is a metric sub-aggregation); skip it here.
			continue
		}

		if depth == maxDepth {
			if aggDef.Type == dateHistType {
				err = rp.processMetrics(esAgg, target, series, props)
			} else {
				err = rp.processAggregationDocs(esAgg, aggDef, target, table, props)
			}
			if err != nil {
				return err
			}
		} else {
			// "buckets" is a JSON array for terms/histogram-style aggs...
			for _, b := range esAgg.Get("buckets").MustArray() {
				bucket := simplejson.NewFromAny(b)
				// Copy props so siblings don't see each other's keys.
				newProps := make(map[string]string)
				for k, v := range props {
					newProps[k] = v
				}
				// Record the bucket key (string or numeric)...
				if key, err := bucket.Get("key").String(); err == nil {
					newProps[aggDef.Field] = key
				} else if key, err := bucket.Get("key").Int64(); err == nil {
					newProps[aggDef.Field] = strconv.FormatInt(key, 10)
				}
				// ...but prefer key_as_string when present.
				if key, err := bucket.Get("key_as_string").String(); err == nil {
					newProps[aggDef.Field] = key
				}
				err = rp.processBuckets(bucket.MustMap(), target, series, table, newProps, depth+1)
				if err != nil {
					return err
				}
			}

			// ...and a JSON object keyed by filter name for filters aggs.
			// Only one of these two loops yields iterations for a given agg.
			buckets := esAgg.Get("buckets").MustMap()
			bucketKeys := make([]string, 0)
			for k := range buckets {
				bucketKeys = append(bucketKeys, k)
			}
			sort.Strings(bucketKeys)
			for _, bucketKey := range bucketKeys {
				bucket := simplejson.NewFromAny(buckets[bucketKey])
				newProps := make(map[string]string)
				for k, v := range props {
					newProps[k] = v
				}
				newProps["filter"] = bucketKey
				err = rp.processBuckets(bucket.MustMap(), target, series, table, newProps, depth+1)
				if err != nil {
					return err
				}
			}
		}
	}
	return nil
}
// processMetrics converts a date-histogram aggregation's buckets into time
// series, one series per (metric, sub-series) combination, and appends them
// to series. Every entry of props is copied onto each new series as a tag.
func (rp *responseParser) processMetrics(esAgg *simplejson.Json, target *Query, series *tsdb.TimeSeriesSlice, props map[string]string) error {
	for _, metric := range target.Metrics {
		if metric.Hide {
			continue
		}

		switch metric.Type {
		case countType:
			// Count uses each bucket's doc_count directly; one series.
			newSeries := tsdb.TimeSeries{
				Tags: make(map[string]string),
			}
			for _, v := range esAgg.Get("buckets").MustArray() {
				bucket := simplejson.NewFromAny(v)
				value := castToNullFloat(bucket.Get("doc_count"))
				key := castToNullFloat(bucket.Get("key"))
				newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
			}
			for k, v := range props {
				newSeries.Tags[k] = v
			}
			newSeries.Tags["metric"] = countType
			*series = append(*series, &newSeries)
		case percentilesType:
			// One series per requested percentile. The percentile names are
			// read from the first bucket and iterated in sorted order so the
			// series order is deterministic.
			buckets := esAgg.Get("buckets").MustArray()
			if len(buckets) == 0 {
				break
			}
			firstBucket := simplejson.NewFromAny(buckets[0])
			percentiles := firstBucket.GetPath(metric.ID, "values").MustMap()
			percentileKeys := make([]string, 0)
			for k := range percentiles {
				percentileKeys = append(percentileKeys, k)
			}
			sort.Strings(percentileKeys)
			for _, percentileName := range percentileKeys {
				newSeries := tsdb.TimeSeries{
					Tags: make(map[string]string),
				}
				for k, v := range props {
					newSeries.Tags[k] = v
				}
				// e.g. "p95" for percentile "95".
				newSeries.Tags["metric"] = "p" + percentileName
				newSeries.Tags["field"] = metric.Field
				for _, v := range buckets {
					bucket := simplejson.NewFromAny(v)
					value := castToNullFloat(bucket.GetPath(metric.ID, "values", percentileName))
					key := castToNullFloat(bucket.Get("key"))
					newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
				}
				*series = append(*series, &newSeries)
			}
		case extendedStatsType:
			// One series per stat that is enabled (set to true) in the
			// metric's meta, iterated in sorted order.
			buckets := esAgg.Get("buckets").MustArray()

			metaKeys := make([]string, 0)
			meta := metric.Meta.MustMap()
			for k := range meta {
				metaKeys = append(metaKeys, k)
			}
			sort.Strings(metaKeys)
			for _, statName := range metaKeys {
				v := meta[statName]
				if enabled, ok := v.(bool); !ok || !enabled {
					continue
				}

				newSeries := tsdb.TimeSeries{
					Tags: make(map[string]string),
				}
				for k, v := range props {
					newSeries.Tags[k] = v
				}
				newSeries.Tags["metric"] = statName
				newSeries.Tags["field"] = metric.Field

				for _, v := range buckets {
					bucket := simplejson.NewFromAny(v)
					key := castToNullFloat(bucket.Get("key"))
					var value null.Float
					// std_deviation_bounds values live under a nested object
					// rather than a flat key named after the stat.
					if statName == "std_deviation_bounds_upper" {
						value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
					} else if statName == "std_deviation_bounds_lower" {
						value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
					} else {
						value = castToNullFloat(bucket.GetPath(metric.ID, statName))
					}
					newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
				}
				*series = append(*series, &newSeries)
			}
		default:
			// Any other metric type (avg, sum, moving_avg, derivative, ...)
			// produces a single series from the metric's "value" field.
			newSeries := tsdb.TimeSeries{
				Tags: make(map[string]string),
			}
			for k, v := range props {
				newSeries.Tags[k] = v
			}

			newSeries.Tags["metric"] = metric.Type
			newSeries.Tags["field"] = metric.Field
			newSeries.Tags["metricId"] = metric.ID
			for _, v := range esAgg.Get("buckets").MustArray() {
				bucket := simplejson.NewFromAny(v)
				key := castToNullFloat(bucket.Get("key"))
				valueObj, err := bucket.Get(metric.ID).Map()
				if err != nil {
					// Bucket has no object for this metric; skip the point.
					continue
				}
				var value null.Float
				// Prefer normalized_value (emitted e.g. by derivative with a
				// unit) over the raw value when present.
				if _, ok := valueObj["normalized_value"]; ok {
					value = castToNullFloat(bucket.GetPath(metric.ID, "normalized_value"))
				} else {
					value = castToNullFloat(bucket.GetPath(metric.ID, "value"))
				}
				newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
			}
			*series = append(*series, &newSeries)
		}
	}
	return nil
}
// processAggregationDocs flattens the buckets of a non-date-histogram
// aggregation (terms, filters, histogram, geohash grid, ...) into table rows:
// one row per bucket, with one column per parent-bucket prop, followed by the
// bucket key and one column per metric.
func (rp *responseParser) processAggregationDocs(esAgg *simplejson.Json, aggDef *BucketAgg, target *Query, table *tsdb.Table, props map[string]string) error {
	propKeys := make([]string, 0)
	for k := range props {
		propKeys = append(propKeys, k)
	}
	sort.Strings(propKeys)

	// Initialize column headers on the first call only; subsequent calls on
	// the same table assume an identical prop layout.
	if len(table.Columns) == 0 {
		for _, propKey := range propKeys {
			table.Columns = append(table.Columns, tsdb.TableColumn{Text: propKey})
		}
		table.Columns = append(table.Columns, tsdb.TableColumn{Text: aggDef.Field})
	}

	// addMetricValue appends value to the row, registering a new column for
	// metricName the first time that name is seen.
	addMetricValue := func(values *tsdb.RowValues, metricName string, value null.Float) {
		found := false
		for _, c := range table.Columns {
			if c.Text == metricName {
				found = true
				break
			}
		}
		if !found {
			table.Columns = append(table.Columns, tsdb.TableColumn{Text: metricName})
		}
		*values = append(*values, value)
	}

	for _, v := range esAgg.Get("buckets").MustArray() {
		bucket := simplejson.NewFromAny(v)
		values := make(tsdb.RowValues, 0)

		// Parent bucket keys first, in the same sorted order as the columns.
		for _, propKey := range propKeys {
			values = append(values, props[propKey])
		}

		// This bucket's own key: as a string when possible, numeric otherwise.
		if key, err := bucket.Get("key").String(); err == nil {
			values = append(values, key)
		} else {
			values = append(values, castToNullFloat(bucket.Get("key")))
		}

		for _, metric := range target.Metrics {
			switch metric.Type {
			case countType:
				addMetricValue(&values, rp.getMetricName(metric.Type), castToNullFloat(bucket.Get("doc_count")))
			case extendedStatsType:
				metaKeys := make([]string, 0)
				meta := metric.Meta.MustMap()
				for k := range meta {
					metaKeys = append(metaKeys, k)
				}
				sort.Strings(metaKeys)
				for _, statName := range metaKeys {
					v := meta[statName]
					if enabled, ok := v.(bool); !ok || !enabled {
						continue
					}

					var value null.Float
					// std_deviation_bounds values live under a nested object.
					if statName == "std_deviation_bounds_upper" {
						value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
					} else if statName == "std_deviation_bounds_lower" {
						value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
					} else {
						value = castToNullFloat(bucket.GetPath(metric.ID, statName))
					}

					// NOTE(review): the break means only the first enabled
					// stat is emitted (all stats would share one column name
					// anyway) — confirm this is the intended table shape.
					addMetricValue(&values, rp.getMetricName(metric.Type), value)
					break
				}
			default:
				// Disambiguate the column name when several metrics of the
				// same type (on different fields) are present.
				metricName := rp.getMetricName(metric.Type)
				otherMetrics := make([]*MetricAgg, 0)
				for _, m := range target.Metrics {
					if m.Type == metric.Type {
						otherMetrics = append(otherMetrics, m)
					}
				}
				if len(otherMetrics) > 1 {
					metricName += " " + metric.Field
				}
				addMetricValue(&values, metricName, castToNullFloat(bucket.GetPath(metric.ID, "value")))
			}
		}

		table.Rows = append(table.Rows, values)
	}
	return nil
}
  337. func (rp *responseParser) trimDatapoints(series *tsdb.TimeSeriesSlice, target *Query) {
  338. var histogram *BucketAgg
  339. for _, bucketAgg := range target.BucketAggs {
  340. if bucketAgg.Type == dateHistType {
  341. histogram = bucketAgg
  342. break
  343. }
  344. }
  345. if histogram == nil {
  346. return
  347. }
  348. trimEdges, err := histogram.Settings.Get("trimEdges").Int()
  349. if err != nil {
  350. return
  351. }
  352. for _, s := range *series {
  353. if len(s.Points) > trimEdges*2 {
  354. s.Points = s.Points[trimEdges : len(s.Points)-trimEdges]
  355. }
  356. }
  357. }
  358. func (rp *responseParser) nameSeries(seriesList *tsdb.TimeSeriesSlice, target *Query) {
  359. set := make(map[string]string)
  360. for _, v := range *seriesList {
  361. if metricType, exists := v.Tags["metric"]; exists {
  362. if _, ok := set[metricType]; !ok {
  363. set[metricType] = ""
  364. }
  365. }
  366. }
  367. metricTypeCount := len(set)
  368. for _, series := range *seriesList {
  369. series.Name = rp.getSeriesName(series, target, metricTypeCount)
  370. }
  371. }
// aliasPatternRegex matches {{placeholder}} tokens in a target's alias;
// the capture group holds the placeholder text between the braces.
var aliasPatternRegex = regexp.MustCompile(`\{\{([\s\S]+?)\}\}`)
// getSeriesName derives the display name for a series, either by expanding
// the target's alias pattern or from the metric/field/tag information
// collected during parsing. It consumes (deletes) the internal "metric",
// "field" and "metricId" tags from series.Tags as a side effect.
func (rp *responseParser) getSeriesName(series *tsdb.TimeSeries, target *Query, metricTypeCount int) string {
	metricType := series.Tags["metric"]
	metricName := rp.getMetricName(metricType)
	delete(series.Tags, "metric")

	field := ""
	if v, ok := series.Tags["field"]; ok {
		field = v
		delete(series.Tags, "field")
	}

	// An alias pattern takes precedence: substitute each {{group}} token.
	if target.Alias != "" {
		seriesName := target.Alias

		subMatches := aliasPatternRegex.FindAllStringSubmatch(target.Alias, -1)
		for _, subMatch := range subMatches {
			group := subMatch[0]

			if len(subMatch) > 1 {
				group = subMatch[1]
			}

			// {{term fieldname}} resolves to the tag named after the prefix.
			if strings.Index(group, "term ") == 0 {
				seriesName = strings.Replace(seriesName, subMatch[0], series.Tags[group[5:]], 1)
			}
			if v, ok := series.Tags[group]; ok {
				seriesName = strings.Replace(seriesName, subMatch[0], v, 1)
			}
			if group == "metric" {
				seriesName = strings.Replace(seriesName, subMatch[0], metricName, 1)
			}
			if group == "field" {
				seriesName = strings.Replace(seriesName, subMatch[0], field, 1)
			}
		}

		return seriesName
	}

	// todo, if field and pipelineAgg
	if field != "" && isPipelineAgg(metricType) {
		if isPipelineAggWithMultipleBucketPaths(metricType) {
			// Multi-bucket-path pipeline agg (e.g. bucket script): show its
			// script with each params.X replaced by the description of the
			// metric it references.
			metricID := ""
			if v, ok := series.Tags["metricId"]; ok {
				metricID = v
			}

			for _, metric := range target.Metrics {
				if metric.ID == metricID {
					metricName = metric.Settings.Get("script").MustString()
					for name, pipelineAgg := range metric.PipelineVariables {
						for _, m := range target.Metrics {
							if m.ID == pipelineAgg {
								metricName = strings.Replace(metricName, "params."+name, describeMetric(m.Type, m.Field), -1)
							}
						}
					}
				}
			}
		} else {
			// Single-bucket-path pipeline agg: field holds the referenced
			// metric's ID; describe that metric, or "Unset" if it's missing.
			found := false
			for _, metric := range target.Metrics {
				if metric.ID == field {
					metricName += " " + describeMetric(metric.Type, field)
					found = true
				}
			}
			if !found {
				metricName = "Unset"
			}
		}
	} else if field != "" {
		metricName += " " + field
	}

	delete(series.Tags, "metricId")

	if len(series.Tags) == 0 {
		return metricName
	}

	// Remaining tags (group-by values) form the name. Map iteration order is
	// random in Go, so multi-tag name ordering is not deterministic here.
	name := ""
	for _, v := range series.Tags {
		name += v + " "
	}

	if metricTypeCount == 1 {
		return strings.TrimSpace(name)
	}

	return strings.TrimSpace(name) + " " + metricName
}
  452. func (rp *responseParser) getMetricName(metric string) string {
  453. if text, ok := metricAggType[metric]; ok {
  454. return text
  455. }
  456. if text, ok := extendedStats[metric]; ok {
  457. return text
  458. }
  459. return metric
  460. }
  461. func castToNullFloat(j *simplejson.Json) null.Float {
  462. f, err := j.Float64()
  463. if err == nil {
  464. return null.FloatFrom(f)
  465. }
  466. if s, err := j.String(); err == nil {
  467. if strings.ToLower(s) == "nan" {
  468. return null.NewFloat(0, false)
  469. }
  470. if v, err := strconv.ParseFloat(s, 64); err == nil {
  471. return null.FloatFromPtr(&v)
  472. }
  473. }
  474. return null.NewFloat(0, false)
  475. }
  476. func findAgg(target *Query, aggID string) (*BucketAgg, error) {
  477. for _, v := range target.BucketAggs {
  478. if aggID == v.ID {
  479. return v, nil
  480. }
  481. }
  482. return nil, errors.New("can't found aggDef, aggID:" + aggID)
  483. }
  484. func getErrorFromElasticResponse(response *es.SearchResponse) *tsdb.QueryResult {
  485. result := tsdb.NewQueryResult()
  486. json := simplejson.NewFromAny(response.Error)
  487. reason := json.Get("reason").MustString()
  488. rootCauseReason := json.Get("root_cause").GetIndex(0).Get("reason").MustString()
  489. if rootCauseReason != "" {
  490. result.ErrorString = rootCauseReason
  491. } else if reason != "" {
  492. result.ErrorString = reason
  493. } else {
  494. result.ErrorString = "Unknown elasticsearch error response"
  495. }
  496. return result
  497. }