client_test.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. package es
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io/ioutil"
  7. "net/http"
  8. "net/http/httptest"
  9. "testing"
  10. "time"
  11. "github.com/grafana/grafana/pkg/components/simplejson"
  12. "github.com/grafana/grafana/pkg/tsdb"
  13. "github.com/grafana/grafana/pkg/models"
  14. . "github.com/smartystreets/goconvey/convey"
  15. )
  16. func TestClient(t *testing.T) {
  17. Convey("Test elasticsearch client", t, func() {
  18. Convey("NewClient", func() {
  19. Convey("When no version set should return error", func() {
  20. ds := &models.DataSource{
  21. JsonData: simplejson.NewFromAny(make(map[string]interface{})),
  22. }
  23. _, err := NewClient(nil, ds, nil)
  24. So(err, ShouldNotBeNil)
  25. })
  26. Convey("When no time field name set should return error", func() {
  27. ds := &models.DataSource{
  28. JsonData: simplejson.NewFromAny(map[string]interface{}{
  29. "esVersion": 5,
  30. }),
  31. }
  32. _, err := NewClient(nil, ds, nil)
  33. So(err, ShouldNotBeNil)
  34. })
  35. Convey("When unsupported version set should return error", func() {
  36. ds := &models.DataSource{
  37. JsonData: simplejson.NewFromAny(map[string]interface{}{
  38. "esVersion": 6,
  39. "timeField": "@timestamp",
  40. }),
  41. }
  42. _, err := NewClient(nil, ds, nil)
  43. So(err, ShouldNotBeNil)
  44. })
  45. Convey("When version 2 should return v2 client", func() {
  46. ds := &models.DataSource{
  47. JsonData: simplejson.NewFromAny(map[string]interface{}{
  48. "esVersion": 2,
  49. "timeField": "@timestamp",
  50. }),
  51. }
  52. c, err := NewClient(nil, ds, nil)
  53. So(err, ShouldBeNil)
  54. So(c.GetVersion(), ShouldEqual, 2)
  55. })
  56. Convey("When version 5 should return v5 client", func() {
  57. ds := &models.DataSource{
  58. JsonData: simplejson.NewFromAny(map[string]interface{}{
  59. "esVersion": 5,
  60. "timeField": "@timestamp",
  61. }),
  62. }
  63. c, err := NewClient(nil, ds, nil)
  64. So(err, ShouldBeNil)
  65. So(c.GetVersion(), ShouldEqual, 5)
  66. })
  67. Convey("When version 56 should return v5.6 client", func() {
  68. ds := &models.DataSource{
  69. JsonData: simplejson.NewFromAny(map[string]interface{}{
  70. "esVersion": 56,
  71. "timeField": "@timestamp",
  72. }),
  73. }
  74. c, err := NewClient(nil, ds, nil)
  75. So(err, ShouldBeNil)
  76. So(c.GetVersion(), ShouldEqual, 56)
  77. })
  78. })
  79. Convey("Given a fake http client", func() {
  80. var responseBuffer *bytes.Buffer
  81. var req *http.Request
  82. ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
  83. req = r
  84. buf, err := ioutil.ReadAll(r.Body)
  85. if err != nil {
  86. t.Fatalf("Failed to read response body, err=%v", err)
  87. }
  88. responseBuffer = bytes.NewBuffer(buf)
  89. }))
  90. currentNewDatasourceHttpClient := newDatasourceHttpClient
  91. newDatasourceHttpClient = func(ds *models.DataSource) (*http.Client, error) {
  92. return ts.Client(), nil
  93. }
  94. from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC)
  95. to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC)
  96. fromStr := fmt.Sprintf("%d", from.UnixNano()/int64(time.Millisecond))
  97. toStr := fmt.Sprintf("%d", to.UnixNano()/int64(time.Millisecond))
  98. timeRange := tsdb.NewTimeRange(fromStr, toStr)
  99. Convey("and a v2.x client", func() {
  100. ds := models.DataSource{
  101. Database: "[metrics-]YYYY.MM.DD",
  102. Url: ts.URL,
  103. JsonData: simplejson.NewFromAny(map[string]interface{}{
  104. "esVersion": 2,
  105. "timeField": "@timestamp",
  106. "interval": "Daily",
  107. }),
  108. }
  109. c, err := NewClient(context.Background(), &ds, timeRange)
  110. So(err, ShouldBeNil)
  111. So(c, ShouldNotBeNil)
  112. Convey("When executing multi search", func() {
  113. ms, err := createMultisearchForTest(c)
  114. So(err, ShouldBeNil)
  115. c.ExecuteMultisearch(ms)
  116. Convey("Should send correct request and payload", func() {
  117. So(req, ShouldNotBeNil)
  118. So(req.Method, ShouldEqual, http.MethodPost)
  119. So(req.URL.Path, ShouldEqual, "/_msearch")
  120. So(responseBuffer, ShouldNotBeNil)
  121. headerBytes, err := responseBuffer.ReadBytes('\n')
  122. So(err, ShouldBeNil)
  123. bodyBytes := responseBuffer.Bytes()
  124. jHeader, err := simplejson.NewJson(headerBytes)
  125. So(err, ShouldBeNil)
  126. jBody, err := simplejson.NewJson(bodyBytes)
  127. So(err, ShouldBeNil)
  128. fmt.Println("body", string(headerBytes))
  129. So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15")
  130. So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true)
  131. So(jHeader.Get("search_type").MustString(), ShouldEqual, "count")
  132. So(jHeader.Get("max_concurrent_shard_requests").MustInt(10), ShouldEqual, 10)
  133. Convey("and replace $__interval variable", func() {
  134. So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname")
  135. })
  136. Convey("and replace $__interval_ms variable", func() {
  137. So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s")
  138. })
  139. })
  140. })
  141. })
  142. Convey("and a v5.x client", func() {
  143. ds := models.DataSource{
  144. Database: "[metrics-]YYYY.MM.DD",
  145. Url: ts.URL,
  146. JsonData: simplejson.NewFromAny(map[string]interface{}{
  147. "esVersion": 5,
  148. "maxConcurrentShardRequests": 100,
  149. "timeField": "@timestamp",
  150. "interval": "Daily",
  151. }),
  152. }
  153. c, err := NewClient(context.Background(), &ds, timeRange)
  154. So(err, ShouldBeNil)
  155. So(c, ShouldNotBeNil)
  156. Convey("When executing multi search", func() {
  157. ms, err := createMultisearchForTest(c)
  158. So(err, ShouldBeNil)
  159. c.ExecuteMultisearch(ms)
  160. Convey("Should send correct request and payload", func() {
  161. So(req, ShouldNotBeNil)
  162. So(req.Method, ShouldEqual, http.MethodPost)
  163. So(req.URL.Path, ShouldEqual, "/_msearch")
  164. So(responseBuffer, ShouldNotBeNil)
  165. headerBytes, err := responseBuffer.ReadBytes('\n')
  166. So(err, ShouldBeNil)
  167. bodyBytes := responseBuffer.Bytes()
  168. jHeader, err := simplejson.NewJson(headerBytes)
  169. So(err, ShouldBeNil)
  170. jBody, err := simplejson.NewJson(bodyBytes)
  171. So(err, ShouldBeNil)
  172. fmt.Println("body", string(headerBytes))
  173. So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15")
  174. So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true)
  175. So(jHeader.Get("search_type").MustString(), ShouldEqual, "query_then_fetch")
  176. So(jHeader.Get("max_concurrent_shard_requests").MustInt(10), ShouldEqual, 10)
  177. Convey("and replace $__interval variable", func() {
  178. So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname")
  179. })
  180. Convey("and replace $__interval_ms variable", func() {
  181. So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s")
  182. })
  183. })
  184. })
  185. })
  186. Convey("and a v5.6 client", func() {
  187. ds := models.DataSource{
  188. Database: "[metrics-]YYYY.MM.DD",
  189. Url: ts.URL,
  190. JsonData: simplejson.NewFromAny(map[string]interface{}{
  191. "esVersion": 56,
  192. "maxConcurrentShardRequests": 100,
  193. "timeField": "@timestamp",
  194. "interval": "Daily",
  195. }),
  196. }
  197. c, err := NewClient(context.Background(), &ds, timeRange)
  198. So(err, ShouldBeNil)
  199. So(c, ShouldNotBeNil)
  200. Convey("When executing multi search", func() {
  201. ms, err := createMultisearchForTest(c)
  202. So(err, ShouldBeNil)
  203. c.ExecuteMultisearch(ms)
  204. Convey("Should send correct request and payload", func() {
  205. So(req, ShouldNotBeNil)
  206. So(req.Method, ShouldEqual, http.MethodPost)
  207. So(req.URL.Path, ShouldEqual, "/_msearch")
  208. So(responseBuffer, ShouldNotBeNil)
  209. headerBytes, err := responseBuffer.ReadBytes('\n')
  210. So(err, ShouldBeNil)
  211. bodyBytes := responseBuffer.Bytes()
  212. jHeader, err := simplejson.NewJson(headerBytes)
  213. So(err, ShouldBeNil)
  214. jBody, err := simplejson.NewJson(bodyBytes)
  215. So(err, ShouldBeNil)
  216. fmt.Println("body", string(headerBytes))
  217. So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15")
  218. So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true)
  219. So(jHeader.Get("search_type").MustString(), ShouldEqual, "query_then_fetch")
  220. So(jHeader.Get("max_concurrent_shard_requests").MustInt(), ShouldEqual, 100)
  221. Convey("and replace $__interval variable", func() {
  222. So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname")
  223. })
  224. Convey("and replace $__interval_ms variable", func() {
  225. So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s")
  226. })
  227. })
  228. })
  229. })
  230. Reset(func() {
  231. newDatasourceHttpClient = currentNewDatasourceHttpClient
  232. })
  233. })
  234. })
  235. }
  236. func createMultisearchForTest(c Client) (*MultiSearchRequest, error) {
  237. msb := c.MultiSearch()
  238. s := msb.Search(tsdb.Interval{Value: 15 * time.Second, Text: "15s"})
  239. s.Agg().DateHistogram("2", "@timestamp", func(a *DateHistogramAgg, ab AggBuilder) {
  240. a.Interval = "$__interval"
  241. ab.Metric("1", "avg", "@hostname", func(a *MetricAggregation) {
  242. a.Settings["script"] = "$__interval_ms*@hostname"
  243. })
  244. })
  245. return msb.Build()
  246. }