response_parser.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530
  1. package elasticsearch
  2. import (
  3. "errors"
  4. "regexp"
  5. "sort"
  6. "strconv"
  7. "strings"
  8. "github.com/grafana/grafana/pkg/components/null"
  9. "github.com/grafana/grafana/pkg/components/simplejson"
  10. "github.com/grafana/grafana/pkg/tsdb"
  11. "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
  12. )
// responseParser converts raw Elasticsearch search responses into tsdb query
// results. Responses and Targets are parallel slices: Responses[i] is the
// response for the query Targets[i].
type responseParser struct {
	Responses []*es.SearchResponse // raw search responses, one per target
	Targets   []*Query             // the queries that produced the responses
}
  17. var newResponseParser = func(responses []*es.SearchResponse, targets []*Query) *responseParser {
  18. return &responseParser{
  19. Responses: responses,
  20. Targets: targets,
  21. }
  22. }
  23. func (rp *responseParser) getTimeSeries() (*tsdb.Response, error) {
  24. result := &tsdb.Response{}
  25. result.Results = make(map[string]*tsdb.QueryResult)
  26. if rp.Responses == nil {
  27. return result, nil
  28. }
  29. for i, res := range rp.Responses {
  30. target := rp.Targets[i]
  31. if res.Error != nil {
  32. result.Results[target.RefID] = getErrorFromElasticResponse(res)
  33. continue
  34. }
  35. queryRes := tsdb.NewQueryResult()
  36. props := make(map[string]string)
  37. table := tsdb.Table{
  38. Columns: make([]tsdb.TableColumn, 0),
  39. Rows: make([]tsdb.RowValues, 0),
  40. }
  41. err := rp.processBuckets(res.Aggregations, target, &queryRes.Series, &table, props, 0)
  42. if err != nil {
  43. return nil, err
  44. }
  45. rp.nameSeries(&queryRes.Series, target)
  46. rp.trimDatapoints(&queryRes.Series, target)
  47. if len(table.Rows) > 0 {
  48. queryRes.Tables = append(queryRes.Tables, &table)
  49. }
  50. result.Results[target.RefID] = queryRes
  51. }
  52. return result, nil
  53. }
