influxdb.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package influxdb
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "net/http"
  8. "net/url"
  9. "path"
  10. "strings"
  11. "github.com/grafana/grafana/pkg/infra/log"
  12. "github.com/grafana/grafana/pkg/models"
  13. "github.com/grafana/grafana/pkg/setting"
  14. "github.com/grafana/grafana/pkg/tsdb"
  15. "golang.org/x/net/context/ctxhttp"
  16. )
  17. type InfluxDBExecutor struct {
  18. //*models.DataSource
  19. QueryParser *InfluxdbQueryParser
  20. ResponseParser *ResponseParser
  21. //HttpClient *http.Client
  22. }
  23. func NewInfluxDBExecutor(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
  24. return &InfluxDBExecutor{
  25. QueryParser: &InfluxdbQueryParser{},
  26. ResponseParser: &ResponseParser{},
  27. }, nil
  28. }
  29. var (
  30. glog log.Logger
  31. )
  // ErrInvalidHttpMode is returned when the datasource's "httpMode" setting is
  // neither "GET" nor "POST".
  var ErrInvalidHttpMode = errors.New("'httpMode' should be either 'GET' or 'POST'")
  33. func init() {
  34. glog = log.New("tsdb.influxdb")
  35. tsdb.RegisterTsdbQueryEndpoint("influxdb", NewInfluxDBExecutor)
  36. }
  37. func (e *InfluxDBExecutor) Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) {
  38. result := &tsdb.Response{}
  39. query, err := e.getQuery(dsInfo, tsdbQuery.Queries, tsdbQuery)
  40. if err != nil {
  41. return nil, err
  42. }
  43. rawQuery, err := query.Build(tsdbQuery)
  44. if err != nil {
  45. return nil, err
  46. }
  47. if setting.Env == setting.DEV {
  48. glog.Debug("Influxdb query", "raw query", rawQuery)
  49. }
  50. req, err := e.createRequest(dsInfo, rawQuery)
  51. if err != nil {
  52. return nil, err
  53. }
  54. httpClient, err := dsInfo.GetHttpClient()
  55. if err != nil {
  56. return nil, err
  57. }
  58. resp, err := ctxhttp.Do(ctx, httpClient, req)
  59. if err != nil {
  60. return nil, err
  61. }
  62. defer resp.Body.Close()
  63. if resp.StatusCode/100 != 2 {
  64. return nil, fmt.Errorf("Influxdb returned statuscode invalid status code: %v", resp.Status)
  65. }
  66. var response Response
  67. dec := json.NewDecoder(resp.Body)
  68. dec.UseNumber()
  69. err = dec.Decode(&response)
  70. if err != nil {
  71. return nil, err
  72. }
  73. if response.Err != nil {
  74. return nil, response.Err
  75. }
  76. result.Results = make(map[string]*tsdb.QueryResult)
  77. result.Results["A"] = e.ResponseParser.Parse(&response, query)
  78. return result, nil
  79. }
  80. func (e *InfluxDBExecutor) getQuery(dsInfo *models.DataSource, queries []*tsdb.Query, context *tsdb.TsdbQuery) (*Query, error) {
  81. // The model supports multiple queries, but right now this is only used from
  82. // alerting so we only needed to support batch executing 1 query at a time.
  83. if len(queries) > 0 {
  84. query, err := e.QueryParser.Parse(queries[0].Model, dsInfo)
  85. if err != nil {
  86. return nil, err
  87. }
  88. return query, nil
  89. }
  90. return nil, fmt.Errorf("query request contains no queries")
  91. }
  92. func (e *InfluxDBExecutor) createRequest(dsInfo *models.DataSource, query string) (*http.Request, error) {
  93. u, _ := url.Parse(dsInfo.Url)
  94. u.Path = path.Join(u.Path, "query")
  95. httpMode := dsInfo.JsonData.Get("httpMode").MustString("GET")
  96. req, err := func() (*http.Request, error) {
  97. switch httpMode {
  98. case "GET":
  99. return http.NewRequest(http.MethodGet, u.String(), nil)
  100. case "POST":
  101. bodyValues := url.Values{}
  102. bodyValues.Add("q", query)
  103. body := bodyValues.Encode()
  104. return http.NewRequest(http.MethodPost, u.String(), strings.NewReader(body))
  105. default:
  106. return nil, ErrInvalidHttpMode
  107. }
  108. }()
  109. if err != nil {
  110. return nil, err
  111. }
  112. req.Header.Set("User-Agent", "Grafana")
  113. params := req.URL.Query()
  114. params.Set("db", dsInfo.Database)
  115. params.Set("epoch", "s")
  116. if httpMode == "GET" {
  117. params.Set("q", query)
  118. } else if httpMode == "POST" {
  119. req.Header.Set("Content-type", "application/x-www-form-urlencoded")
  120. }
  121. req.URL.RawQuery = params.Encode()
  122. if dsInfo.BasicAuth {
  123. req.SetBasicAuth(dsInfo.BasicAuthUser, dsInfo.DecryptedBasicAuthPassword())
  124. }
  125. if !dsInfo.BasicAuth && dsInfo.User != "" {
  126. req.SetBasicAuth(dsInfo.User, dsInfo.DecryptedPassword())
  127. }
  128. glog.Debug("Influxdb request", "url", req.URL.String())
  129. return req, nil
  130. }