client.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. package es
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "net/http"
  8. "net/url"
  9. "path"
  10. "strings"
  11. "time"
  12. "github.com/grafana/grafana/pkg/components/simplejson"
  13. "github.com/grafana/grafana/pkg/log"
  14. "github.com/grafana/grafana/pkg/tsdb"
  15. "github.com/grafana/grafana/pkg/models"
  16. "golang.org/x/net/context/ctxhttp"
  17. )
  18. const loggerName = "tsdb.elasticsearch.client"
  19. var (
  20. clientLog = log.New(loggerName)
  21. intervalCalculator = tsdb.NewIntervalCalculator(&tsdb.IntervalOptions{MinInterval: 15 * time.Second})
  22. )
  23. // Client represents a client which can interact with elasticsearch api
  24. type Client interface {
  25. GetVersion() int
  26. GetTimeField() string
  27. GetMinInterval(queryInterval string) (time.Duration, error)
  28. ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error)
  29. MultiSearch() *MultiSearchRequestBuilder
  30. }
  31. // NewClient creates a new elasticsearch client
  32. var NewClient = func(ctx context.Context, ds *models.DataSource, timeRange *tsdb.TimeRange) (Client, error) {
  33. version, err := ds.JsonData.Get("esVersion").Int()
  34. if err != nil {
  35. return nil, fmt.Errorf("eleasticsearch version is required, err=%v", err)
  36. }
  37. timeField, err := ds.JsonData.Get("timeField").String()
  38. if err != nil {
  39. return nil, fmt.Errorf("eleasticsearch time field name is required, err=%v", err)
  40. }
  41. indexInterval := ds.JsonData.Get("interval").MustString()
  42. ip, err := newIndexPattern(indexInterval, ds.Database)
  43. if err != nil {
  44. return nil, err
  45. }
  46. indices, err := ip.GetIndices(timeRange)
  47. if err != nil {
  48. return nil, err
  49. }
  50. bc := &baseClientImpl{
  51. ctx: ctx,
  52. ds: ds,
  53. version: version,
  54. timeField: timeField,
  55. indices: indices,
  56. }
  57. clientLog.Debug("Creating new client", "version", version, "timeField", timeField, "indices", strings.Join(indices, ", "))
  58. switch version {
  59. case 2:
  60. return newV2Client(bc)
  61. case 5:
  62. return newV5Client(bc)
  63. case 56:
  64. return newV56Client(bc)
  65. }
  66. return nil, fmt.Errorf("elasticsearch version=%d is not supported", version)
  67. }
  68. type baseClient interface {
  69. Client
  70. getSettings() *simplejson.Json
  71. executeBatchRequest(uriPath string, requests []*multiRequest) (*http.Response, error)
  72. executeRequest(method, uriPath string, body []byte) (*http.Response, error)
  73. createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest
  74. }
  75. type baseClientImpl struct {
  76. ctx context.Context
  77. ds *models.DataSource
  78. version int
  79. timeField string
  80. indices []string
  81. }
  82. func (c *baseClientImpl) GetVersion() int {
  83. return c.version
  84. }
  85. func (c *baseClientImpl) GetTimeField() string {
  86. return c.timeField
  87. }
  88. func (c *baseClientImpl) GetMinInterval(queryInterval string) (time.Duration, error) {
  89. return tsdb.GetIntervalFrom(c.ds, simplejson.NewFromAny(map[string]string{
  90. "interval": queryInterval,
  91. }), 15*time.Second)
  92. }
  93. func (c *baseClientImpl) getSettings() *simplejson.Json {
  94. return c.ds.JsonData
  95. }
  96. type multiRequest struct {
  97. header map[string]interface{}
  98. body interface{}
  99. }
  100. func (c *baseClientImpl) executeBatchRequest(uriPath string, requests []*multiRequest) (*http.Response, error) {
  101. payload := bytes.Buffer{}
  102. for _, r := range requests {
  103. reqHeader, err := json.Marshal(r.header)
  104. if err != nil {
  105. return nil, err
  106. }
  107. payload.WriteString(string(reqHeader) + "\n")
  108. reqBody, err := json.Marshal(r.body)
  109. if err != nil {
  110. return nil, err
  111. }
  112. payload.WriteString(string(reqBody) + "\n")
  113. }
  114. return c.executeRequest(http.MethodPost, uriPath, payload.Bytes())
  115. }
  116. func (c *baseClientImpl) executeRequest(method, uriPath string, body []byte) (*http.Response, error) {
  117. u, _ := url.Parse(c.ds.Url)
  118. u.Path = path.Join(u.Path, uriPath)
  119. var req *http.Request
  120. var err error
  121. if method == http.MethodPost {
  122. req, err = http.NewRequest(http.MethodPost, u.String(), bytes.NewBuffer(body))
  123. } else {
  124. req, err = http.NewRequest(http.MethodGet, u.String(), nil)
  125. }
  126. if err != nil {
  127. return nil, err
  128. }
  129. req.Header.Set("User-Agent", "Grafana")
  130. req.Header.Set("Content-Type", "application/json")
  131. if c.ds.BasicAuth {
  132. clientLog.Debug("Request configured to use basic authentication")
  133. req.SetBasicAuth(c.ds.BasicAuthUser, c.ds.BasicAuthPassword)
  134. }
  135. if !c.ds.BasicAuth && c.ds.User != "" {
  136. clientLog.Debug("Request configured to use basic authentication")
  137. req.SetBasicAuth(c.ds.User, c.ds.Password)
  138. }
  139. httpClient, err := c.ds.GetHttpClient()
  140. if err != nil {
  141. return nil, err
  142. }
  143. if method == http.MethodPost {
  144. clientLog.Debug("Executing request", "url", req.URL.String(), "method", method)
  145. } else {
  146. clientLog.Debug("Executing request", "url", req.URL.String(), "method", method)
  147. }
  148. return ctxhttp.Do(c.ctx, httpClient, req)
  149. }
  150. func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error) {
  151. multiRequests := c.createMultiSearchRequests(r.Requests)
  152. res, err := c.executeBatchRequest("_msearch", multiRequests)
  153. if err != nil {
  154. return nil, err
  155. }
  156. var msr MultiSearchResponse
  157. defer res.Body.Close()
  158. dec := json.NewDecoder(res.Body)
  159. err = dec.Decode(&msr)
  160. if err != nil {
  161. return nil, err
  162. }
  163. clientLog.Debug("Received multisearch response", "code", res.StatusCode, "status", res.Status, "content-length", res.ContentLength)
  164. msr.status = res.StatusCode
  165. return &msr, nil
  166. }
  167. func (c *baseClientImpl) createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest {
  168. multiRequests := []*multiRequest{}
  169. for _, searchReq := range searchRequests {
  170. multiRequests = append(multiRequests, &multiRequest{
  171. header: map[string]interface{}{
  172. "search_type": "query_then_fetch",
  173. "ignore_unavailable": true,
  174. "index": strings.Join(c.indices, ","),
  175. },
  176. body: searchReq,
  177. })
  178. }
  179. return multiRequests
  180. }
  181. type v2Client struct {
  182. baseClient
  183. }
  184. func newV2Client(bc baseClient) (*v2Client, error) {
  185. c := v2Client{
  186. baseClient: bc,
  187. }
  188. return &c, nil
  189. }
  190. func (c *v2Client) createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest {
  191. multiRequests := c.baseClient.createMultiSearchRequests(searchRequests)
  192. for _, mr := range multiRequests {
  193. mr.header["search_type"] = "count"
  194. }
  195. return multiRequests
  196. }
  197. type v5Client struct {
  198. baseClient
  199. }
  200. func newV5Client(bc baseClient) (*v5Client, error) {
  201. c := v5Client{
  202. baseClient: bc,
  203. }
  204. return &c, nil
  205. }
  206. type v56Client struct {
  207. *v5Client
  208. maxConcurrentShardRequests int
  209. }
  210. func newV56Client(bc baseClient) (*v56Client, error) {
  211. v5Client := v5Client{
  212. baseClient: bc,
  213. }
  214. maxConcurrentShardRequests := bc.getSettings().Get("maxConcurrentShardRequests").MustInt(256)
  215. c := v56Client{
  216. v5Client: &v5Client,
  217. maxConcurrentShardRequests: maxConcurrentShardRequests,
  218. }
  219. return &c, nil
  220. }
  221. func (c *v56Client) createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest {
  222. multiRequests := c.v5Client.createMultiSearchRequests(searchRequests)
  223. for _, mr := range multiRequests {
  224. mr.header["max_concurrent_shard_requests"] = c.maxConcurrentShardRequests
  225. }
  226. return multiRequests
  227. }
  228. func (c *baseClientImpl) MultiSearch() *MultiSearchRequestBuilder {
  229. return NewMultiSearchRequestBuilder(c.GetVersion())
  230. }