response_parser.go

package elasticsearch

import (
	"errors"
	"regexp"
	"sort"
	"strconv"
	"strings"

	"github.com/grafana/grafana/pkg/components/null"
	"github.com/grafana/grafana/pkg/components/simplejson"
	"github.com/grafana/grafana/pkg/tsdb"
	es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
)
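
// responseParser maps Elasticsearch search responses onto the queries that
// produced them; Responses and Targets are parallel slices.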
type responseParser struct {
	Responses []*es.SearchResponse
	Targets   []*Query
}

var newResponseParser = func(responses []*es.SearchResponse, targets []*Query) *responseParser {
	return &responseParser{
		Responses: responses,
		Targets:   targets,
	}
}
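
// getTimeSeries converts each search response into a tsdb.QueryResult keyed
// by the query's RefID. Responses carrying an error payload become error
// results, and any rows collected while walking the aggregations are attached
// as a table.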
func (rp *responseParser) getTimeSeries() (*tsdb.Response, error) {
	result := &tsdb.Response{}
	result.Results = make(map[string]*tsdb.QueryResult)

	if rp.Responses == nil {
		return result, nil
	}

	for i, res := range rp.Responses {
		target := rp.Targets[i]

		if res.Error != nil {
			result.Results[target.RefID] = getErrorFromElasticResponse(res)
			continue
		}

		queryRes := tsdb.NewQueryResult()
		props := make(map[string]string)
		table := tsdb.Table{
			Columns: make([]tsdb.TableColumn, 0),
			Rows:    make([]tsdb.RowValues, 0),
		}
		err := rp.processBuckets(res.Aggregations, target, &queryRes.Series, &table, props, 0)
		if err != nil {
			return nil, err
		}
		rp.nameSeries(&queryRes.Series, target)
		rp.trimDatapoints(&queryRes.Series, target)

		if len(table.Rows) > 0 {
			queryRes.Tables = append(queryRes.Tables, &table)
		}

		result.Results[target.RefID] = queryRes
	}
	return result, nil
}
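
// processBuckets walks the aggregation tree recursively. At the deepest
// bucket aggregation it emits series (for date histograms) or table rows
// (for everything else); above that it recurses into child buckets, carrying
// the bucket keys along in props. Buckets arrive either as an array (terms,
// histogram) or as a map keyed by filter name (filters), so both shapes are
// handled below.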
func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Query, series *tsdb.TimeSeriesSlice, table *tsdb.Table, props map[string]string, depth int) error {
	var err error
	maxDepth := len(target.BucketAggs) - 1

	aggIDs := make([]string, 0)
	for k := range aggs {
		aggIDs = append(aggIDs, k)
	}
	sort.Strings(aggIDs)
	for _, aggID := range aggIDs {
		v := aggs[aggID]
		aggDef, _ := findAgg(target, aggID)
		esAgg := simplejson.NewFromAny(v)
		if aggDef == nil {
			continue
		}

		if depth == maxDepth {
			if aggDef.Type == "date_histogram" {
				err = rp.processMetrics(esAgg, target, series, props)
			} else {
				err = rp.processAggregationDocs(esAgg, aggDef, target, table, props)
			}
			if err != nil {
				return err
			}
		} else {
			for _, b := range esAgg.Get("buckets").MustArray() {
				bucket := simplejson.NewFromAny(b)
				newProps := make(map[string]string)

				for k, v := range props {
					newProps[k] = v
				}

				if key, err := bucket.Get("key").String(); err == nil {
					newProps[aggDef.Field] = key
				} else if key, err := bucket.Get("key").Int64(); err == nil {
					newProps[aggDef.Field] = strconv.FormatInt(key, 10)
				}

				if key, err := bucket.Get("key_as_string").String(); err == nil {
					newProps[aggDef.Field] = key
				}

				err = rp.processBuckets(bucket.MustMap(), target, series, table, newProps, depth+1)
				if err != nil {
					return err
				}
			}

			buckets := esAgg.Get("buckets").MustMap()
			bucketKeys := make([]string, 0)
			for k := range buckets {
				bucketKeys = append(bucketKeys, k)
			}
			sort.Strings(bucketKeys)

			for _, bucketKey := range bucketKeys {
				bucket := simplejson.NewFromAny(buckets[bucketKey])
				newProps := make(map[string]string)

				for k, v := range props {
					newProps[k] = v
				}

				newProps["filter"] = bucketKey

				err = rp.processBuckets(bucket.MustMap(), target, series, table, newProps, depth+1)
				if err != nil {
					return err
				}
			}
		}
	}
	return nil
}
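
// processMetrics turns the metrics of a date histogram bucket list into time
// series: one series per metric, and for percentiles/extended_stats one
// series per percentile or enabled statistic.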
func (rp *responseParser) processMetrics(esAgg *simplejson.Json, target *Query, series *tsdb.TimeSeriesSlice, props map[string]string) error {
	for _, metric := range target.Metrics {
		if metric.Hide {
			continue
		}

		switch metric.Type {
		case "count":
			newSeries := tsdb.TimeSeries{
				Tags: make(map[string]string),
			}
			for _, v := range esAgg.Get("buckets").MustArray() {
				bucket := simplejson.NewFromAny(v)
				value := castToNullFloat(bucket.Get("doc_count"))
				key := castToNullFloat(bucket.Get("key"))
				newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
			}

			for k, v := range props {
				newSeries.Tags[k] = v
			}
			newSeries.Tags["metric"] = "count"
			*series = append(*series, &newSeries)

		case "percentiles":
			buckets := esAgg.Get("buckets").MustArray()
			if len(buckets) == 0 {
				break
			}

			// The set of percentile keys is the same for every bucket, so
			// read it from the first one.
			firstBucket := simplejson.NewFromAny(buckets[0])
			percentiles := firstBucket.GetPath(metric.ID, "values").MustMap()

			percentileKeys := make([]string, 0)
			for k := range percentiles {
				percentileKeys = append(percentileKeys, k)
			}
			sort.Strings(percentileKeys)
			for _, percentileName := range percentileKeys {
				newSeries := tsdb.TimeSeries{
					Tags: make(map[string]string),
				}
				for k, v := range props {
					newSeries.Tags[k] = v
				}
				newSeries.Tags["metric"] = "p" + percentileName
				newSeries.Tags["field"] = metric.Field
				for _, v := range buckets {
					bucket := simplejson.NewFromAny(v)
					value := castToNullFloat(bucket.GetPath(metric.ID, "values", percentileName))
					key := castToNullFloat(bucket.Get("key"))
					newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
				}
				*series = append(*series, &newSeries)
			}

		case "extended_stats":
			buckets := esAgg.Get("buckets").MustArray()

			metaKeys := make([]string, 0)
			meta := metric.Meta.MustMap()
			for k := range meta {
				metaKeys = append(metaKeys, k)
			}
			sort.Strings(metaKeys)
			for _, statName := range metaKeys {
				v := meta[statName]
				if enabled, ok := v.(bool); !ok || !enabled {
					continue
				}

				newSeries := tsdb.TimeSeries{
					Tags: make(map[string]string),
				}
				for k, v := range props {
					newSeries.Tags[k] = v
				}
				newSeries.Tags["metric"] = statName
				newSeries.Tags["field"] = metric.Field

				for _, v := range buckets {
					bucket := simplejson.NewFromAny(v)
					key := castToNullFloat(bucket.Get("key"))
					var value null.Float
					if statName == "std_deviation_bounds_upper" {
						value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
					} else if statName == "std_deviation_bounds_lower" {
						value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
					} else {
						value = castToNullFloat(bucket.GetPath(metric.ID, statName))
					}
					newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
				}
				*series = append(*series, &newSeries)
			}

		default:
			newSeries := tsdb.TimeSeries{
				Tags: make(map[string]string),
			}
			for k, v := range props {
				newSeries.Tags[k] = v
			}
			newSeries.Tags["metric"] = metric.Type
			newSeries.Tags["field"] = metric.Field
			for _, v := range esAgg.Get("buckets").MustArray() {
				bucket := simplejson.NewFromAny(v)
				key := castToNullFloat(bucket.Get("key"))
				valueObj, err := bucket.Get(metric.ID).Map()
				if err != nil {
					continue
				}
				var value null.Float
				// Some pipeline metrics expose both value and
				// normalized_value; prefer the normalized one when present.
				if _, ok := valueObj["normalized_value"]; ok {
					value = castToNullFloat(bucket.GetPath(metric.ID, "normalized_value"))
				} else {
					value = castToNullFloat(bucket.GetPath(metric.ID, "value"))
				}
				newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
			}
			*series = append(*series, &newSeries)
		}
	}
	return nil
}
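
// processAggregationDocs flattens a non-date-histogram bucket aggregation
// into table rows: one row per bucket, with parent bucket props, the bucket
// key, and one column per metric. addMetricValue appends the column header
// the first time a metric name is seen.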
func (rp *responseParser) processAggregationDocs(esAgg *simplejson.Json, aggDef *BucketAgg, target *Query, table *tsdb.Table, props map[string]string) error {
	propKeys := make([]string, 0)
	for k := range props {
		propKeys = append(propKeys, k)
	}
	sort.Strings(propKeys)

	if len(table.Columns) == 0 {
		for _, propKey := range propKeys {
			table.Columns = append(table.Columns, tsdb.TableColumn{Text: propKey})
		}
		table.Columns = append(table.Columns, tsdb.TableColumn{Text: aggDef.Field})
	}

	addMetricValue := func(values *tsdb.RowValues, metricName string, value null.Float) {
		found := false
		for _, c := range table.Columns {
			if c.Text == metricName {
				found = true
				break
			}
		}
		if !found {
			table.Columns = append(table.Columns, tsdb.TableColumn{Text: metricName})
		}
		*values = append(*values, value)
	}

	for _, v := range esAgg.Get("buckets").MustArray() {
		bucket := simplejson.NewFromAny(v)
		values := make(tsdb.RowValues, 0)

		for _, propKey := range propKeys {
			values = append(values, props[propKey])
		}

		if key, err := bucket.Get("key").String(); err == nil {
			values = append(values, key)
		} else {
			values = append(values, castToNullFloat(bucket.Get("key")))
		}

		for _, metric := range target.Metrics {
			switch metric.Type {
			case "count":
				addMetricValue(&values, rp.getMetricName(metric.Type), castToNullFloat(bucket.Get("doc_count")))
			case "extended_stats":
				metaKeys := make([]string, 0)
				meta := metric.Meta.MustMap()
				for k := range meta {
					metaKeys = append(metaKeys, k)
				}
				sort.Strings(metaKeys)
				// Add a column per enabled statistic, named after the stat
				// itself so the columns don't collide.
				for _, statName := range metaKeys {
					v := meta[statName]
					if enabled, ok := v.(bool); !ok || !enabled {
						continue
					}

					var value null.Float
					if statName == "std_deviation_bounds_upper" {
						value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
					} else if statName == "std_deviation_bounds_lower" {
						value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
					} else {
						value = castToNullFloat(bucket.GetPath(metric.ID, statName))
					}
					addMetricValue(&values, rp.getMetricName(statName), value)
				}
			default:
				metricName := rp.getMetricName(metric.Type)
				otherMetrics := make([]*MetricAgg, 0)

				for _, m := range target.Metrics {
					if m.Type == metric.Type {
						otherMetrics = append(otherMetrics, m)
					}
				}

				// Disambiguate the column name when the same metric type is
				// applied to more than one field.
				if len(otherMetrics) > 1 {
					metricName += " " + metric.Field
				}

				addMetricValue(&values, metricName, castToNullFloat(bucket.GetPath(metric.ID, "value")))
			}
		}

		table.Rows = append(table.Rows, values)
	}

	return nil
}
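
// trimDatapoints drops trimEdges points from both ends of every series when
// the date histogram has a trimEdges setting, hiding the partially filled
// buckets at the edges of the queried interval.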
func (rp *responseParser) trimDatapoints(series *tsdb.TimeSeriesSlice, target *Query) {
	var histogram *BucketAgg
	for _, bucketAgg := range target.BucketAggs {
		if bucketAgg.Type == "date_histogram" {
			histogram = bucketAgg
			break
		}
	}

	if histogram == nil {
		return
	}

	trimEdges, err := histogram.Settings.Get("trimEdges").Int()
	if err != nil {
		return
	}

	for _, s := range *series {
		if len(s.Points) > trimEdges*2 {
			s.Points = s.Points[trimEdges : len(s.Points)-trimEdges]
		}
	}
}
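
// nameSeries assigns display names to all series; the number of distinct
// metric types decides whether the metric name is included alongside the tags.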
func (rp *responseParser) nameSeries(seriesList *tsdb.TimeSeriesSlice, target *Query) {
	set := make(map[string]string)
	for _, v := range *seriesList {
		if metricType, exists := v.Tags["metric"]; exists {
			if _, ok := set[metricType]; !ok {
				set[metricType] = ""
			}
		}
	}
	metricTypeCount := len(set)
	for _, series := range *seriesList {
		series.Name = rp.getSeriesName(series, target, metricTypeCount)
	}
}
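
// aliasPatternRegex matches {{...}} placeholders in the alias pattern.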
var aliasPatternRegex = regexp.MustCompile(`\{\{([\s\S]+?)\}\}`)
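
// getSeriesName builds a series name from the alias pattern when one is set
// (substituting {{term field}}, {{metric}}, {{field}} and tag placeholders),
// and otherwise from the metric name, field and remaining bucket tags.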
func (rp *responseParser) getSeriesName(series *tsdb.TimeSeries, target *Query, metricTypeCount int) string {
	metricType := series.Tags["metric"]
	metricName := rp.getMetricName(metricType)
	delete(series.Tags, "metric")

	field := ""
	if v, ok := series.Tags["field"]; ok {
		field = v
		delete(series.Tags, "field")
	}

	if target.Alias != "" {
		seriesName := target.Alias

		subMatches := aliasPatternRegex.FindAllStringSubmatch(target.Alias, -1)
		for _, subMatch := range subMatches {
			group := subMatch[0]

			if len(subMatch) > 1 {
				group = subMatch[1]
			}

			if strings.HasPrefix(group, "term ") {
				seriesName = strings.Replace(seriesName, subMatch[0], series.Tags[group[5:]], 1)
			}
			if v, ok := series.Tags[group]; ok {
				seriesName = strings.Replace(seriesName, subMatch[0], v, 1)
			}
			if group == "metric" {
				seriesName = strings.Replace(seriesName, subMatch[0], metricName, 1)
			}
			if group == "field" {
				seriesName = strings.Replace(seriesName, subMatch[0], field, 1)
			}
		}

		return seriesName
	}

	// todo, if field and pipelineAgg
	if field != "" && isPipelineAgg(metricType) {
		found := false
		for _, metric := range target.Metrics {
			if metric.ID == field {
				metricName += " " + describeMetric(metric.Type, field)
				found = true
			}
		}
		if !found {
			metricName = "Unset"
		}
	} else if field != "" {
		metricName += " " + field
	}

	if len(series.Tags) == 0 {
		return metricName
	}

	name := ""
	for _, v := range series.Tags {
		name += v + " "
	}

	if metricTypeCount == 1 {
		return strings.TrimSpace(name)
	}

	return strings.TrimSpace(name) + " " + metricName
}
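
// getMetricName translates a metric type or extended-stats key into its
// display name, falling back to the raw value.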
func (rp *responseParser) getMetricName(metric string) string {
	if text, ok := metricAggType[metric]; ok {
		return text
	}

	if text, ok := extendedStats[metric]; ok {
		return text
	}

	return metric
}
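
// castToNullFloat converts a JSON value to a nullable float, accepting
// numbers and numeric strings; "NaN" and anything unparsable become null.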
func castToNullFloat(j *simplejson.Json) null.Float {
	f, err := j.Float64()
	if err == nil {
		return null.FloatFrom(f)
	}

	if s, err := j.String(); err == nil {
		if strings.ToLower(s) == "nan" {
			return null.NewFloat(0, false)
		}

		if v, err := strconv.ParseFloat(s, 64); err == nil {
			return null.FloatFromPtr(&v)
		}
	}

	return null.NewFloat(0, false)
}
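
// findAgg looks up the bucket aggregation definition for the given agg ID.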
func findAgg(target *Query, aggID string) (*BucketAgg, error) {
	for _, v := range target.BucketAggs {
		if aggID == v.ID {
			return v, nil
		}
	}
	return nil, errors.New("could not find agg definition for agg ID: " + aggID)
}
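
// getErrorFromElasticResponse extracts a human-readable message from an
// Elasticsearch error payload, preferring the root cause reason.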
func getErrorFromElasticResponse(response *es.SearchResponse) *tsdb.QueryResult {
	result := tsdb.NewQueryResult()
	json := simplejson.NewFromAny(response.Error)
	reason := json.Get("reason").MustString()
	rootCauseReason := json.Get("root_cause").GetIndex(0).Get("reason").MustString()

	if rootCauseReason != "" {
		result.ErrorString = rootCauseReason
	} else if reason != "" {
		result.ErrorString = reason
	} else {
		result.ErrorString = "Unknown Elasticsearch error response"
	}

	return result
}