// response_parser.go — parses Elasticsearch search responses into Grafana tsdb results.
  1. package elasticsearch
  2. import (
  3. "errors"
  4. "regexp"
  5. "sort"
  6. "strconv"
  7. "strings"
  8. "github.com/grafana/grafana/pkg/components/null"
  9. "github.com/grafana/grafana/pkg/components/simplejson"
  10. "github.com/grafana/grafana/pkg/tsdb"
  11. "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
  12. )
// responseParser converts raw Elasticsearch search responses into Grafana
// tsdb query results (time series and/or tables).
//
// Responses[i] is assumed to correspond to Targets[i] — same order and same
// length (getTimeSeries indexes Targets by the response index).
type responseParser struct {
	Responses []*es.SearchResponse
	Targets   []*Query
}
  17. var newResponseParser = func(responses []*es.SearchResponse, targets []*Query) *responseParser {
  18. return &responseParser{
  19. Responses: responses,
  20. Targets: targets,
  21. }
  22. }
  23. func (rp *responseParser) getTimeSeries() (*tsdb.Response, error) {
  24. result := &tsdb.Response{}
  25. result.Results = make(map[string]*tsdb.QueryResult)
  26. if rp.Responses == nil {
  27. return result, nil
  28. }
  29. for i, res := range rp.Responses {
  30. target := rp.Targets[i]
  31. if res.Error != nil {
  32. result.Results[target.RefID] = getErrorFromElasticResponse(res)
  33. continue
  34. }
  35. queryRes := tsdb.NewQueryResult()
  36. props := make(map[string]string)
  37. table := tsdb.Table{
  38. Columns: make([]tsdb.TableColumn, 0),
  39. Rows: make([]tsdb.RowValues, 0),
  40. }
  41. err := rp.processBuckets(res.Aggregations, target, &queryRes.Series, &table, props, 0)
  42. if err != nil {
  43. return nil, err
  44. }
  45. rp.nameSeries(&queryRes.Series, target)
  46. rp.trimDatapoints(&queryRes.Series, target)
  47. if len(table.Rows) > 0 {
  48. queryRes.Tables = append(queryRes.Tables, &table)
  49. }
  50. result.Results[target.RefID] = queryRes
  51. }
  52. return result, nil
  53. }