// processBuckets recursively walks the aggregation tree for a target. At the
// innermost bucket-agg level it emits time series (date_histogram) or table
// rows (other agg types); above that it descends into each sub-bucket,
// accumulating the bucket keys in props so they end up as series tags or
// table columns.
func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Query, series *tsdb.TimeSeriesSlice, table *tsdb.Table, props map[string]string, depth int) error {
	var err error
	maxDepth := len(target.BucketAggs) - 1

	// Visit aggregation IDs in sorted order so output is deterministic.
	aggIDs := make([]string, 0)
	for k := range aggs {
		aggIDs = append(aggIDs, k)
	}
	sort.Strings(aggIDs)
	for _, aggID := range aggIDs {
		v := aggs[aggID]
		aggDef, _ := findAgg(target, aggID)
		esAgg := simplejson.NewFromAny(v)
		// Skip response entries that don't match a configured bucket agg
		// (e.g. metric sub-aggregation objects at this level).
		if aggDef == nil {
			continue
		}

		if depth == maxDepth {
			// Innermost bucket agg: produce the actual output.
			if aggDef.Type == "date_histogram" {
				err = rp.processMetrics(esAgg, target, series, props)
			} else {
				err = rp.processAggregationDocs(esAgg, aggDef, target, table, props)
			}
			if err != nil {
				return err
			}
		} else {
			// Array-shaped buckets (e.g. terms/histogram): record this
			// bucket's key under the agg's field name, then recurse.
			for _, b := range esAgg.Get("buckets").MustArray() {
				bucket := simplejson.NewFromAny(b)
				newProps := make(map[string]string, 0)

				for k, v := range props {
					newProps[k] = v
				}

				// String keys are used as-is; numeric keys are formatted.
				// (the err here is intentionally scoped to the if-statement)
				if key, err := bucket.Get("key").String(); err == nil {
					newProps[aggDef.Field] = key
				} else if key, err := bucket.Get("key").Int64(); err == nil {
					newProps[aggDef.Field] = strconv.FormatInt(key, 10)
				}

				// Prefer the pre-formatted key when Elasticsearch provides one.
				if key, err := bucket.Get("key_as_string").String(); err == nil {
					newProps[aggDef.Field] = key
				}
				err = rp.processBuckets(bucket.MustMap(), target, series, table, newProps, depth+1)
				if err != nil {
					return err
				}
			}

			// Map-shaped buckets (e.g. a filters agg): the map key is the
			// filter expression; record it under "filter" and recurse.
			for k, v := range esAgg.Get("buckets").MustMap() {
				bucket := simplejson.NewFromAny(v)
				newProps := make(map[string]string, 0)

				for k, v := range props {
					newProps[k] = v
				}

				newProps["filter"] = k

				err = rp.processBuckets(bucket.MustMap(), target, series, table, newProps, depth+1)
				if err != nil {
					return err
				}
			}
		}
	}
	return nil
}
// processMetrics turns one date_histogram aggregation into time series: one
// series per visible metric, except percentiles and extended_stats, which
// expand into one series per percentile / enabled stat. Inherited bucket keys
// from props become series tags; the bucket key becomes the timestamp.
func (rp *responseParser) processMetrics(esAgg *simplejson.Json, target *Query, series *tsdb.TimeSeriesSlice, props map[string]string) error {
	for _, metric := range target.Metrics {
		if metric.Hide {
			continue
		}

		switch metric.Type {
		case "count":
			// Count reads the bucket's doc_count directly.
			newSeries := tsdb.TimeSeries{
				Tags: make(map[string]string),
			}
			for _, v := range esAgg.Get("buckets").MustArray() {
				bucket := simplejson.NewFromAny(v)
				value := castToNullFloat(bucket.Get("doc_count"))
				key := castToNullFloat(bucket.Get("key"))
				newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
			}

			for k, v := range props {
				newSeries.Tags[k] = v
			}
			newSeries.Tags["metric"] = "count"
			*series = append(*series, &newSeries)
		case "percentiles":
			buckets := esAgg.Get("buckets").MustArray()
			if len(buckets) == 0 {
				break
			}

			// One series per requested percentile; the percentile keys are
			// taken from the first bucket and sorted for stable ordering.
			firstBucket := simplejson.NewFromAny(buckets[0])
			percentiles := firstBucket.GetPath(metric.ID, "values").MustMap()

			percentileKeys := make([]string, 0)
			for k := range percentiles {
				percentileKeys = append(percentileKeys, k)
			}
			sort.Strings(percentileKeys)
			for _, percentileName := range percentileKeys {
				newSeries := tsdb.TimeSeries{
					Tags: make(map[string]string),
				}
				for k, v := range props {
					newSeries.Tags[k] = v
				}
				// e.g. "p95" for the 95th percentile.
				newSeries.Tags["metric"] = "p" + percentileName
				newSeries.Tags["field"] = metric.Field
				for _, v := range buckets {
					bucket := simplejson.NewFromAny(v)
					value := castToNullFloat(bucket.GetPath(metric.ID, "values", percentileName))
					key := castToNullFloat(bucket.Get("key"))
					newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
				}
				*series = append(*series, &newSeries)
			}
		case "extended_stats":
			buckets := esAgg.Get("buckets").MustArray()

			// One series per enabled stat; the enabled flags live in
			// metric.Meta as booleans keyed by stat name.
			metaKeys := make([]string, 0)
			meta := metric.Meta.MustMap()
			for k := range meta {
				metaKeys = append(metaKeys, k)
			}
			sort.Strings(metaKeys)
			for _, statName := range metaKeys {
				v := meta[statName]
				if enabled, ok := v.(bool); !ok || !enabled {
					continue
				}

				newSeries := tsdb.TimeSeries{
					Tags: make(map[string]string),
				}
				for k, v := range props {
					newSeries.Tags[k] = v
				}
				newSeries.Tags["metric"] = statName
				newSeries.Tags["field"] = metric.Field

				for _, v := range buckets {
					bucket := simplejson.NewFromAny(v)
					key := castToNullFloat(bucket.Get("key"))
					var value null.Float
					// The std deviation bounds live in a nested object in the
					// response rather than as flat keys.
					if statName == "std_deviation_bounds_upper" {
						value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
					} else if statName == "std_deviation_bounds_lower" {
						value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
					} else {
						value = castToNullFloat(bucket.GetPath(metric.ID, statName))
					}
					newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
				}
				*series = append(*series, &newSeries)
			}
		default:
			// Single-value metrics (avg, sum, min, max, ...).
			newSeries := tsdb.TimeSeries{
				Tags: make(map[string]string),
			}
			for k, v := range props {
				newSeries.Tags[k] = v
			}
			newSeries.Tags["metric"] = metric.Type
			newSeries.Tags["field"] = metric.Field
			for _, v := range esAgg.Get("buckets").MustArray() {
				bucket := simplejson.NewFromAny(v)
				key := castToNullFloat(bucket.Get("key"))
				// Buckets that carry no object for this metric are skipped.
				valueObj, err := bucket.Get(metric.ID).Map()
				if err != nil {
					continue
				}
				var value null.Float
				// Prefer normalized_value when the aggregation provides one.
				if _, ok := valueObj["normalized_value"]; ok {
					value = castToNullFloat(bucket.GetPath(metric.ID, "normalized_value"))
				} else {
					value = castToNullFloat(bucket.GetPath(metric.ID, "value"))
				}
				newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
			}
			*series = append(*series, &newSeries)
		}
	}
	return nil
}
  229. func (rp *responseParser) processAggregationDocs(esAgg *simplejson.Json, aggDef *BucketAgg, target *Query, table *tsdb.Table, props map[string]string) error {
  230. propKeys := make([]string, 0)
  231. for k := range props {
  232. propKeys = append(propKeys, k)
  233. }
  234. sort.Strings(propKeys)
  235. if len(table.Columns) == 0 {
  236. for _, propKey := range propKeys {
  237. table.Columns = append(table.Columns, tsdb.TableColumn{Text: propKey})
  238. }
  239. table.Columns = append(table.Columns, tsdb.TableColumn{Text: aggDef.Field})
  240. }
  241. addMetricValue := func(values *tsdb.RowValues, metricName string, value null.Float) {
  242. found := false
  243. for _, c := range table.Columns {
  244. if c.Text == metricName {
  245. found = true
  246. break
  247. }
  248. }
  249. if !found {
  250. table.Columns = append(table.Columns, tsdb.TableColumn{Text: metricName})
  251. }
  252. *values = append(*values, value)
  253. }
  254. for _, v := range esAgg.Get("buckets").MustArray() {
  255. bucket := simplejson.NewFromAny(v)
  256. values := make(tsdb.RowValues, 0)
  257. for _, propKey := range propKeys {
  258. values = append(values, props[propKey])
  259. }
  260. if key, err := bucket.Get("key").String(); err == nil {
  261. values = append(values, key)
  262. } else {
  263. values = append(values, castToNullFloat(bucket.Get("key")))
  264. }
  265. for _, metric := range target.Metrics {
  266. switch metric.Type {
  267. case "count":
  268. addMetricValue(&values, rp.getMetricName(metric.Type), castToNullFloat(bucket.Get("doc_count")))
  269. break
  270. case "extended_stats":
  271. metaKeys := make([]string, 0)
  272. meta := metric.Meta.MustMap()
  273. for k := range meta {
  274. metaKeys = append(metaKeys, k)
  275. }
  276. sort.Strings(metaKeys)
  277. for _, statName := range metaKeys {
  278. v := meta[statName]
  279. if enabled, ok := v.(bool); !ok || !enabled {
  280. continue
  281. }
  282. var value null.Float
  283. if statName == "std_deviation_bounds_upper" {
  284. value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
  285. } else if statName == "std_deviation_bounds_lower" {
  286. value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
  287. } else {
  288. value = castToNullFloat(bucket.GetPath(metric.ID, statName))
  289. }
  290. addMetricValue(&values, rp.getMetricName(metric.Type), value)
  291. break
  292. }
  293. default:
  294. metricName := rp.getMetricName(metric.Type)
  295. otherMetrics := make([]*MetricAgg, 0)
  296. for _, m := range target.Metrics {
  297. if m.Type == metric.Type {
  298. otherMetrics = append(otherMetrics, m)
  299. }
  300. }
  301. if len(otherMetrics) > 1 {
  302. metricName += " " + metric.Field
  303. }
  304. addMetricValue(&values, metricName, castToNullFloat(bucket.GetPath(metric.ID, "value")))
  305. break
  306. }
  307. }
  308. table.Rows = append(table.Rows, values)
  309. }
  310. return nil
  311. }
  312. func (rp *responseParser) trimDatapoints(series *tsdb.TimeSeriesSlice, target *Query) {
  313. var histogram *BucketAgg
  314. for _, bucketAgg := range target.BucketAggs {
  315. if bucketAgg.Type == "date_histogram" {
  316. histogram = bucketAgg
  317. break
  318. }
  319. }
  320. if histogram == nil {
  321. return
  322. }
  323. trimEdges, err := histogram.Settings.Get("trimEdges").Int()
  324. if err != nil {
  325. return
  326. }
  327. for _, s := range *series {
  328. if len(s.Points) > trimEdges*2 {
  329. s.Points = s.Points[trimEdges : len(s.Points)-trimEdges]
  330. }
  331. }
  332. }
  333. func (rp *responseParser) nameSeries(seriesList *tsdb.TimeSeriesSlice, target *Query) {
  334. set := make(map[string]string)
  335. for _, v := range *seriesList {
  336. if metricType, exists := v.Tags["metric"]; exists {
  337. if _, ok := set[metricType]; !ok {
  338. set[metricType] = ""
  339. }
  340. }
  341. }
  342. metricTypeCount := len(set)
  343. for _, series := range *seriesList {
  344. series.Name = rp.getSeriesName(series, target, metricTypeCount)
  345. }
  346. }
