| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- package influxdb
- import (
- "context"
- "encoding/json"
- "fmt"
- "net/http"
- "net/url"
- "path"
- "golang.org/x/net/context/ctxhttp"
- "github.com/grafana/grafana/pkg/log"
- "github.com/grafana/grafana/pkg/tsdb"
- )
- type InfluxDBExecutor struct {
- *tsdb.DataSourceInfo
- QueryParser *InfluxdbQueryParser
- QueryBuilder *QueryBuilder
- ResponseParser *ResponseParser
- }
- func NewInfluxDBExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor {
- return &InfluxDBExecutor{
- DataSourceInfo: dsInfo,
- QueryParser: &InfluxdbQueryParser{},
- QueryBuilder: &QueryBuilder{},
- ResponseParser: &ResponseParser{},
- }
- }
- var (
- glog log.Logger
- HttpClient *http.Client
- )
- func init() {
- glog = log.New("tsdb.influxdb")
- tsdb.RegisterExecutor("influxdb", NewInfluxDBExecutor)
- HttpClient = tsdb.GetDefaultClient()
- }
- func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult {
- result := &tsdb.BatchResult{}
- query, err := e.getQuery(queries, context)
- if err != nil {
- return result.WithError(err)
- }
- glog.Debug("Influxdb query", "raw query", query)
- req, err := e.createRequest(query)
- if err != nil {
- return result.WithError(err)
- }
- resp, err := ctxhttp.Do(ctx, HttpClient, req)
- if err != nil {
- return result.WithError(err)
- }
- if resp.StatusCode/100 != 2 {
- return result.WithError(fmt.Errorf("Influxdb returned statuscode invalid status code: %v", resp.Status))
- }
- var response Response
- dec := json.NewDecoder(resp.Body)
- dec.UseNumber()
- err = dec.Decode(&response)
- if err != nil {
- return result.WithError(err)
- }
- result.QueryResults = make(map[string]*tsdb.QueryResult)
- result.QueryResults["A"] = e.ResponseParser.Parse(&response)
- return result
- }
- func (e *InfluxDBExecutor) getQuery(queries tsdb.QuerySlice, context *tsdb.QueryContext) (string, error) {
- for _, v := range queries {
- query, err := e.QueryParser.Parse(v.Model, e.DataSourceInfo)
- if err != nil {
- return "", err
- }
- rawQuery, err := e.QueryBuilder.Build(query, context)
- if err != nil {
- return "", err
- }
- return rawQuery, nil
- }
- return "", fmt.Errorf("query request contains no queries")
- }
- func (e *InfluxDBExecutor) createRequest(query string) (*http.Request, error) {
- u, _ := url.Parse(e.Url)
- u.Path = path.Join(u.Path, "query")
- req, err := http.NewRequest(http.MethodGet, u.String(), nil)
- if err != nil {
- return nil, err
- }
- params := req.URL.Query()
- params.Set("q", query)
- params.Set("db", e.Database)
- params.Set("epoch", "s")
- req.URL.RawQuery = params.Encode()
- req.Header.Set("User-Agent", "Grafana")
- if e.BasicAuth {
- req.SetBasicAuth(e.BasicAuthUser, e.BasicAuthPassword)
- }
- if e.User != "" {
- req.SetBasicAuth(e.User, e.Password)
- }
- glog.Debug("Influxdb request", "url", req.URL.String())
- return req, nil
- }
|