client_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. package es
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io/ioutil"
  7. "net/http"
  8. "net/http/httptest"
  9. "testing"
  10. "time"
  11. "github.com/grafana/grafana/pkg/components/simplejson"
  12. "github.com/grafana/grafana/pkg/tsdb"
  13. "github.com/grafana/grafana/pkg/models"
  14. . "github.com/smartystreets/goconvey/convey"
  15. )
  16. func TestClient(t *testing.T) {
  17. Convey("Test elasticsearch client", t, func() {
  18. Convey("NewClient", func() {
  19. Convey("When no version set should return error", func() {
  20. ds := &models.DataSource{
  21. JsonData: simplejson.NewFromAny(make(map[string]interface{})),
  22. }
  23. _, err := NewClient(context.Background(), ds, nil)
  24. So(err, ShouldNotBeNil)
  25. })
  26. Convey("When no time field name set should return error", func() {
  27. ds := &models.DataSource{
  28. JsonData: simplejson.NewFromAny(map[string]interface{}{
  29. "esVersion": 5,
  30. }),
  31. }
  32. _, err := NewClient(context.Background(), ds, nil)
  33. So(err, ShouldNotBeNil)
  34. })
  35. Convey("When unsupported version set should return error", func() {
  36. ds := &models.DataSource{
  37. JsonData: simplejson.NewFromAny(map[string]interface{}{
  38. "esVersion": 6,
  39. "timeField": "@timestamp",
  40. }),
  41. }
  42. _, err := NewClient(context.Background(), ds, nil)
  43. So(err, ShouldNotBeNil)
  44. })
  45. Convey("When version 2 should return v2 client", func() {
  46. ds := &models.DataSource{
  47. JsonData: simplejson.NewFromAny(map[string]interface{}{
  48. "esVersion": 2,
  49. "timeField": "@timestamp",
  50. }),
  51. }
  52. c, err := NewClient(context.Background(), ds, nil)
  53. So(err, ShouldBeNil)
  54. So(c.GetVersion(), ShouldEqual, 2)
  55. })
  56. Convey("When version 5 should return v5 client", func() {
  57. ds := &models.DataSource{
  58. JsonData: simplejson.NewFromAny(map[string]interface{}{
  59. "esVersion": 5,
  60. "timeField": "@timestamp",
  61. }),
  62. }
  63. c, err := NewClient(context.Background(), ds, nil)
  64. So(err, ShouldBeNil)
  65. So(c.GetVersion(), ShouldEqual, 5)
  66. })
  67. Convey("When version 56 should return v5.6 client", func() {
  68. ds := &models.DataSource{
  69. JsonData: simplejson.NewFromAny(map[string]interface{}{
  70. "esVersion": 56,
  71. "timeField": "@timestamp",
  72. }),
  73. }
  74. c, err := NewClient(context.Background(), ds, nil)
  75. So(err, ShouldBeNil)
  76. So(c.GetVersion(), ShouldEqual, 56)
  77. })
  78. Convey("When version 60 should return v6.0 client", func() {
  79. ds := &models.DataSource{
  80. JsonData: simplejson.NewFromAny(map[string]interface{}{
  81. "esVersion": 60,
  82. "timeField": "@timestamp",
  83. }),
  84. }
  85. c, err := NewClient(context.Background(), ds, nil)
  86. So(err, ShouldBeNil)
  87. So(c.GetVersion(), ShouldEqual, 60)
  88. })
  89. Convey("When version 70 should return v7.0 client", func() {
  90. ds := &models.DataSource{
  91. JsonData: simplejson.NewFromAny(map[string]interface{}{
  92. "esVersion": 70,
  93. "timeField": "@timestamp",
  94. }),
  95. }
  96. c, err := NewClient(context.Background(), ds, nil)
  97. So(err, ShouldBeNil)
  98. So(c.GetVersion(), ShouldEqual, 70)
  99. })
  100. })
  101. Convey("Given a fake http client", func() {
  102. var responseBuffer *bytes.Buffer
  103. var req *http.Request
  104. ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
  105. req = r
  106. buf, err := ioutil.ReadAll(r.Body)
  107. if err != nil {
  108. t.Fatalf("Failed to read response body, err=%v", err)
  109. }
  110. responseBuffer = bytes.NewBuffer(buf)
  111. }))
  112. currentNewDatasourceHttpClient := newDatasourceHttpClient
  113. newDatasourceHttpClient = func(ds *models.DataSource) (*http.Client, error) {
  114. return ts.Client(), nil
  115. }
  116. from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC)
  117. to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC)
  118. fromStr := fmt.Sprintf("%d", from.UnixNano()/int64(time.Millisecond))
  119. toStr := fmt.Sprintf("%d", to.UnixNano()/int64(time.Millisecond))
  120. timeRange := tsdb.NewTimeRange(fromStr, toStr)
  121. Convey("and a v2.x client", func() {
  122. ds := models.DataSource{
  123. Database: "[metrics-]YYYY.MM.DD",
  124. Url: ts.URL,
  125. JsonData: simplejson.NewFromAny(map[string]interface{}{
  126. "esVersion": 2,
  127. "timeField": "@timestamp",
  128. "interval": "Daily",
  129. }),
  130. }
  131. c, err := NewClient(context.Background(), &ds, timeRange)
  132. So(err, ShouldBeNil)
  133. So(c, ShouldNotBeNil)
  134. Convey("When executing multi search", func() {
  135. ms, err := createMultisearchForTest(c)
  136. So(err, ShouldBeNil)
  137. c.ExecuteMultisearch(ms)
  138. Convey("Should send correct request and payload", func() {
  139. So(req, ShouldNotBeNil)
  140. So(req.Method, ShouldEqual, http.MethodPost)
  141. So(req.URL.Path, ShouldEqual, "/_msearch")
  142. So(responseBuffer, ShouldNotBeNil)
  143. headerBytes, err := responseBuffer.ReadBytes('\n')
  144. So(err, ShouldBeNil)
  145. bodyBytes := responseBuffer.Bytes()
  146. jHeader, err := simplejson.NewJson(headerBytes)
  147. So(err, ShouldBeNil)
  148. jBody, err := simplejson.NewJson(bodyBytes)
  149. So(err, ShouldBeNil)
  150. So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15")
  151. So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true)
  152. So(jHeader.Get("search_type").MustString(), ShouldEqual, "count")
  153. So(jHeader.Get("max_concurrent_shard_requests").MustInt(10), ShouldEqual, 10)
  154. Convey("and replace $__interval variable", func() {
  155. So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname")
  156. })
  157. Convey("and replace $__interval_ms variable", func() {
  158. So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s")
  159. })
  160. })
  161. })
  162. })
  163. Convey("and a v5.x client", func() {
  164. ds := models.DataSource{
  165. Database: "[metrics-]YYYY.MM.DD",
  166. Url: ts.URL,
  167. JsonData: simplejson.NewFromAny(map[string]interface{}{
  168. "esVersion": 5,
  169. "maxConcurrentShardRequests": 100,
  170. "timeField": "@timestamp",
  171. "interval": "Daily",
  172. }),
  173. }
  174. c, err := NewClient(context.Background(), &ds, timeRange)
  175. So(err, ShouldBeNil)
  176. So(c, ShouldNotBeNil)
  177. Convey("When executing multi search", func() {
  178. ms, err := createMultisearchForTest(c)
  179. So(err, ShouldBeNil)
  180. c.ExecuteMultisearch(ms)
  181. Convey("Should send correct request and payload", func() {
  182. So(req, ShouldNotBeNil)
  183. So(req.Method, ShouldEqual, http.MethodPost)
  184. So(req.URL.Path, ShouldEqual, "/_msearch")
  185. So(responseBuffer, ShouldNotBeNil)
  186. headerBytes, err := responseBuffer.ReadBytes('\n')
  187. So(err, ShouldBeNil)
  188. bodyBytes := responseBuffer.Bytes()
  189. jHeader, err := simplejson.NewJson(headerBytes)
  190. So(err, ShouldBeNil)
  191. jBody, err := simplejson.NewJson(bodyBytes)
  192. So(err, ShouldBeNil)
  193. So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15")
  194. So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true)
  195. So(jHeader.Get("search_type").MustString(), ShouldEqual, "query_then_fetch")
  196. So(jHeader.Get("max_concurrent_shard_requests").MustInt(10), ShouldEqual, 10)
  197. Convey("and replace $__interval variable", func() {
  198. So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname")
  199. })
  200. Convey("and replace $__interval_ms variable", func() {
  201. So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s")
  202. })
  203. })
  204. })
  205. })
  206. Convey("and a v5.6 client", func() {
  207. ds := models.DataSource{
  208. Database: "[metrics-]YYYY.MM.DD",
  209. Url: ts.URL,
  210. JsonData: simplejson.NewFromAny(map[string]interface{}{
  211. "esVersion": 56,
  212. "maxConcurrentShardRequests": 100,
  213. "timeField": "@timestamp",
  214. "interval": "Daily",
  215. }),
  216. }
  217. c, err := NewClient(context.Background(), &ds, timeRange)
  218. So(err, ShouldBeNil)
  219. So(c, ShouldNotBeNil)
  220. Convey("When executing multi search", func() {
  221. ms, err := createMultisearchForTest(c)
  222. So(err, ShouldBeNil)
  223. c.ExecuteMultisearch(ms)
  224. Convey("Should send correct request and payload", func() {
  225. So(req, ShouldNotBeNil)
  226. So(req.Method, ShouldEqual, http.MethodPost)
  227. So(req.URL.Path, ShouldEqual, "/_msearch")
  228. So(responseBuffer, ShouldNotBeNil)
  229. headerBytes, err := responseBuffer.ReadBytes('\n')
  230. So(err, ShouldBeNil)
  231. bodyBytes := responseBuffer.Bytes()
  232. jHeader, err := simplejson.NewJson(headerBytes)
  233. So(err, ShouldBeNil)
  234. jBody, err := simplejson.NewJson(bodyBytes)
  235. So(err, ShouldBeNil)
  236. So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15")
  237. So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true)
  238. So(jHeader.Get("search_type").MustString(), ShouldEqual, "query_then_fetch")
  239. So(jHeader.Get("max_concurrent_shard_requests").MustInt(), ShouldEqual, 100)
  240. Convey("and replace $__interval variable", func() {
  241. So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname")
  242. })
  243. Convey("and replace $__interval_ms variable", func() {
  244. So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s")
  245. })
  246. })
  247. })
  248. })
  249. Convey("and a v7.0 client", func() {
  250. ds := models.DataSource{
  251. Database: "[metrics-]YYYY.MM.DD",
  252. Url: ts.URL,
  253. JsonData: simplejson.NewFromAny(map[string]interface{}{
  254. "esVersion": 70,
  255. "maxConcurrentShardRequests": 6,
  256. "timeField": "@timestamp",
  257. "interval": "Daily",
  258. }),
  259. }
  260. c, err := NewClient(context.Background(), &ds, timeRange)
  261. So(err, ShouldBeNil)
  262. So(c, ShouldNotBeNil)
  263. Convey("When executing multi search", func() {
  264. ms, err := createMultisearchForTest(c)
  265. So(err, ShouldBeNil)
  266. c.ExecuteMultisearch(ms)
  267. Convey("Should send correct request and payload", func() {
  268. So(req, ShouldNotBeNil)
  269. So(req.Method, ShouldEqual, http.MethodPost)
  270. So(req.URL.Path, ShouldEqual, "/_msearch")
  271. So(req.URL.RawQuery, ShouldEqual, "max_concurrent_shard_requests=6")
  272. So(responseBuffer, ShouldNotBeNil)
  273. headerBytes, err := responseBuffer.ReadBytes('\n')
  274. So(err, ShouldBeNil)
  275. bodyBytes := responseBuffer.Bytes()
  276. jHeader, err := simplejson.NewJson(headerBytes)
  277. So(err, ShouldBeNil)
  278. jBody, err := simplejson.NewJson(bodyBytes)
  279. So(err, ShouldBeNil)
  280. So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15")
  281. So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true)
  282. So(jHeader.Get("search_type").MustString(), ShouldEqual, "query_then_fetch")
  283. Convey("and replace $__interval variable", func() {
  284. So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname")
  285. })
  286. Convey("and replace $__interval_ms variable", func() {
  287. So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s")
  288. })
  289. })
  290. })
  291. })
  292. Reset(func() {
  293. newDatasourceHttpClient = currentNewDatasourceHttpClient
  294. })
  295. })
  296. })
  297. }
  298. func createMultisearchForTest(c Client) (*MultiSearchRequest, error) {
  299. msb := c.MultiSearch()
  300. s := msb.Search(tsdb.Interval{Value: 15 * time.Second, Text: "15s"})
  301. s.Agg().DateHistogram("2", "@timestamp", func(a *DateHistogramAgg, ab AggBuilder) {
  302. a.Interval = "$__interval"
  303. ab.Metric("1", "avg", "@hostname", func(a *MetricAggregation) {
  304. a.Settings["script"] = "$__interval_ms*@hostname"
  305. })
  306. })
  307. return msb.Build()
  308. }