client_test.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package es
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io/ioutil"
  7. "net/http"
  8. "net/http/httptest"
  9. "testing"
  10. "time"
  11. "github.com/grafana/grafana/pkg/components/simplejson"
  12. "github.com/grafana/grafana/pkg/tsdb"
  13. "github.com/grafana/grafana/pkg/models"
  14. . "github.com/smartystreets/goconvey/convey"
  15. )
  16. func TestClient(t *testing.T) {
  17. Convey("Test elasticsearch client", t, func() {
  18. Convey("NewClient", func() {
  19. Convey("When no version set should return error", func() {
  20. ds := &models.DataSource{
  21. JsonData: simplejson.NewFromAny(make(map[string]interface{})),
  22. }
  23. _, err := NewClient(context.Background(), ds, nil)
  24. So(err, ShouldNotBeNil)
  25. })
  26. Convey("When no time field name set should return error", func() {
  27. ds := &models.DataSource{
  28. JsonData: simplejson.NewFromAny(map[string]interface{}{
  29. "esVersion": 5,
  30. }),
  31. }
  32. _, err := NewClient(context.Background(), ds, nil)
  33. So(err, ShouldNotBeNil)
  34. })
  35. Convey("When unsupported version set should return error", func() {
  36. ds := &models.DataSource{
  37. JsonData: simplejson.NewFromAny(map[string]interface{}{
  38. "esVersion": 6,
  39. "timeField": "@timestamp",
  40. }),
  41. }
  42. _, err := NewClient(context.Background(), ds, nil)
  43. So(err, ShouldNotBeNil)
  44. })
  45. Convey("When version 2 should return v2 client", func() {
  46. ds := &models.DataSource{
  47. JsonData: simplejson.NewFromAny(map[string]interface{}{
  48. "esVersion": 2,
  49. "timeField": "@timestamp",
  50. }),
  51. }
  52. c, err := NewClient(context.Background(), ds, nil)
  53. So(err, ShouldBeNil)
  54. So(c.GetVersion(), ShouldEqual, 2)
  55. })
  56. Convey("When version 5 should return v5 client", func() {
  57. ds := &models.DataSource{
  58. JsonData: simplejson.NewFromAny(map[string]interface{}{
  59. "esVersion": 5,
  60. "timeField": "@timestamp",
  61. }),
  62. }
  63. c, err := NewClient(context.Background(), ds, nil)
  64. So(err, ShouldBeNil)
  65. So(c.GetVersion(), ShouldEqual, 5)
  66. })
  67. Convey("When version 56 should return v5.6 client", func() {
  68. ds := &models.DataSource{
  69. JsonData: simplejson.NewFromAny(map[string]interface{}{
  70. "esVersion": 56,
  71. "timeField": "@timestamp",
  72. }),
  73. }
  74. c, err := NewClient(context.Background(), ds, nil)
  75. So(err, ShouldBeNil)
  76. So(c.GetVersion(), ShouldEqual, 56)
  77. })
  78. Convey("When version 60 should return v6.0 client", func() {
  79. ds := &models.DataSource{
  80. JsonData: simplejson.NewFromAny(map[string]interface{}{
  81. "esVersion": 60,
  82. "timeField": "@timestamp",
  83. }),
  84. }
  85. c, err := NewClient(context.Background(), ds, nil)
  86. So(err, ShouldBeNil)
  87. So(c.GetVersion(), ShouldEqual, 60)
  88. })
  89. })
  90. Convey("Given a fake http client", func() {
  91. var responseBuffer *bytes.Buffer
  92. var req *http.Request
  93. ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
  94. req = r
  95. buf, err := ioutil.ReadAll(r.Body)
  96. if err != nil {
  97. t.Fatalf("Failed to read response body, err=%v", err)
  98. }
  99. responseBuffer = bytes.NewBuffer(buf)
  100. }))
  101. currentNewDatasourceHttpClient := newDatasourceHttpClient
  102. newDatasourceHttpClient = func(ds *models.DataSource) (*http.Client, error) {
  103. return ts.Client(), nil
  104. }
  105. from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC)
  106. to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC)
  107. fromStr := fmt.Sprintf("%d", from.UnixNano()/int64(time.Millisecond))
  108. toStr := fmt.Sprintf("%d", to.UnixNano()/int64(time.Millisecond))
  109. timeRange := tsdb.NewTimeRange(fromStr, toStr)
  110. Convey("and a v2.x client", func() {
  111. ds := models.DataSource{
  112. Database: "[metrics-]YYYY.MM.DD",
  113. Url: ts.URL,
  114. JsonData: simplejson.NewFromAny(map[string]interface{}{
  115. "esVersion": 2,
  116. "timeField": "@timestamp",
  117. "interval": "Daily",
  118. }),
  119. }
  120. c, err := NewClient(context.Background(), &ds, timeRange)
  121. So(err, ShouldBeNil)
  122. So(c, ShouldNotBeNil)
  123. Convey("When executing multi search", func() {
  124. ms, err := createMultisearchForTest(c)
  125. So(err, ShouldBeNil)
  126. c.ExecuteMultisearch(ms)
  127. Convey("Should send correct request and payload", func() {
  128. So(req, ShouldNotBeNil)
  129. So(req.Method, ShouldEqual, http.MethodPost)
  130. So(req.URL.Path, ShouldEqual, "/_msearch")
  131. So(responseBuffer, ShouldNotBeNil)
  132. headerBytes, err := responseBuffer.ReadBytes('\n')
  133. So(err, ShouldBeNil)
  134. bodyBytes := responseBuffer.Bytes()
  135. jHeader, err := simplejson.NewJson(headerBytes)
  136. So(err, ShouldBeNil)
  137. jBody, err := simplejson.NewJson(bodyBytes)
  138. So(err, ShouldBeNil)
  139. So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15")
  140. So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true)
  141. So(jHeader.Get("search_type").MustString(), ShouldEqual, "count")
  142. So(jHeader.Get("max_concurrent_shard_requests").MustInt(10), ShouldEqual, 10)
  143. Convey("and replace $__interval variable", func() {
  144. So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname")
  145. })
  146. Convey("and replace $__interval_ms variable", func() {
  147. So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s")
  148. })
  149. })
  150. })
  151. })
  152. Convey("and a v5.x client", func() {
  153. ds := models.DataSource{
  154. Database: "[metrics-]YYYY.MM.DD",
  155. Url: ts.URL,
  156. JsonData: simplejson.NewFromAny(map[string]interface{}{
  157. "esVersion": 5,
  158. "maxConcurrentShardRequests": 100,
  159. "timeField": "@timestamp",
  160. "interval": "Daily",
  161. }),
  162. }
  163. c, err := NewClient(context.Background(), &ds, timeRange)
  164. So(err, ShouldBeNil)
  165. So(c, ShouldNotBeNil)
  166. Convey("When executing multi search", func() {
  167. ms, err := createMultisearchForTest(c)
  168. So(err, ShouldBeNil)
  169. c.ExecuteMultisearch(ms)
  170. Convey("Should send correct request and payload", func() {
  171. So(req, ShouldNotBeNil)
  172. So(req.Method, ShouldEqual, http.MethodPost)
  173. So(req.URL.Path, ShouldEqual, "/_msearch")
  174. So(responseBuffer, ShouldNotBeNil)
  175. headerBytes, err := responseBuffer.ReadBytes('\n')
  176. So(err, ShouldBeNil)
  177. bodyBytes := responseBuffer.Bytes()
  178. jHeader, err := simplejson.NewJson(headerBytes)
  179. So(err, ShouldBeNil)
  180. jBody, err := simplejson.NewJson(bodyBytes)
  181. So(err, ShouldBeNil)
  182. So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15")
  183. So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true)
  184. So(jHeader.Get("search_type").MustString(), ShouldEqual, "query_then_fetch")
  185. So(jHeader.Get("max_concurrent_shard_requests").MustInt(10), ShouldEqual, 10)
  186. Convey("and replace $__interval variable", func() {
  187. So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname")
  188. })
  189. Convey("and replace $__interval_ms variable", func() {
  190. So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s")
  191. })
  192. })
  193. })
  194. })
  195. Convey("and a v5.6 client", func() {
  196. ds := models.DataSource{
  197. Database: "[metrics-]YYYY.MM.DD",
  198. Url: ts.URL,
  199. JsonData: simplejson.NewFromAny(map[string]interface{}{
  200. "esVersion": 56,
  201. "maxConcurrentShardRequests": 100,
  202. "timeField": "@timestamp",
  203. "interval": "Daily",
  204. }),
  205. }
  206. c, err := NewClient(context.Background(), &ds, timeRange)
  207. So(err, ShouldBeNil)
  208. So(c, ShouldNotBeNil)
  209. Convey("When executing multi search", func() {
  210. ms, err := createMultisearchForTest(c)
  211. So(err, ShouldBeNil)
  212. c.ExecuteMultisearch(ms)
  213. Convey("Should send correct request and payload", func() {
  214. So(req, ShouldNotBeNil)
  215. So(req.Method, ShouldEqual, http.MethodPost)
  216. So(req.URL.Path, ShouldEqual, "/_msearch")
  217. So(responseBuffer, ShouldNotBeNil)
  218. headerBytes, err := responseBuffer.ReadBytes('\n')
  219. So(err, ShouldBeNil)
  220. bodyBytes := responseBuffer.Bytes()
  221. jHeader, err := simplejson.NewJson(headerBytes)
  222. So(err, ShouldBeNil)
  223. jBody, err := simplejson.NewJson(bodyBytes)
  224. So(err, ShouldBeNil)
  225. So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15")
  226. So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true)
  227. So(jHeader.Get("search_type").MustString(), ShouldEqual, "query_then_fetch")
  228. So(jHeader.Get("max_concurrent_shard_requests").MustInt(), ShouldEqual, 100)
  229. Convey("and replace $__interval variable", func() {
  230. So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname")
  231. })
  232. Convey("and replace $__interval_ms variable", func() {
  233. So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s")
  234. })
  235. })
  236. })
  237. })
  238. Reset(func() {
  239. newDatasourceHttpClient = currentNewDatasourceHttpClient
  240. })
  241. })
  242. })
  243. }
  244. func createMultisearchForTest(c Client) (*MultiSearchRequest, error) {
  245. msb := c.MultiSearch()
  246. s := msb.Search(tsdb.Interval{Value: 15 * time.Second, Text: "15s"})
  247. s.Agg().DateHistogram("2", "@timestamp", func(a *DateHistogramAgg, ab AggBuilder) {
  248. a.Interval = "$__interval"
  249. ab.Metric("1", "avg", "@hostname", func(a *MetricAggregation) {
  250. a.Settings["script"] = "$__interval_ms*@hostname"
  251. })
  252. })
  253. return msb.Build()
  254. }