// aliasPatternRegex matches {{...}} placeholders in a target's alias pattern;
// the capture group is the placeholder name.
var aliasPatternRegex = regexp.MustCompile(`\{\{([\s\S]+?)\}\}`)

// getSeriesName builds the display name for a series. When the target has an
// alias pattern, each {{...}} placeholder is substituted from the series tags
// ({{term <field>}}, {{metric}} and {{field}} receive special handling).
// Otherwise the name is derived from the metric name, the field and any
// remaining tags; metricTypeCount controls whether the metric name is
// appended when names are built from tags. Note: the series' "metric" and
// "field" tags are deleted as a side effect.
func (rp *responseParser) getSeriesName(series *tsdb.TimeSeries, target *Query, metricTypeCount int) string {
	metricType := series.Tags["metric"]
	metricName := rp.getMetricName(metricType)
	delete(series.Tags, "metric")

	field := ""
	if v, ok := series.Tags["field"]; ok {
		field = v
		delete(series.Tags, "field")
	}

	if target.Alias != "" {
		seriesName := target.Alias

		subMatches := aliasPatternRegex.FindAllStringSubmatch(target.Alias, -1)
		for _, subMatch := range subMatches {
			// subMatch[0] is the full "{{name}}" text; subMatch[1] (when
			// present) is just the placeholder name.
			group := subMatch[0]

			if len(subMatch) > 1 {
				group = subMatch[1]
			}

			// "{{term <field>}}" looks the value up in the series tags.
			if strings.Index(group, "term ") == 0 {
				seriesName = strings.Replace(seriesName, subMatch[0], series.Tags[group[5:]], 1)
			}
			if v, ok := series.Tags[group]; ok {
				seriesName = strings.Replace(seriesName, subMatch[0], v, 1)
			}
			if group == "metric" {
				seriesName = strings.Replace(seriesName, subMatch[0], metricName, 1)
			}
			if group == "field" {
				seriesName = strings.Replace(seriesName, subMatch[0], field, 1)
			}
		}

		return seriesName
	}
	// todo, if field and pipelineAgg
	if field != "" && isPipelineAgg(metricType) {
		// For pipeline aggregations "field" holds another metric's ID;
		// describe that metric, or mark the name as unset when it can't
		// be resolved.
		found := false
		for _, metric := range target.Metrics {
			if metric.ID == field {
				metricName += " " + describeMetric(metric.Type, field)
				found = true
			}
		}
		if !found {
			metricName = "Unset"
		}
	} else if field != "" {
		metricName += " " + field
	}

	if len(series.Tags) == 0 {
		return metricName
	}

	// Build the name from the remaining tag values.
	name := ""
	for _, v := range series.Tags {
		name += v + " "
	}

	// With a single metric type the metric name adds no information.
	if metricTypeCount == 1 {
		return strings.TrimSpace(name)
	}

	return strings.TrimSpace(name) + " " + metricName
}
  407. func (rp *responseParser) getMetricName(metric string) string {
  408. if text, ok := metricAggType[metric]; ok {
  409. return text
  410. }
  411. if text, ok := extendedStats[metric]; ok {
  412. return text
  413. }
  414. return metric
  415. }
  416. func castToNullFloat(j *simplejson.Json) null.Float {
  417. f, err := j.Float64()
  418. if err == nil {
  419. return null.FloatFrom(f)
  420. }
  421. if s, err := j.String(); err == nil {
  422. if strings.ToLower(s) == "nan" {
  423. return null.NewFloat(0, false)
  424. }
  425. if v, err := strconv.ParseFloat(s, 64); err == nil {
  426. return null.FloatFromPtr(&v)
  427. }
  428. }
  429. return null.NewFloat(0, false)
  430. }
  431. func findAgg(target *Query, aggID string) (*BucketAgg, error) {
  432. for _, v := range target.BucketAggs {
  433. if aggID == v.ID {
  434. return v, nil
  435. }
  436. }
  437. return nil, errors.New("can't found aggDef, aggID:" + aggID)
  438. }
  439. func getErrorFromElasticResponse(response *es.SearchResponse) *tsdb.QueryResult {
  440. result := tsdb.NewQueryResult()
  441. json := simplejson.NewFromAny(response.Error)
  442. reason := json.Get("reason").MustString()
  443. rootCauseReason := json.Get("root_cause").GetIndex(0).Get("reason").MustString()
  444. if rootCauseReason != "" {
  445. result.ErrorString = rootCauseReason
  446. } else if reason != "" {
  447. result.ErrorString = reason
  448. } else {
  449. result.ErrorString = "Unkown elasticsearch error response"
  450. }
  451. return result
  452. }