influxdb.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package influxdb
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "net/http"
  7. "net/url"
  8. "path"
  9. "golang.org/x/net/context/ctxhttp"
  10. "github.com/grafana/grafana/pkg/log"
  11. "github.com/grafana/grafana/pkg/models"
  12. "github.com/grafana/grafana/pkg/setting"
  13. "github.com/grafana/grafana/pkg/tsdb"
  14. )
  15. type InfluxDBExecutor struct {
  16. //*models.DataSource
  17. QueryParser *InfluxdbQueryParser
  18. ResponseParser *ResponseParser
  19. //HttpClient *http.Client
  20. }
  21. func NewInfluxDBExecutor(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
  22. return &InfluxDBExecutor{
  23. QueryParser: &InfluxdbQueryParser{},
  24. ResponseParser: &ResponseParser{},
  25. }, nil
  26. }
  27. var (
  28. glog log.Logger
  29. )
  30. func init() {
  31. glog = log.New("tsdb.influxdb")
  32. tsdb.RegisterTsdbQueryEndpoint("influxdb", NewInfluxDBExecutor)
  33. }
  34. func (e *InfluxDBExecutor) Query(ctx context.Context, dsInfo *models.DataSource, context *tsdb.TsdbQuery) *tsdb.BatchResult {
  35. result := &tsdb.BatchResult{}
  36. query, err := e.getQuery(dsInfo, context.Queries, context)
  37. if err != nil {
  38. return result.WithError(err)
  39. }
  40. rawQuery, err := query.Build(context)
  41. if err != nil {
  42. return result.WithError(err)
  43. }
  44. if setting.Env == setting.DEV {
  45. glog.Debug("Influxdb query", "raw query", rawQuery)
  46. }
  47. req, err := e.createRequest(dsInfo, rawQuery)
  48. if err != nil {
  49. return result.WithError(err)
  50. }
  51. httpClient, err := dsInfo.GetHttpClient()
  52. if err != nil {
  53. return result.WithError(err)
  54. }
  55. resp, err := ctxhttp.Do(ctx, httpClient, req)
  56. if err != nil {
  57. return result.WithError(err)
  58. }
  59. if resp.StatusCode/100 != 2 {
  60. return result.WithError(fmt.Errorf("Influxdb returned statuscode invalid status code: %v", resp.Status))
  61. }
  62. var response Response
  63. dec := json.NewDecoder(resp.Body)
  64. defer resp.Body.Close()
  65. dec.UseNumber()
  66. err = dec.Decode(&response)
  67. if err != nil {
  68. return result.WithError(err)
  69. }
  70. if response.Err != nil {
  71. return result.WithError(response.Err)
  72. }
  73. result.QueryResults = make(map[string]*tsdb.QueryResult)
  74. result.QueryResults["A"] = e.ResponseParser.Parse(&response, query)
  75. return result
  76. }
  77. func (e *InfluxDBExecutor) getQuery(dsInfo *models.DataSource, queries []*tsdb.Query, context *tsdb.TsdbQuery) (*Query, error) {
  78. for _, v := range queries {
  79. query, err := e.QueryParser.Parse(v.Model, dsInfo)
  80. if err != nil {
  81. return nil, err
  82. }
  83. return query, nil
  84. }
  85. return nil, fmt.Errorf("query request contains no queries")
  86. }
  87. func (e *InfluxDBExecutor) createRequest(dsInfo *models.DataSource, query string) (*http.Request, error) {
  88. u, _ := url.Parse(dsInfo.Url)
  89. u.Path = path.Join(u.Path, "query")
  90. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  91. if err != nil {
  92. return nil, err
  93. }
  94. params := req.URL.Query()
  95. params.Set("q", query)
  96. params.Set("db", dsInfo.Database)
  97. params.Set("epoch", "s")
  98. req.URL.RawQuery = params.Encode()
  99. req.Header.Set("User-Agent", "Grafana")
  100. if dsInfo.BasicAuth {
  101. req.SetBasicAuth(dsInfo.BasicAuthUser, dsInfo.BasicAuthPassword)
  102. }
  103. if !dsInfo.BasicAuth && dsInfo.User != "" {
  104. req.SetBasicAuth(dsInfo.User, dsInfo.Password)
  105. }
  106. glog.Debug("Influxdb request", "url", req.URL.String())
  107. return req, nil
  108. }