client.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. package es
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "io/ioutil"
  8. "net/http"
  9. "net/url"
  10. "path"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "github.com/grafana/grafana/pkg/components/simplejson"
  15. "github.com/grafana/grafana/pkg/infra/log"
  16. "github.com/grafana/grafana/pkg/tsdb"
  17. "github.com/grafana/grafana/pkg/models"
  18. "golang.org/x/net/context/ctxhttp"
  19. )
  20. const loggerName = "tsdb.elasticsearch.client"
  21. var (
  22. clientLog = log.New(loggerName)
  23. )
  24. var newDatasourceHttpClient = func(ds *models.DataSource) (*http.Client, error) {
  25. return ds.GetHttpClient()
  26. }
  27. // Client represents a client which can interact with elasticsearch api
  28. type Client interface {
  29. GetVersion() int
  30. GetTimeField() string
  31. GetMinInterval(queryInterval string) (time.Duration, error)
  32. ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error)
  33. MultiSearch() *MultiSearchRequestBuilder
  34. EnableDebug()
  35. }
  36. // NewClient creates a new elasticsearch client
  37. var NewClient = func(ctx context.Context, ds *models.DataSource, timeRange *tsdb.TimeRange) (Client, error) {
  38. version, err := ds.JsonData.Get("esVersion").Int()
  39. if err != nil {
  40. return nil, fmt.Errorf("elasticsearch version is required, err=%v", err)
  41. }
  42. timeField, err := ds.JsonData.Get("timeField").String()
  43. if err != nil {
  44. return nil, fmt.Errorf("elasticsearch time field name is required, err=%v", err)
  45. }
  46. indexInterval := ds.JsonData.Get("interval").MustString()
  47. ip, err := newIndexPattern(indexInterval, ds.Database)
  48. if err != nil {
  49. return nil, err
  50. }
  51. indices, err := ip.GetIndices(timeRange)
  52. if err != nil {
  53. return nil, err
  54. }
  55. clientLog.Debug("Creating new client", "version", version, "timeField", timeField, "indices", strings.Join(indices, ", "))
  56. switch version {
  57. case 2, 5, 56, 60, 70:
  58. return &baseClientImpl{
  59. ctx: ctx,
  60. ds: ds,
  61. version: version,
  62. timeField: timeField,
  63. indices: indices,
  64. timeRange: timeRange,
  65. }, nil
  66. }
  67. return nil, fmt.Errorf("elasticsearch version=%d is not supported", version)
  68. }
  69. type baseClientImpl struct {
  70. ctx context.Context
  71. ds *models.DataSource
  72. version int
  73. timeField string
  74. indices []string
  75. timeRange *tsdb.TimeRange
  76. debugEnabled bool
  77. }
  78. func (c *baseClientImpl) GetVersion() int {
  79. return c.version
  80. }
  81. func (c *baseClientImpl) GetTimeField() string {
  82. return c.timeField
  83. }
  84. func (c *baseClientImpl) GetMinInterval(queryInterval string) (time.Duration, error) {
  85. return tsdb.GetIntervalFrom(c.ds, simplejson.NewFromAny(map[string]interface{}{
  86. "interval": queryInterval,
  87. }), 5*time.Second)
  88. }
  89. func (c *baseClientImpl) getSettings() *simplejson.Json {
  90. return c.ds.JsonData
  91. }
  92. type multiRequest struct {
  93. header map[string]interface{}
  94. body interface{}
  95. interval tsdb.Interval
  96. }
  97. func (c *baseClientImpl) executeBatchRequest(uriPath, uriQuery string, requests []*multiRequest) (*response, error) {
  98. bytes, err := c.encodeBatchRequests(requests)
  99. if err != nil {
  100. return nil, err
  101. }
  102. return c.executeRequest(http.MethodPost, uriPath, uriQuery, bytes)
  103. }
  104. func (c *baseClientImpl) encodeBatchRequests(requests []*multiRequest) ([]byte, error) {
  105. clientLog.Debug("Encoding batch requests to json", "batch requests", len(requests))
  106. start := time.Now()
  107. payload := bytes.Buffer{}
  108. for _, r := range requests {
  109. reqHeader, err := json.Marshal(r.header)
  110. if err != nil {
  111. return nil, err
  112. }
  113. payload.WriteString(string(reqHeader) + "\n")
  114. reqBody, err := json.Marshal(r.body)
  115. if err != nil {
  116. return nil, err
  117. }
  118. body := string(reqBody)
  119. body = strings.Replace(body, "$__interval_ms", strconv.FormatInt(r.interval.Milliseconds(), 10), -1)
  120. body = strings.Replace(body, "$__interval", r.interval.Text, -1)
  121. payload.WriteString(body + "\n")
  122. }
  123. elapsed := time.Since(start)
  124. clientLog.Debug("Encoded batch requests to json", "took", elapsed)
  125. return payload.Bytes(), nil
  126. }
  127. func (c *baseClientImpl) executeRequest(method, uriPath, uriQuery string, body []byte) (*response, error) {
  128. u, _ := url.Parse(c.ds.Url)
  129. u.Path = path.Join(u.Path, uriPath)
  130. u.RawQuery = uriQuery
  131. var req *http.Request
  132. var err error
  133. if method == http.MethodPost {
  134. req, err = http.NewRequest(http.MethodPost, u.String(), bytes.NewBuffer(body))
  135. } else {
  136. req, err = http.NewRequest(http.MethodGet, u.String(), nil)
  137. }
  138. if err != nil {
  139. return nil, err
  140. }
  141. clientLog.Debug("Executing request", "url", req.URL.String(), "method", method)
  142. var reqInfo *SearchRequestInfo
  143. if c.debugEnabled {
  144. reqInfo = &SearchRequestInfo{
  145. Method: req.Method,
  146. Url: req.URL.String(),
  147. Data: string(body),
  148. }
  149. }
  150. req.Header.Set("User-Agent", "Grafana")
  151. req.Header.Set("Content-Type", "application/json")
  152. if c.ds.BasicAuth {
  153. clientLog.Debug("Request configured to use basic authentication")
  154. req.SetBasicAuth(c.ds.BasicAuthUser, c.ds.DecryptedBasicAuthPassword())
  155. }
  156. if !c.ds.BasicAuth && c.ds.User != "" {
  157. clientLog.Debug("Request configured to use basic authentication")
  158. req.SetBasicAuth(c.ds.User, c.ds.DecryptedPassword())
  159. }
  160. httpClient, err := newDatasourceHttpClient(c.ds)
  161. if err != nil {
  162. return nil, err
  163. }
  164. start := time.Now()
  165. defer func() {
  166. elapsed := time.Since(start)
  167. clientLog.Debug("Executed request", "took", elapsed)
  168. }()
  169. res, err := ctxhttp.Do(c.ctx, httpClient, req)
  170. return &response{
  171. httpResponse: res,
  172. reqInfo: reqInfo,
  173. }, err
  174. }
  175. func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error) {
  176. clientLog.Debug("Executing multisearch", "search requests", len(r.Requests))
  177. multiRequests := c.createMultiSearchRequests(r.Requests)
  178. queryParams := c.getMultiSearchQueryParameters()
  179. clientRes, err := c.executeBatchRequest("_msearch", queryParams, multiRequests)
  180. if err != nil {
  181. return nil, err
  182. }
  183. res := clientRes.httpResponse
  184. defer res.Body.Close()
  185. clientLog.Debug("Received multisearch response", "code", res.StatusCode, "status", res.Status, "content-length", res.ContentLength)
  186. start := time.Now()
  187. clientLog.Debug("Decoding multisearch json response")
  188. var bodyBytes []byte
  189. if c.debugEnabled {
  190. tmpBytes, err := ioutil.ReadAll(res.Body)
  191. if err != nil {
  192. clientLog.Error("failed to read http response bytes", "error", err)
  193. } else {
  194. bodyBytes = make([]byte, len(tmpBytes))
  195. copy(bodyBytes, tmpBytes)
  196. res.Body = ioutil.NopCloser(bytes.NewBuffer(tmpBytes))
  197. }
  198. }
  199. var msr MultiSearchResponse
  200. dec := json.NewDecoder(res.Body)
  201. err = dec.Decode(&msr)
  202. if err != nil {
  203. return nil, err
  204. }
  205. elapsed := time.Since(start)
  206. clientLog.Debug("Decoded multisearch json response", "took", elapsed)
  207. msr.Status = res.StatusCode
  208. if c.debugEnabled {
  209. bodyJSON, err := simplejson.NewFromReader(bytes.NewBuffer(bodyBytes))
  210. var data *simplejson.Json
  211. if err != nil {
  212. clientLog.Error("failed to decode http response into json", "error", err)
  213. } else {
  214. data = bodyJSON
  215. }
  216. msr.DebugInfo = &SearchDebugInfo{
  217. Request: clientRes.reqInfo,
  218. Response: &SearchResponseInfo{
  219. Status: res.StatusCode,
  220. Data: data,
  221. },
  222. }
  223. }
  224. return &msr, nil
  225. }
  226. func (c *baseClientImpl) createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest {
  227. multiRequests := []*multiRequest{}
  228. for _, searchReq := range searchRequests {
  229. mr := multiRequest{
  230. header: map[string]interface{}{
  231. "search_type": "query_then_fetch",
  232. "ignore_unavailable": true,
  233. "index": strings.Join(c.indices, ","),
  234. },
  235. body: searchReq,
  236. interval: searchReq.Interval,
  237. }
  238. if c.version == 2 {
  239. mr.header["search_type"] = "count"
  240. }
  241. if c.version >= 56 && c.version < 70 {
  242. maxConcurrentShardRequests := c.getSettings().Get("maxConcurrentShardRequests").MustInt(256)
  243. mr.header["max_concurrent_shard_requests"] = maxConcurrentShardRequests
  244. }
  245. multiRequests = append(multiRequests, &mr)
  246. }
  247. return multiRequests
  248. }
  249. func (c *baseClientImpl) getMultiSearchQueryParameters() string {
  250. if c.version >= 70 {
  251. maxConcurrentShardRequests := c.getSettings().Get("maxConcurrentShardRequests").MustInt(5)
  252. return fmt.Sprintf("max_concurrent_shard_requests=%d", maxConcurrentShardRequests)
  253. }
  254. return ""
  255. }
  256. func (c *baseClientImpl) MultiSearch() *MultiSearchRequestBuilder {
  257. return NewMultiSearchRequestBuilder(c.GetVersion())
  258. }
  259. func (c *baseClientImpl) EnableDebug() {
  260. c.debugEnabled = true
  261. }