response_parser.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package mqe
  2. import (
  3. "encoding/json"
  4. "io/ioutil"
  5. "net/http"
  6. "strconv"
  7. "strings"
  8. "fmt"
  9. "regexp"
  10. "github.com/grafana/grafana/pkg/components/null"
  11. "github.com/grafana/grafana/pkg/log"
  12. "github.com/grafana/grafana/pkg/tsdb"
  13. )
  14. func NewResponseParser() *ResponseParser {
  15. return &ResponseParser{
  16. log: log.New("tsdb.mqe"),
  17. }
  18. }
  // indexAliasPattern matches a positional placeholder inside an alias: a dollar
  // sign followed by exactly one digit (for example "$1"). The digit is the only
  // capture group.
  var indexAliasPattern = regexp.MustCompile(`\$(\d)`)

  // wildcardAliasPattern matches any string that contains a "*" or a "!"
  // character.
  var wildcardAliasPattern = regexp.MustCompile(`[*!]`)
  // MQEResponse is the top-level JSON document that Parse decodes from an MQE
  // HTTP response body.
  type MQEResponse struct {
  	// Success is checked by Parse; a false value makes Parse return an error.
  	Success bool `json:"success"`
  	// Name is decoded from the "name" key; it is not read elsewhere in this file.
  	Name string `json:"name"`
  	// Body holds one entry per returned query result; Parse iterates over it.
  	Body []MQEResponseSerie `json:"body"`
  }
  // ResponseTimeRange describes the time axis of one response body entry. Parse
  // derives the timestamp of the i-th value as Start + i*Resolution.
  // NOTE(review): the unit (seconds vs. milliseconds) is not visible here —
  // confirm against the MQE API.
  type ResponseTimeRange struct {
  	// Start is the timestamp assigned to the first value of each series.
  	Start int64 `json:"start"`
  	// End is decoded from the "end" key; it is not read elsewhere in this file.
  	End int64 `json:"end"`
  	// Resolution is the step between two consecutive values.
  	// NOTE(review): the tag is capitalised unlike its siblings. encoding/json
  	// matches keys case-insensitively when decoding, so a lowercase
  	// "resolution" key should still populate this field — confirm what the
  	// API actually sends before changing the tag.
  	Resolution int64 `json:"Resolution"`
  }
  // MQEResponseSerie is one element of MQEResponse.Body: a group of series that
  // share a name and a time range.
  type MQEResponseSerie struct {
  	// Query is decoded from the "query" key; it is not read elsewhere in this file.
  	Query string `json:"query"`
  	// Name is the legend formatName falls back to when no alias rule applies.
  	Name string `json:"name"`
  	// Type is decoded from the "type" key; it is not read elsewhere in this file.
  	Type string `json:"type"`
  	// Series are the individual value lists; Parse emits one tsdb.TimeSeries
  	// per element.
  	Series []MQESerie `json:"series"`
  	// TimeRange supplies the start and step used to timestamp every value in
  	// Series.
  	TimeRange ResponseTimeRange `json:"timerange"`
  }
  // MQESerie is a single list of values together with the tags identifying it.
  type MQESerie struct {
  	// Values are the data points in order; the index of a value determines its
  	// timestamp (see Parse). null.Float presumably allows JSON null entries —
  	// verify against the null package.
  	Values []null.Float `json:"values"`
  	// Tagset is copied verbatim into the resulting series' Tags. formatLegend
  	// additionally reads the "cluster" and "host" keys to build a name prefix.
  	Tagset map[string]string `json:"tagset"`
  }
  // ResponseParser converts MQE HTTP responses into tsdb time series. Create it
  // with NewResponseParser so that the logger is set.
  type ResponseParser struct {
  	// log receives diagnostics for failed requests and undecodable bodies.
  	log log.Logger
  }
  51. func (parser *ResponseParser) Parse(res *http.Response, queryRef QueryToSend) ([]*tsdb.TimeSeries, error) {
  52. body, err := ioutil.ReadAll(res.Body)
  53. defer res.Body.Close()
  54. if err != nil {
  55. return nil, err
  56. }
  57. if res.StatusCode/100 != 2 {
  58. parser.log.Error("Request failed", "status code", res.StatusCode, "body", string(body))
  59. return nil, fmt.Errorf("Returned invalid statuscode")
  60. }
  61. var data *MQEResponse = &MQEResponse{}
  62. err = json.Unmarshal(body, data)
  63. if err != nil {
  64. parser.log.Info("Failed to unmarshal response", "error", err, "status", res.Status, "body", string(body))
  65. return nil, err
  66. }
  67. if !data.Success {
  68. return nil, fmt.Errorf("Request failed.")
  69. }
  70. var series []*tsdb.TimeSeries
  71. for _, body := range data.Body {
  72. for _, mqeSerie := range body.Series {
  73. serie := &tsdb.TimeSeries{
  74. Tags: map[string]string{},
  75. Name: parser.formatLegend(body, mqeSerie, queryRef),
  76. }
  77. for key, value := range mqeSerie.Tagset {
  78. serie.Tags[key] = value
  79. }
  80. for i, value := range mqeSerie.Values {
  81. timestamp := body.TimeRange.Start + int64(i)*body.TimeRange.Resolution
  82. serie.Points = append(serie.Points, tsdb.NewTimePoint(value, float64(timestamp)))
  83. }
  84. series = append(series, serie)
  85. }
  86. }
  87. return series, nil
  88. }
  89. func (parser *ResponseParser) formatLegend(body MQEResponseSerie, mqeSerie MQESerie, queryToSend QueryToSend) string {
  90. namePrefix := ""
  91. //append predefined tags to seriename
  92. for key, value := range mqeSerie.Tagset {
  93. if key == "cluster" && queryToSend.QueryRef.AddClusterToAlias {
  94. namePrefix += value + " "
  95. }
  96. }
  97. for key, value := range mqeSerie.Tagset {
  98. if key == "host" && queryToSend.QueryRef.AddHostToAlias {
  99. namePrefix += value + " "
  100. }
  101. }
  102. return namePrefix + parser.formatName(body, queryToSend)
  103. }
  104. func (parser *ResponseParser) formatName(body MQEResponseSerie, queryToSend QueryToSend) string {
  105. if indexAliasPattern.MatchString(queryToSend.Metric.Alias) {
  106. return parser.indexAlias(body, queryToSend)
  107. }
  108. if wildcardAliasPattern.MatchString(queryToSend.Metric.Metric) && wildcardAliasPattern.MatchString(queryToSend.Metric.Alias) {
  109. return parser.wildcardAlias(body, queryToSend)
  110. }
  111. return body.Name
  112. }
  113. func (parser *ResponseParser) wildcardAlias(body MQEResponseSerie, queryToSend QueryToSend) string {
  114. regString := strings.Replace(queryToSend.Metric.Metric, `*`, `(.*)`, 1)
  115. reg, err := regexp.Compile(regString)
  116. if err != nil {
  117. return queryToSend.Metric.Alias
  118. }
  119. matches := reg.FindAllStringSubmatch(queryToSend.RawQuery, -1)
  120. if len(matches) == 0 || len(matches[0]) < 2 {
  121. return queryToSend.Metric.Alias
  122. }
  123. return matches[0][1]
  124. }
  125. func (parser *ResponseParser) indexAlias(body MQEResponseSerie, queryToSend QueryToSend) string {
  126. queryNameParts := strings.Split(queryToSend.Metric.Metric, `.`)
  127. name := indexAliasPattern.ReplaceAllStringFunc(queryToSend.Metric.Alias, func(in string) string {
  128. positionName := strings.TrimSpace(strings.Replace(in, "$", "", 1))
  129. pos, err := strconv.Atoi(positionName)
  130. if err != nil {
  131. return ""
  132. }
  133. for i, part := range queryNameParts {
  134. if i == pos-1 {
  135. return strings.TrimSpace(part)
  136. }
  137. }
  138. return ""
  139. })
  140. return strings.Replace(name, " ", ".", -1)
  141. }