client.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. package es
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "net/http"
  8. "net/url"
  9. "path"
  10. "strings"
  11. "time"
  12. "github.com/grafana/grafana/pkg/components/simplejson"
  13. "github.com/grafana/grafana/pkg/log"
  14. "github.com/grafana/grafana/pkg/tsdb"
  15. "github.com/grafana/grafana/pkg/models"
  16. "golang.org/x/net/context/ctxhttp"
  17. )
  18. const loggerName = "tsdb.elasticsearch.client"
  19. var (
  20. clientLog = log.New(loggerName)
  21. )
  22. // Client represents a client which can interact with elasticsearch api
  23. type Client interface {
  24. GetVersion() int
  25. GetTimeField() string
  26. GetMinInterval(queryInterval string) (time.Duration, error)
  27. ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error)
  28. MultiSearch() *MultiSearchRequestBuilder
  29. }
  30. // NewClient creates a new elasticsearch client
  31. var NewClient = func(ctx context.Context, ds *models.DataSource, timeRange *tsdb.TimeRange) (Client, error) {
  32. version, err := ds.JsonData.Get("esVersion").Int()
  33. if err != nil {
  34. return nil, fmt.Errorf("eleasticsearch version is required, err=%v", err)
  35. }
  36. timeField, err := ds.JsonData.Get("timeField").String()
  37. if err != nil {
  38. return nil, fmt.Errorf("eleasticsearch time field name is required, err=%v", err)
  39. }
  40. indexInterval := ds.JsonData.Get("interval").MustString()
  41. ip, err := newIndexPattern(indexInterval, ds.Database)
  42. if err != nil {
  43. return nil, err
  44. }
  45. indices, err := ip.GetIndices(timeRange)
  46. if err != nil {
  47. return nil, err
  48. }
  49. bc := &baseClientImpl{
  50. ctx: ctx,
  51. ds: ds,
  52. version: version,
  53. timeField: timeField,
  54. indices: indices,
  55. }
  56. clientLog.Debug("Creating new client", "version", version, "timeField", timeField, "indices", strings.Join(indices, ", "))
  57. switch version {
  58. case 2:
  59. return newV2Client(bc)
  60. case 5:
  61. return newV5Client(bc)
  62. case 56:
  63. return newV56Client(bc)
  64. }
  65. return nil, fmt.Errorf("elasticsearch version=%d is not supported", version)
  66. }
  67. type baseClient interface {
  68. Client
  69. getSettings() *simplejson.Json
  70. executeBatchRequest(uriPath string, requests []*multiRequest) (*http.Response, error)
  71. executeRequest(method, uriPath string, body []byte) (*http.Response, error)
  72. createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest
  73. }
  74. type baseClientImpl struct {
  75. ctx context.Context
  76. ds *models.DataSource
  77. version int
  78. timeField string
  79. indices []string
  80. }
  81. func (c *baseClientImpl) GetVersion() int {
  82. return c.version
  83. }
  84. func (c *baseClientImpl) GetTimeField() string {
  85. return c.timeField
  86. }
  87. func (c *baseClientImpl) GetMinInterval(queryInterval string) (time.Duration, error) {
  88. return tsdb.GetIntervalFrom(c.ds, simplejson.NewFromAny(map[string]interface{}{
  89. "interval": queryInterval,
  90. }), 5*time.Second)
  91. }
  92. func (c *baseClientImpl) getSettings() *simplejson.Json {
  93. return c.ds.JsonData
  94. }
  95. type multiRequest struct {
  96. header map[string]interface{}
  97. body interface{}
  98. }
  99. func (c *baseClientImpl) executeBatchRequest(uriPath string, requests []*multiRequest) (*http.Response, error) {
  100. payload := bytes.Buffer{}
  101. for _, r := range requests {
  102. reqHeader, err := json.Marshal(r.header)
  103. if err != nil {
  104. return nil, err
  105. }
  106. payload.WriteString(string(reqHeader) + "\n")
  107. reqBody, err := json.Marshal(r.body)
  108. if err != nil {
  109. return nil, err
  110. }
  111. payload.WriteString(string(reqBody) + "\n")
  112. }
  113. return c.executeRequest(http.MethodPost, uriPath, payload.Bytes())
  114. }
  115. func (c *baseClientImpl) executeRequest(method, uriPath string, body []byte) (*http.Response, error) {
  116. u, _ := url.Parse(c.ds.Url)
  117. u.Path = path.Join(u.Path, uriPath)
  118. var req *http.Request
  119. var err error
  120. if method == http.MethodPost {
  121. req, err = http.NewRequest(http.MethodPost, u.String(), bytes.NewBuffer(body))
  122. } else {
  123. req, err = http.NewRequest(http.MethodGet, u.String(), nil)
  124. }
  125. if err != nil {
  126. return nil, err
  127. }
  128. req.Header.Set("User-Agent", "Grafana")
  129. req.Header.Set("Content-Type", "application/json")
  130. if c.ds.BasicAuth {
  131. clientLog.Debug("Request configured to use basic authentication")
  132. req.SetBasicAuth(c.ds.BasicAuthUser, c.ds.BasicAuthPassword)
  133. }
  134. if !c.ds.BasicAuth && c.ds.User != "" {
  135. clientLog.Debug("Request configured to use basic authentication")
  136. req.SetBasicAuth(c.ds.User, c.ds.Password)
  137. }
  138. httpClient, err := c.ds.GetHttpClient()
  139. if err != nil {
  140. return nil, err
  141. }
  142. if method == http.MethodPost {
  143. clientLog.Debug("Executing request", "url", req.URL.String(), "method", method)
  144. } else {
  145. clientLog.Debug("Executing request", "url", req.URL.String(), "method", method)
  146. }
  147. return ctxhttp.Do(c.ctx, httpClient, req)
  148. }
  149. func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error) {
  150. multiRequests := c.createMultiSearchRequests(r.Requests)
  151. res, err := c.executeBatchRequest("_msearch", multiRequests)
  152. if err != nil {
  153. return nil, err
  154. }
  155. var msr MultiSearchResponse
  156. defer res.Body.Close()
  157. dec := json.NewDecoder(res.Body)
  158. err = dec.Decode(&msr)
  159. if err != nil {
  160. return nil, err
  161. }
  162. clientLog.Debug("Received multisearch response", "code", res.StatusCode, "status", res.Status, "content-length", res.ContentLength)
  163. msr.status = res.StatusCode
  164. return &msr, nil
  165. }
  166. func (c *baseClientImpl) createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest {
  167. multiRequests := []*multiRequest{}
  168. for _, searchReq := range searchRequests {
  169. multiRequests = append(multiRequests, &multiRequest{
  170. header: map[string]interface{}{
  171. "search_type": "query_then_fetch",
  172. "ignore_unavailable": true,
  173. "index": strings.Join(c.indices, ","),
  174. },
  175. body: searchReq,
  176. })
  177. }
  178. return multiRequests
  179. }
  180. type v2Client struct {
  181. baseClient
  182. }
  183. func newV2Client(bc baseClient) (*v2Client, error) {
  184. c := v2Client{
  185. baseClient: bc,
  186. }
  187. return &c, nil
  188. }
  189. func (c *v2Client) createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest {
  190. multiRequests := c.baseClient.createMultiSearchRequests(searchRequests)
  191. for _, mr := range multiRequests {
  192. mr.header["search_type"] = "count"
  193. }
  194. return multiRequests
  195. }
  196. type v5Client struct {
  197. baseClient
  198. }
  199. func newV5Client(bc baseClient) (*v5Client, error) {
  200. c := v5Client{
  201. baseClient: bc,
  202. }
  203. return &c, nil
  204. }
  205. type v56Client struct {
  206. *v5Client
  207. maxConcurrentShardRequests int
  208. }
  209. func newV56Client(bc baseClient) (*v56Client, error) {
  210. v5Client := v5Client{
  211. baseClient: bc,
  212. }
  213. maxConcurrentShardRequests := bc.getSettings().Get("maxConcurrentShardRequests").MustInt(256)
  214. c := v56Client{
  215. v5Client: &v5Client,
  216. maxConcurrentShardRequests: maxConcurrentShardRequests,
  217. }
  218. return &c, nil
  219. }
  220. func (c *v56Client) createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest {
  221. multiRequests := c.v5Client.createMultiSearchRequests(searchRequests)
  222. for _, mr := range multiRequests {
  223. mr.header["max_concurrent_shard_requests"] = c.maxConcurrentShardRequests
  224. }
  225. return multiRequests
  226. }
  227. func (c *baseClientImpl) MultiSearch() *MultiSearchRequestBuilder {
  228. return NewMultiSearchRequestBuilder(c.GetVersion())
  229. }