response_parser.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package influxdb
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "regexp"
  6. "strings"
  7. "github.com/grafana/grafana/pkg/tsdb"
  8. "gopkg.in/guregu/null.v3"
  9. )
  10. type ResponseParser struct{}
  11. var (
  12. legendFormat *regexp.Regexp
  13. )
  14. func init() {
  15. legendFormat = regexp.MustCompile(`\[\[(\w+?)*\]\]*|\$\s*(\w+?)*`)
  16. }
  17. func (rp *ResponseParser) Parse(response *Response, query *Query) *tsdb.QueryResult {
  18. queryRes := tsdb.NewQueryResult()
  19. for _, result := range response.Results {
  20. queryRes.Series = append(queryRes.Series, rp.transformRows(result.Series, queryRes, query)...)
  21. }
  22. return queryRes
  23. }
  24. func (rp *ResponseParser) transformRows(rows []Row, queryResult *tsdb.QueryResult, query *Query) tsdb.TimeSeriesSlice {
  25. var result tsdb.TimeSeriesSlice
  26. for _, row := range rows {
  27. for columnIndex, column := range row.Columns {
  28. if column == "time" {
  29. continue
  30. }
  31. var points tsdb.TimeSeriesPoints
  32. for _, valuePair := range row.Values {
  33. point, err := rp.parseTimepoint(valuePair, columnIndex)
  34. if err == nil {
  35. points = append(points, point)
  36. }
  37. }
  38. result = append(result, &tsdb.TimeSeries{
  39. Name: rp.formatSerieName(row, column, query),
  40. Points: points,
  41. })
  42. }
  43. }
  44. return result
  45. }
  46. func (rp *ResponseParser) formatSerieName(row Row, column string, query *Query) string {
  47. if query.Alias == "" {
  48. return rp.buildSerieNameFromQuery(row, column)
  49. }
  50. result := legendFormat.ReplaceAllFunc([]byte(query.Alias), func(in []byte) []byte {
  51. aliasFormat := string(in)
  52. aliasFormat = strings.Replace(aliasFormat, "[[", "", 1)
  53. aliasFormat = strings.Replace(aliasFormat, "]]", "", 1)
  54. aliasFormat = strings.Replace(aliasFormat, "$", "", 1)
  55. if aliasFormat == "m" || aliasFormat == "measurement" {
  56. return []byte(query.Measurement)
  57. }
  58. if aliasFormat == "col" {
  59. return []byte(column)
  60. }
  61. if !strings.HasPrefix(aliasFormat, "tag_") {
  62. return in
  63. }
  64. tagKey := strings.Replace(aliasFormat, "tag_", "", 1)
  65. tagValue, exist := row.Tags[tagKey]
  66. if exist {
  67. return []byte(tagValue)
  68. }
  69. return in
  70. })
  71. return string(result)
  72. }
  73. func (rp *ResponseParser) buildSerieNameFromQuery(row Row, column string) string {
  74. var tags []string
  75. for k, v := range row.Tags {
  76. tags = append(tags, fmt.Sprintf("%s: %s", k, v))
  77. }
  78. tagText := ""
  79. if len(tags) > 0 {
  80. tagText = fmt.Sprintf(" { %s }", strings.Join(tags, " "))
  81. }
  82. return fmt.Sprintf("%s.%s%s", row.Name, column, tagText)
  83. }
  84. func (rp *ResponseParser) parseTimepoint(valuePair []interface{}, valuePosition int) (tsdb.TimePoint, error) {
  85. var value null.Float = rp.parseValue(valuePair[valuePosition])
  86. timestampNumber, _ := valuePair[0].(json.Number)
  87. timestamp, err := timestampNumber.Float64()
  88. if err != nil {
  89. return tsdb.TimePoint{}, err
  90. }
  91. return tsdb.NewTimePoint(value, timestamp), nil
  92. }
  93. func (rp *ResponseParser) parseValue(value interface{}) null.Float {
  94. number, ok := value.(json.Number)
  95. if !ok {
  96. return null.FloatFromPtr(nil)
  97. }
  98. fvalue, err := number.Float64()
  99. if err == nil {
  100. return null.FloatFrom(fvalue)
  101. }
  102. ivalue, err := number.Int64()
  103. if err == nil {
  104. return null.FloatFrom(float64(ivalue))
  105. }
  106. return null.FloatFromPtr(nil)
  107. }