response_parser.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package mqe
  2. import (
  3. "encoding/json"
  4. "io/ioutil"
  5. "net/http"
  6. "time"
  7. null "gopkg.in/guregu/null.v3"
  8. "fmt"
  9. "github.com/grafana/grafana/pkg/log"
  10. "github.com/grafana/grafana/pkg/tsdb"
  11. )
  12. // wildcard as alias
  13. // add host to alias
  14. // add app to alias
  15. // regular alias
  16. func NewResponseParser() *MQEResponseParser {
  17. return &MQEResponseParser{
  18. log: log.New("tsdb.mqe"),
  19. }
  20. }
  21. type MQEResponse struct {
  22. Success bool `json:"success"`
  23. Name string `json:"name"`
  24. Body []MQEResponseSerie `json:"body"`
  25. }
  // ResponseTimeRange is the "timerange" object attached to each body entry of
  // an MQE response.
  //
  // NOTE(review): the units of start/end/resolution are not established in this
  // file. Resolution is typed as time.Duration, so a raw JSON number is stored
  // as that many nanoseconds; confirm this matches what MQE actually sends.
  type ResponseTimeRange struct {
  	Start      int64         `json:"start"`
  	End        int64         `json:"end"`
  	Resolution time.Duration `json:"Resolution"`
  }
  31. type MQEResponseSerie struct {
  32. Query string `json:"query"`
  33. Name string `json:"name"`
  34. Type string `json:"type"`
  35. Series []MQESerie `json:"series"`
  36. TimeRange ResponseTimeRange `json:"timerange"`
  37. }
  38. type MQESerie struct {
  39. Values []null.Float `json:"values"`
  40. Tagset map[string]string `json:"tagset"`
  41. }
  42. type MQEResponseParser struct {
  43. log log.Logger
  44. }
  45. func (parser *MQEResponseParser) Parse(res *http.Response) (*tsdb.QueryResult, error) {
  46. body, err := ioutil.ReadAll(res.Body)
  47. defer res.Body.Close()
  48. if err != nil {
  49. return nil, err
  50. }
  51. if res.StatusCode/100 != 2 {
  52. parser.log.Error("Request failed", "status code", res.StatusCode, "body", string(body))
  53. return nil, fmt.Errorf("Returned invalid statuscode")
  54. }
  55. var data *MQEResponse = &MQEResponse{}
  56. err = json.Unmarshal(body, data)
  57. if err != nil {
  58. parser.log.Info("Failed to unmarshal graphite response", "error", err, "status", res.Status, "body", string(body))
  59. return nil, err
  60. }
  61. if !data.Success {
  62. return nil, fmt.Errorf("MQE request failed.")
  63. }
  64. var series tsdb.TimeSeriesSlice
  65. for _, v := range data.Body {
  66. for _, k := range v.Series {
  67. serie := &tsdb.TimeSeries{
  68. Name: v.Name,
  69. }
  70. startTime := time.Unix(v.TimeRange.Start*1000, 0)
  71. for i, l := range k.Values {
  72. timestamp := startTime.Add(time.Duration(int64(v.TimeRange.Resolution) * int64(i)))
  73. serie.Points = append(serie.Points, tsdb.NewTimePoint(l, float64(timestamp.UnixNano()/int64(time.Millisecond))))
  74. }
  75. series = append(series, serie)
  76. }
  77. }
  78. return &tsdb.QueryResult{Series: series}, nil
  79. }