client.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. package es
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "net/http"
  8. "net/url"
  9. "path"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/grafana/grafana/pkg/components/simplejson"
  14. "github.com/grafana/grafana/pkg/log"
  15. "github.com/grafana/grafana/pkg/tsdb"
  16. "github.com/grafana/grafana/pkg/models"
  17. "golang.org/x/net/context/ctxhttp"
  18. )
  19. const loggerName = "tsdb.elasticsearch.client"
  20. var (
  21. clientLog = log.New(loggerName)
  22. )
  23. var newDatasourceHttpClient = func(ds *models.DataSource) (*http.Client, error) {
  24. return ds.GetHttpClient()
  25. }
  26. // Client represents a client which can interact with elasticsearch api
  27. type Client interface {
  28. GetVersion() int
  29. GetTimeField() string
  30. GetMinInterval(queryInterval string) (time.Duration, error)
  31. ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error)
  32. MultiSearch() *MultiSearchRequestBuilder
  33. }
  34. // NewClient creates a new elasticsearch client
  35. var NewClient = func(ctx context.Context, ds *models.DataSource, timeRange *tsdb.TimeRange) (Client, error) {
  36. version, err := ds.JsonData.Get("esVersion").Int()
  37. if err != nil {
  38. return nil, fmt.Errorf("elasticsearch version is required, err=%v", err)
  39. }
  40. timeField, err := ds.JsonData.Get("timeField").String()
  41. if err != nil {
  42. return nil, fmt.Errorf("elasticsearch time field name is required, err=%v", err)
  43. }
  44. indexInterval := ds.JsonData.Get("interval").MustString()
  45. ip, err := newIndexPattern(indexInterval, ds.Database)
  46. if err != nil {
  47. return nil, err
  48. }
  49. indices, err := ip.GetIndices(timeRange)
  50. if err != nil {
  51. return nil, err
  52. }
  53. clientLog.Debug("Creating new client", "version", version, "timeField", timeField, "indices", strings.Join(indices, ", "))
  54. switch version {
  55. case 2, 5, 56, 60:
  56. return &baseClientImpl{
  57. ctx: ctx,
  58. ds: ds,
  59. version: version,
  60. timeField: timeField,
  61. indices: indices,
  62. timeRange: timeRange,
  63. }, nil
  64. }
  65. return nil, fmt.Errorf("elasticsearch version=%d is not supported", version)
  66. }
  67. type baseClientImpl struct {
  68. ctx context.Context
  69. ds *models.DataSource
  70. version int
  71. timeField string
  72. indices []string
  73. timeRange *tsdb.TimeRange
  74. }
  75. func (c *baseClientImpl) GetVersion() int {
  76. return c.version
  77. }
  78. func (c *baseClientImpl) GetTimeField() string {
  79. return c.timeField
  80. }
  81. func (c *baseClientImpl) GetMinInterval(queryInterval string) (time.Duration, error) {
  82. return tsdb.GetIntervalFrom(c.ds, simplejson.NewFromAny(map[string]interface{}{
  83. "interval": queryInterval,
  84. }), 5*time.Second)
  85. }
  86. func (c *baseClientImpl) getSettings() *simplejson.Json {
  87. return c.ds.JsonData
  88. }
  89. type multiRequest struct {
  90. header map[string]interface{}
  91. body interface{}
  92. interval tsdb.Interval
  93. }
  94. func (c *baseClientImpl) executeBatchRequest(uriPath string, requests []*multiRequest) (*http.Response, error) {
  95. bytes, err := c.encodeBatchRequests(requests)
  96. if err != nil {
  97. return nil, err
  98. }
  99. return c.executeRequest(http.MethodPost, uriPath, bytes)
  100. }
  101. func (c *baseClientImpl) encodeBatchRequests(requests []*multiRequest) ([]byte, error) {
  102. clientLog.Debug("Encoding batch requests to json", "batch requests", len(requests))
  103. start := time.Now()
  104. payload := bytes.Buffer{}
  105. for _, r := range requests {
  106. reqHeader, err := json.Marshal(r.header)
  107. if err != nil {
  108. return nil, err
  109. }
  110. payload.WriteString(string(reqHeader) + "\n")
  111. reqBody, err := json.Marshal(r.body)
  112. if err != nil {
  113. return nil, err
  114. }
  115. body := string(reqBody)
  116. body = strings.Replace(body, "$__interval_ms", strconv.FormatInt(r.interval.Milliseconds(), 10), -1)
  117. body = strings.Replace(body, "$__interval", r.interval.Text, -1)
  118. payload.WriteString(body + "\n")
  119. }
  120. elapsed := time.Since(start)
  121. clientLog.Debug("Encoded batch requests to json", "took", elapsed)
  122. return payload.Bytes(), nil
  123. }
  124. func (c *baseClientImpl) executeRequest(method, uriPath string, body []byte) (*http.Response, error) {
  125. u, _ := url.Parse(c.ds.Url)
  126. u.Path = path.Join(u.Path, uriPath)
  127. var req *http.Request
  128. var err error
  129. if method == http.MethodPost {
  130. req, err = http.NewRequest(http.MethodPost, u.String(), bytes.NewBuffer(body))
  131. } else {
  132. req, err = http.NewRequest(http.MethodGet, u.String(), nil)
  133. }
  134. if err != nil {
  135. return nil, err
  136. }
  137. clientLog.Debug("Executing request", "url", req.URL.String(), "method", method)
  138. req.Header.Set("User-Agent", "Grafana")
  139. req.Header.Set("Content-Type", "application/json")
  140. if c.ds.BasicAuth {
  141. clientLog.Debug("Request configured to use basic authentication")
  142. req.SetBasicAuth(c.ds.BasicAuthUser, c.ds.BasicAuthPassword)
  143. }
  144. if !c.ds.BasicAuth && c.ds.User != "" {
  145. clientLog.Debug("Request configured to use basic authentication")
  146. req.SetBasicAuth(c.ds.User, c.ds.Password)
  147. }
  148. httpClient, err := newDatasourceHttpClient(c.ds)
  149. if err != nil {
  150. return nil, err
  151. }
  152. start := time.Now()
  153. defer func() {
  154. elapsed := time.Since(start)
  155. clientLog.Debug("Executed request", "took", elapsed)
  156. }()
  157. return ctxhttp.Do(c.ctx, httpClient, req)
  158. }
  159. func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error) {
  160. clientLog.Debug("Executing multisearch", "search requests", len(r.Requests))
  161. multiRequests := c.createMultiSearchRequests(r.Requests)
  162. res, err := c.executeBatchRequest("_msearch", multiRequests)
  163. if err != nil {
  164. return nil, err
  165. }
  166. clientLog.Debug("Received multisearch response", "code", res.StatusCode, "status", res.Status, "content-length", res.ContentLength)
  167. start := time.Now()
  168. clientLog.Debug("Decoding multisearch json response")
  169. var msr MultiSearchResponse
  170. defer res.Body.Close()
  171. dec := json.NewDecoder(res.Body)
  172. err = dec.Decode(&msr)
  173. if err != nil {
  174. return nil, err
  175. }
  176. elapsed := time.Since(start)
  177. clientLog.Debug("Decoded multisearch json response", "took", elapsed)
  178. msr.Status = res.StatusCode
  179. return &msr, nil
  180. }
  181. func (c *baseClientImpl) createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest {
  182. multiRequests := []*multiRequest{}
  183. for _, searchReq := range searchRequests {
  184. mr := multiRequest{
  185. header: map[string]interface{}{
  186. "search_type": "query_then_fetch",
  187. "ignore_unavailable": true,
  188. "index": strings.Join(c.indices, ","),
  189. },
  190. body: searchReq,
  191. interval: searchReq.Interval,
  192. }
  193. if c.version == 2 {
  194. mr.header["search_type"] = "count"
  195. }
  196. if c.version >= 56 {
  197. maxConcurrentShardRequests := c.getSettings().Get("maxConcurrentShardRequests").MustInt(256)
  198. mr.header["max_concurrent_shard_requests"] = maxConcurrentShardRequests
  199. }
  200. multiRequests = append(multiRequests, &mr)
  201. }
  202. return multiRequests
  203. }
  204. func (c *baseClientImpl) MultiSearch() *MultiSearchRequestBuilder {
  205. return NewMultiSearchRequestBuilder(c.GetVersion())
  206. }