|
|
@@ -5,6 +5,7 @@ import (
|
|
|
"context"
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
+ "io/ioutil"
|
|
|
"net/http"
|
|
|
"net/url"
|
|
|
"path"
|
|
|
@@ -37,6 +38,7 @@ type Client interface {
|
|
|
GetMinInterval(queryInterval string) (time.Duration, error)
|
|
|
ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error)
|
|
|
MultiSearch() *MultiSearchRequestBuilder
|
|
|
+ EnableDebug()
|
|
|
}
|
|
|
|
|
|
// NewClient creates a new elasticsearch client
|
|
|
@@ -80,12 +82,13 @@ var NewClient = func(ctx context.Context, ds *models.DataSource, timeRange *tsdb
|
|
|
}
|
|
|
|
|
|
type baseClientImpl struct {
|
|
|
- ctx context.Context
|
|
|
- ds *models.DataSource
|
|
|
- version int
|
|
|
- timeField string
|
|
|
- indices []string
|
|
|
- timeRange *tsdb.TimeRange
|
|
|
+ ctx context.Context
|
|
|
+ ds *models.DataSource
|
|
|
+ version int
|
|
|
+ timeField string
|
|
|
+ indices []string
|
|
|
+ timeRange *tsdb.TimeRange
|
|
|
+ debugEnabled bool
|
|
|
}
|
|
|
|
|
|
func (c *baseClientImpl) GetVersion() int {
|
|
|
@@ -112,7 +115,7 @@ type multiRequest struct {
|
|
|
interval tsdb.Interval
|
|
|
}
|
|
|
|
|
|
-func (c *baseClientImpl) executeBatchRequest(uriPath, uriQuery string, requests []*multiRequest) (*http.Response, error) {
|
|
|
+func (c *baseClientImpl) executeBatchRequest(uriPath, uriQuery string, requests []*multiRequest) (*response, error) {
|
|
|
bytes, err := c.encodeBatchRequests(requests)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
@@ -150,7 +153,7 @@ func (c *baseClientImpl) encodeBatchRequests(requests []*multiRequest) ([]byte,
|
|
|
return payload.Bytes(), nil
|
|
|
}
|
|
|
|
|
|
-func (c *baseClientImpl) executeRequest(method, uriPath, uriQuery string, body []byte) (*http.Response, error) {
|
|
|
+func (c *baseClientImpl) executeRequest(method, uriPath, uriQuery string, body []byte) (*response, error) {
|
|
|
u, _ := url.Parse(c.ds.Url)
|
|
|
u.Path = path.Join(u.Path, uriPath)
|
|
|
u.RawQuery = uriQuery
|
|
|
@@ -168,6 +171,15 @@ func (c *baseClientImpl) executeRequest(method, uriPath, uriQuery string, body [
|
|
|
|
|
|
clientLog.Debug("Executing request", "url", req.URL.String(), "method", method)
|
|
|
|
|
|
+ var reqInfo *SearchRequestInfo
|
|
|
+ if c.debugEnabled {
|
|
|
+ reqInfo = &SearchRequestInfo{
|
|
|
+ Method: req.Method,
|
|
|
+ Url: req.URL.String(),
|
|
|
+ Data: string(body),
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
req.Header.Set("User-Agent", "Grafana")
|
|
|
req.Header.Set("Content-Type", "application/json")
|
|
|
|
|
|
@@ -191,7 +203,11 @@ func (c *baseClientImpl) executeRequest(method, uriPath, uriQuery string, body [
|
|
|
elapsed := time.Since(start)
|
|
|
clientLog.Debug("Executed request", "took", elapsed)
|
|
|
}()
|
|
|
- return ctxhttp.Do(c.ctx, httpClient, req)
|
|
|
+ res, err := ctxhttp.Do(c.ctx, httpClient, req)
|
|
|
+ return &response{
|
|
|
+ httpResponse: res,
|
|
|
+ reqInfo: reqInfo,
|
|
|
+ }, err
|
|
|
}
|
|
|
|
|
|
func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error) {
|
|
|
@@ -199,18 +215,31 @@ func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearch
|
|
|
|
|
|
multiRequests := c.createMultiSearchRequests(r.Requests)
|
|
|
queryParams := c.getMultiSearchQueryParameters()
|
|
|
- res, err := c.executeBatchRequest("_msearch", queryParams, multiRequests)
|
|
|
+ clientRes, err := c.executeBatchRequest("_msearch", queryParams, multiRequests)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
+ res := clientRes.httpResponse
|
|
|
+ defer res.Body.Close()
|
|
|
|
|
|
clientLog.Debug("Received multisearch response", "code", res.StatusCode, "status", res.Status, "content-length", res.ContentLength)
|
|
|
|
|
|
start := time.Now()
|
|
|
clientLog.Debug("Decoding multisearch json response")
|
|
|
|
|
|
+ var bodyBytes []byte
|
|
|
+ if c.debugEnabled {
|
|
|
+ tmpBytes, err := ioutil.ReadAll(res.Body)
|
|
|
+ if err != nil {
|
|
|
+ clientLog.Error("failed to read http response bytes", "error", err)
|
|
|
+ } else {
|
|
|
+ bodyBytes = make([]byte, len(tmpBytes))
|
|
|
+ copy(bodyBytes, tmpBytes)
|
|
|
+ res.Body = ioutil.NopCloser(bytes.NewBuffer(tmpBytes))
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
var msr MultiSearchResponse
|
|
|
- defer res.Body.Close()
|
|
|
dec := json.NewDecoder(res.Body)
|
|
|
err = dec.Decode(&msr)
|
|
|
if err != nil {
|
|
|
@@ -222,6 +251,24 @@ func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearch
|
|
|
|
|
|
msr.Status = res.StatusCode
|
|
|
|
|
|
+ if c.debugEnabled {
|
|
|
+ bodyJSON, err := simplejson.NewFromReader(bytes.NewBuffer(bodyBytes))
|
|
|
+ var data *simplejson.Json
|
|
|
+ if err != nil {
|
|
|
+ clientLog.Error("failed to decode http response into json", "error", err)
|
|
|
+ } else {
|
|
|
+ data = bodyJSON
|
|
|
+ }
|
|
|
+
|
|
|
+ msr.DebugInfo = &SearchDebugInfo{
|
|
|
+ Request: clientRes.reqInfo,
|
|
|
+ Response: &SearchResponseInfo{
|
|
|
+ Status: res.StatusCode,
|
|
|
+ Data: data,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
return &msr, nil
|
|
|
}
|
|
|
|
|
|
@@ -266,3 +313,7 @@ func (c *baseClientImpl) getMultiSearchQueryParameters() string {
|
|
|
func (c *baseClientImpl) MultiSearch() *MultiSearchRequestBuilder {
|
|
|
return NewMultiSearchRequestBuilder(c.GetVersion())
|
|
|
}
|
|
|
+
|
|
|
+func (c *baseClientImpl) EnableDebug() {
|
|
|
+ c.debugEnabled = true
|
|
|
+}
|