// processBuckets recursively descends through the nested bucket aggregations
// of one response. At the deepest bucket level (depth == maxDepth) it emits
// either time series (date_histogram) or table rows (everything else); at
// shallower levels it recurses into each child bucket, accumulating the
// bucket keys into props so leaf series/rows can be labeled.
//
// props is copied (not mutated) before each recursive call.
func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Query, series *tsdb.TimeSeriesSlice, table *tsdb.Table, props map[string]string, depth int) error {
	var err error
	maxDepth := len(target.BucketAggs) - 1

	// Sort aggregation IDs so output ordering is deterministic
	// (map iteration order is random in Go).
	aggIDs := make([]string, 0)
	for k := range aggs {
		aggIDs = append(aggIDs, k)
	}
	sort.Strings(aggIDs)
	for _, aggID := range aggIDs {
		v := aggs[aggID]
		aggDef, _ := findAgg(target, aggID)
		esAgg := simplejson.NewFromAny(v)
		// Skip response aggregations that have no matching bucket agg
		// in the query definition.
		if aggDef == nil {
			continue
		}

		if depth == maxDepth {
			// Leaf level: date_histogram buckets become time series,
			// any other bucket agg becomes table rows.
			if aggDef.Type == "date_histogram" {
				err = rp.processMetrics(esAgg, target, series, props)
			} else {
				err = rp.processAggregationDocs(esAgg, aggDef, target, table, props)
			}
			if err != nil {
				return err
			}
		} else {
			// Intermediate level, array-style buckets (e.g. terms):
			// record this bucket's key under the agg's field name, then recurse.
			for _, b := range esAgg.Get("buckets").MustArray() {
				bucket := simplejson.NewFromAny(b)
				newProps := make(map[string]string)
				for k, v := range props {
					newProps[k] = v
				}
				// Keys may be strings or numbers; numbers are formatted as base-10.
				if key, err := bucket.Get("key").String(); err == nil {
					newProps[aggDef.Field] = key
				} else if key, err := bucket.Get("key").Int64(); err == nil {
					newProps[aggDef.Field] = strconv.FormatInt(key, 10)
				}
				// key_as_string, when present, takes precedence over key.
				if key, err := bucket.Get("key_as_string").String(); err == nil {
					newProps[aggDef.Field] = key
				}
				err = rp.processBuckets(bucket.MustMap(), target, series, table, newProps, depth+1)
				if err != nil {
					return err
				}
			}
			// Intermediate level, map-style buckets (e.g. filters agg):
			// the map key is the filter name; stored under the "filter" prop.
			buckets := esAgg.Get("buckets").MustMap()
			bucketKeys := make([]string, 0)
			for k := range buckets {
				bucketKeys = append(bucketKeys, k)
			}
			sort.Strings(bucketKeys) // deterministic ordering
			for _, bucketKey := range bucketKeys {
				bucket := simplejson.NewFromAny(buckets[bucketKey])
				newProps := make(map[string]string)
				for k, v := range props {
					newProps[k] = v
				}
				newProps["filter"] = bucketKey
				err = rp.processBuckets(bucket.MustMap(), target, series, table, newProps, depth+1)
				if err != nil {
					return err
				}
			}
		}
	}
	return nil
}
// processMetrics converts the date_histogram buckets of esAgg into one or
// more time series, one series per visible metric (percentiles and
// extended_stats fan out into one series per percentile/statistic).
// Each series is tagged with the accumulated bucket props plus "metric"
// (and "field" where applicable), which nameSeries later uses for naming.
func (rp *responseParser) processMetrics(esAgg *simplejson.Json, target *Query, series *tsdb.TimeSeriesSlice, props map[string]string) error {
	for _, metric := range target.Metrics {
		if metric.Hide {
			continue
		}
		switch metric.Type {
		case "count":
			// One series: doc_count per bucket, keyed by the bucket timestamp.
			newSeries := tsdb.TimeSeries{
				Tags: make(map[string]string),
			}
			for _, v := range esAgg.Get("buckets").MustArray() {
				bucket := simplejson.NewFromAny(v)
				value := castToNullFloat(bucket.Get("doc_count"))
				key := castToNullFloat(bucket.Get("key"))
				newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
			}
			for k, v := range props {
				newSeries.Tags[k] = v
			}
			newSeries.Tags["metric"] = "count"
			*series = append(*series, &newSeries)
		case "percentiles":
			buckets := esAgg.Get("buckets").MustArray()
			if len(buckets) == 0 {
				break
			}
			// The set of percentile names is read from the first bucket;
			// all buckets are assumed to report the same percentiles.
			firstBucket := simplejson.NewFromAny(buckets[0])
			percentiles := firstBucket.GetPath(metric.ID, "values").MustMap()
			percentileKeys := make([]string, 0)
			for k := range percentiles {
				percentileKeys = append(percentileKeys, k)
			}
			sort.Strings(percentileKeys) // deterministic series order
			// One series per percentile, tagged "p<percentile>".
			for _, percentileName := range percentileKeys {
				newSeries := tsdb.TimeSeries{
					Tags: make(map[string]string),
				}
				for k, v := range props {
					newSeries.Tags[k] = v
				}
				newSeries.Tags["metric"] = "p" + percentileName
				newSeries.Tags["field"] = metric.Field
				for _, v := range buckets {
					bucket := simplejson.NewFromAny(v)
					value := castToNullFloat(bucket.GetPath(metric.ID, "values", percentileName))
					key := castToNullFloat(bucket.Get("key"))
					newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
				}
				*series = append(*series, &newSeries)
			}
		case "extended_stats":
			buckets := esAgg.Get("buckets").MustArray()
			// metric.Meta holds which statistics the user enabled
			// (statName -> bool).
			metaKeys := make([]string, 0)
			meta := metric.Meta.MustMap()
			for k := range meta {
				metaKeys = append(metaKeys, k)
			}
			sort.Strings(metaKeys) // deterministic series order
			// One series per enabled statistic.
			for _, statName := range metaKeys {
				v := meta[statName]
				if enabled, ok := v.(bool); !ok || !enabled {
					continue
				}
				newSeries := tsdb.TimeSeries{
					Tags: make(map[string]string),
				}
				for k, v := range props {
					newSeries.Tags[k] = v
				}
				newSeries.Tags["metric"] = statName
				newSeries.Tags["field"] = metric.Field
				for _, v := range buckets {
					bucket := simplejson.NewFromAny(v)
					key := castToNullFloat(bucket.Get("key"))
					var value null.Float
					// The std-deviation bounds live under a nested object,
					// unlike the other flat statistics.
					if statName == "std_deviation_bounds_upper" {
						value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
					} else if statName == "std_deviation_bounds_lower" {
						value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
					} else {
						value = castToNullFloat(bucket.GetPath(metric.ID, statName))
					}
					newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
				}
				*series = append(*series, &newSeries)
			}
		default:
			// Single-value metrics (avg, sum, max, derivative, …):
			// one series reading <metric.ID>.value per bucket.
			newSeries := tsdb.TimeSeries{
				Tags: make(map[string]string),
			}
			for k, v := range props {
				newSeries.Tags[k] = v
			}
			newSeries.Tags["metric"] = metric.Type
			newSeries.Tags["field"] = metric.Field
			for _, v := range esAgg.Get("buckets").MustArray() {
				bucket := simplejson.NewFromAny(v)
				key := castToNullFloat(bucket.Get("key"))
				valueObj, err := bucket.Get(metric.ID).Map()
				if err != nil {
					// Bucket lacks this metric (e.g. pipeline agg gap): skip point.
					continue
				}
				var value null.Float
				// Prefer normalized_value when present (e.g. derivative with
				// a unit setting); fall back to the raw value.
				if _, ok := valueObj["normalized_value"]; ok {
					value = castToNullFloat(bucket.GetPath(metric.ID, "normalized_value"))
				} else {
					value = castToNullFloat(bucket.GetPath(metric.ID, "value"))
				}
				newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
			}
			*series = append(*series, &newSeries)
		}
	}
	return nil
}
// processAggregationDocs converts non-date_histogram leaf buckets into table
// rows: one row per bucket, with one column per accumulated prop, one for
// the bucket key, and one per metric. Metric columns are added lazily the
// first time a metric name is seen.
func (rp *responseParser) processAggregationDocs(esAgg *simplejson.Json, aggDef *BucketAgg, target *Query, table *tsdb.Table, props map[string]string) error {
	// Sorted prop keys give a deterministic column order.
	propKeys := make([]string, 0)
	for k := range props {
		propKeys = append(propKeys, k)
	}
	sort.Strings(propKeys)

	// First bucket agg to reach this table defines the leading columns:
	// one per prop, then the bucket field itself.
	if len(table.Columns) == 0 {
		for _, propKey := range propKeys {
			table.Columns = append(table.Columns, tsdb.TableColumn{Text: propKey})
		}
		table.Columns = append(table.Columns, tsdb.TableColumn{Text: aggDef.Field})
	}

	// addMetricValue appends value to the row, registering a new column
	// for metricName if one does not exist yet.
	addMetricValue := func(values *tsdb.RowValues, metricName string, value null.Float) {
		found := false
		for _, c := range table.Columns {
			if c.Text == metricName {
				found = true
				break
			}
		}
		if !found {
			table.Columns = append(table.Columns, tsdb.TableColumn{Text: metricName})
		}
		*values = append(*values, value)
	}

	for _, v := range esAgg.Get("buckets").MustArray() {
		bucket := simplejson.NewFromAny(v)
		values := make(tsdb.RowValues, 0)

		// Prop values first, in the same sorted order as the columns.
		for _, propKey := range propKeys {
			values = append(values, props[propKey])
		}

		// Bucket key: keep as string when it is one, otherwise coerce to float.
		if key, err := bucket.Get("key").String(); err == nil {
			values = append(values, key)
		} else {
			values = append(values, castToNullFloat(bucket.Get("key")))
		}

		for _, metric := range target.Metrics {
			switch metric.Type {
			case "count":
				addMetricValue(&values, rp.getMetricName(metric.Type), castToNullFloat(bucket.Get("doc_count")))
			case "extended_stats":
				metaKeys := make([]string, 0)
				meta := metric.Meta.MustMap()
				for k := range meta {
					metaKeys = append(metaKeys, k)
				}
				sort.Strings(metaKeys)
				for _, statName := range metaKeys {
					v := meta[statName]
					if enabled, ok := v.(bool); !ok || !enabled {
						continue
					}

					var value null.Float
					if statName == "std_deviation_bounds_upper" {
						value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
					} else if statName == "std_deviation_bounds_lower" {
						value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
					} else {
						value = castToNullFloat(bucket.GetPath(metric.ID, statName))
					}

					addMetricValue(&values, rp.getMetricName(metric.Type), value)
					// NOTE(review): the break below emits only the FIRST enabled
					// statistic (alphabetically), and the column is named after
					// the metric type rather than the stat name — confirm this
					// is intended rather than emitting every enabled stat.
					break
				}
			default:
				metricName := rp.getMetricName(metric.Type)
				// Disambiguate the column name with the field when more than
				// one metric of this type is present in the query.
				otherMetrics := make([]*MetricAgg, 0)

				for _, m := range target.Metrics {
					if m.Type == metric.Type {
						otherMetrics = append(otherMetrics, m)
					}
				}

				if len(otherMetrics) > 1 {
					metricName += " " + metric.Field
				}

				addMetricValue(&values, metricName, castToNullFloat(bucket.GetPath(metric.ID, "value")))
			}
		}

		table.Rows = append(table.Rows, values)
	}

	return nil
}
  316. func (rp *responseParser) trimDatapoints(series *tsdb.TimeSeriesSlice, target *Query) {
  317. var histogram *BucketAgg
  318. for _, bucketAgg := range target.BucketAggs {
  319. if bucketAgg.Type == "date_histogram" {
  320. histogram = bucketAgg
  321. break
  322. }
  323. }
  324. if histogram == nil {
  325. return
  326. }
  327. trimEdges, err := histogram.Settings.Get("trimEdges").Int()
  328. if err != nil {
  329. return
  330. }
  331. for _, s := range *series {
  332. if len(s.Points) > trimEdges*2 {
  333. s.Points = s.Points[trimEdges : len(s.Points)-trimEdges]
  334. }
  335. }
  336. }
  337. func (rp *responseParser) nameSeries(seriesList *tsdb.TimeSeriesSlice, target *Query) {
  338. set := make(map[string]string)
  339. for _, v := range *seriesList {
  340. if metricType, exists := v.Tags["metric"]; exists {
  341. if _, ok := set[metricType]; !ok {
  342. set[metricType] = ""
  343. }
  344. }
  345. }
  346. metricTypeCount := len(set)
  347. for _, series := range *seriesList {
  348. series.Name = rp.getSeriesName(series, target, metricTypeCount)
  349. }
  350. }
  351. var aliasPatternRegex = regexp.MustCompile(`\{\{([\s\S]+?)\}\}`)
  352. func (rp *responseParser) getSeriesName(series *tsdb.TimeSeries, target *Query, metricTypeCount int) string {
  353. metricType := series.Tags["metric"]
  354. metricName := rp.getMetricName(metricType)
  355. delete(series.Tags, "metric")
  356. field := ""
  357. if v, ok := series.Tags["field"]; ok {
  358. field = v
  359. delete(series.Tags, "field")
  360. }
  361. if target.Alias != "" {
  362. seriesName := target.Alias
  363. subMatches := aliasPatternRegex.FindAllStringSubmatch(target.Alias, -1)
  364. for _, subMatch := range subMatches {
  365. group := subMatch[0]
  366. if len(subMatch) > 1 {
  367. group = subMatch[1]
  368. }
  369. if strings.Index(group, "term ") == 0 {
  370. seriesName = strings.Replace(seriesName, subMatch[0], series.Tags[group[5:]], 1)
  371. }
  372. if v, ok := series.Tags[group]; ok {
  373. seriesName = strings.Replace(seriesName, subMatch[0], v, 1)
  374. }
  375. if group == "metric" {
  376. seriesName = strings.Replace(seriesName, subMatch[0], metricName, 1)
  377. }
  378. if group == "field" {
  379. seriesName = strings.Replace(seriesName, subMatch[0], field, 1)
  380. }
  381. }
  382. return seriesName
  383. }
  384. // todo, if field and pipelineAgg
  385. if field != "" && isPipelineAgg(metricType) {
  386. found := false
  387. for _, metric := range target.Metrics {
  388. if metric.ID == field {
  389. metricName += " " + describeMetric(metric.Type, field)
  390. found = true
  391. }
  392. }
  393. if !found {
  394. metricName = "Unset"
  395. }
  396. } else if field != "" {
  397. metricName += " " + field
  398. }
  399. if len(series.Tags) == 0 {
  400. return metricName
  401. }
  402. name := ""
  403. for _, v := range series.Tags {
  404. name += v + " "
  405. }
  406. if metricTypeCount == 1 {
  407. return strings.TrimSpace(name)
  408. }
  409. return strings.TrimSpace(name) + " " + metricName
  410. }
  411. func (rp *responseParser) getMetricName(metric string) string {
  412. if text, ok := metricAggType[metric]; ok {
  413. return text
  414. }
  415. if text, ok := extendedStats[metric]; ok {
  416. return text
  417. }
  418. return metric
  419. }
  420. func castToNullFloat(j *simplejson.Json) null.Float {
  421. f, err := j.Float64()
  422. if err == nil {
  423. return null.FloatFrom(f)
  424. }
  425. if s, err := j.String(); err == nil {
  426. if strings.ToLower(s) == "nan" {
  427. return null.NewFloat(0, false)
  428. }
  429. if v, err := strconv.ParseFloat(s, 64); err == nil {
  430. return null.FloatFromPtr(&v)
  431. }
  432. }
  433. return null.NewFloat(0, false)
  434. }
  435. func findAgg(target *Query, aggID string) (*BucketAgg, error) {
  436. for _, v := range target.BucketAggs {
  437. if aggID == v.ID {
  438. return v, nil
  439. }
  440. }
  441. return nil, errors.New("can't found aggDef, aggID:" + aggID)
  442. }
  443. func getErrorFromElasticResponse(response *es.SearchResponse) *tsdb.QueryResult {
  444. result := tsdb.NewQueryResult()
  445. json := simplejson.NewFromAny(response.Error)
  446. reason := json.Get("reason").MustString()
  447. rootCauseReason := json.Get("root_cause").GetIndex(0).Get("reason").MustString()
  448. if rootCauseReason != "" {
  449. result.ErrorString = rootCauseReason
  450. } else if reason != "" {
  451. result.ErrorString = reason
  452. } else {
  453. result.ErrorString = "Unkown elasticsearch error response"
  454. }
  455. return result
  456. }