client_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. package es
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io/ioutil"
  7. "net/http"
  8. "net/http/httptest"
  9. "testing"
  10. "time"
  11. "github.com/grafana/grafana/pkg/components/simplejson"
  12. "github.com/grafana/grafana/pkg/tsdb"
  13. "github.com/grafana/grafana/pkg/models"
  14. . "github.com/smartystreets/goconvey/convey"
  15. )
  16. //nolint:goconst
  17. func TestClient(t *testing.T) {
  18. Convey("Test elasticsearch client", t, func() {
  19. Convey("NewClient", func() {
  20. Convey("When no version set should return error", func() {
  21. ds := &models.DataSource{
  22. JsonData: simplejson.NewFromAny(make(map[string]interface{})),
  23. }
  24. _, err := NewClient(context.Background(), ds, nil)
  25. So(err, ShouldNotBeNil)
  26. })
  27. Convey("When no time field name set should return error", func() {
  28. ds := &models.DataSource{
  29. JsonData: simplejson.NewFromAny(map[string]interface{}{
  30. "esVersion": 5,
  31. }),
  32. }
  33. _, err := NewClient(context.Background(), ds, nil)
  34. So(err, ShouldNotBeNil)
  35. })
  36. Convey("When unsupported version set should return error", func() {
  37. ds := &models.DataSource{
  38. JsonData: simplejson.NewFromAny(map[string]interface{}{
  39. "esVersion": 6,
  40. "timeField": "@timestamp",
  41. }),
  42. }
  43. _, err := NewClient(context.Background(), ds, nil)
  44. So(err, ShouldNotBeNil)
  45. })
  46. Convey("When version 2 should return v2 client", func() {
  47. ds := &models.DataSource{
  48. JsonData: simplejson.NewFromAny(map[string]interface{}{
  49. "esVersion": 2,
  50. "timeField": "@timestamp",
  51. }),
  52. }
  53. c, err := NewClient(context.Background(), ds, nil)
  54. So(err, ShouldBeNil)
  55. So(c.GetVersion(), ShouldEqual, 2)
  56. })
  57. Convey("When version 5 should return v5 client", func() {
  58. ds := &models.DataSource{
  59. JsonData: simplejson.NewFromAny(map[string]interface{}{
  60. "esVersion": 5,
  61. "timeField": "@timestamp",
  62. }),
  63. }
  64. c, err := NewClient(context.Background(), ds, nil)
  65. So(err, ShouldBeNil)
  66. So(c.GetVersion(), ShouldEqual, 5)
  67. })
  68. Convey("When version 56 should return v5.6 client", func() {
  69. ds := &models.DataSource{
  70. JsonData: simplejson.NewFromAny(map[string]interface{}{
  71. "esVersion": 56,
  72. "timeField": "@timestamp",
  73. }),
  74. }
  75. c, err := NewClient(context.Background(), ds, nil)
  76. So(err, ShouldBeNil)
  77. So(c.GetVersion(), ShouldEqual, 56)
  78. })
  79. Convey("When version 60 should return v6.0 client", func() {
  80. ds := &models.DataSource{
  81. JsonData: simplejson.NewFromAny(map[string]interface{}{
  82. "esVersion": 60,
  83. "timeField": "@timestamp",
  84. }),
  85. }
  86. c, err := NewClient(context.Background(), ds, nil)
  87. So(err, ShouldBeNil)
  88. So(c.GetVersion(), ShouldEqual, 60)
  89. })
  90. Convey("When version 70 should return v7.0 client", func() {
  91. ds := &models.DataSource{
  92. JsonData: simplejson.NewFromAny(map[string]interface{}{
  93. "esVersion": 70,
  94. "timeField": "@timestamp",
  95. }),
  96. }
  97. c, err := NewClient(context.Background(), ds, nil)
  98. So(err, ShouldBeNil)
  99. So(c.GetVersion(), ShouldEqual, 70)
  100. })
  101. })
  102. httpClientScenario(t, "Given a fake http client and a v2.x client with response", &models.DataSource{
  103. Database: "[metrics-]YYYY.MM.DD",
  104. JsonData: simplejson.NewFromAny(map[string]interface{}{
  105. "esVersion": 2,
  106. "timeField": "@timestamp",
  107. "interval": "Daily",
  108. }),
  109. }, func(sc *scenarioContext) {
  110. sc.responseBody = `{
  111. "responses": [
  112. {
  113. "hits": { "hits": [], "max_score": 0, "total": 4656 },
  114. "status": 200
  115. }
  116. ]
  117. }`
  118. Convey("When executing multi search", func() {
  119. ms, err := createMultisearchForTest(sc.client)
  120. So(err, ShouldBeNil)
  121. res, err := sc.client.ExecuteMultisearch(ms)
  122. So(err, ShouldBeNil)
  123. Convey("Should send correct request and payload", func() {
  124. So(sc.request, ShouldNotBeNil)
  125. So(sc.request.Method, ShouldEqual, http.MethodPost)
  126. So(sc.request.URL.Path, ShouldEqual, "/_msearch")
  127. So(sc.requestBody, ShouldNotBeNil)
  128. headerBytes, err := sc.requestBody.ReadBytes('\n')
  129. So(err, ShouldBeNil)
  130. bodyBytes := sc.requestBody.Bytes()
  131. jHeader, err := simplejson.NewJson(headerBytes)
  132. So(err, ShouldBeNil)
  133. jBody, err := simplejson.NewJson(bodyBytes)
  134. So(err, ShouldBeNil)
  135. So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15")
  136. So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true)
  137. So(jHeader.Get("search_type").MustString(), ShouldEqual, "count")
  138. So(jHeader.Get("max_concurrent_shard_requests").MustInt(10), ShouldEqual, 10)
  139. Convey("and replace $__interval variable", func() {
  140. So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname")
  141. })
  142. Convey("and replace $__interval_ms variable", func() {
  143. So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s")
  144. })
  145. })
  146. Convey("Should parse response", func() {
  147. So(res.Status, ShouldEqual, 200)
  148. So(res.Responses, ShouldHaveLength, 1)
  149. })
  150. })
  151. })
  152. httpClientScenario(t, "Given a fake http client and a v5.x client with response", &models.DataSource{
  153. Database: "[metrics-]YYYY.MM.DD",
  154. JsonData: simplejson.NewFromAny(map[string]interface{}{
  155. "esVersion": 5,
  156. "maxConcurrentShardRequests": 100,
  157. "timeField": "@timestamp",
  158. "interval": "Daily",
  159. }),
  160. }, func(sc *scenarioContext) {
  161. sc.responseBody = `{
  162. "responses": [
  163. {
  164. "hits": { "hits": [], "max_score": 0, "total": 4656 },
  165. "status": 200
  166. }
  167. ]
  168. }`
  169. Convey("When executing multi search", func() {
  170. ms, err := createMultisearchForTest(sc.client)
  171. So(err, ShouldBeNil)
  172. res, err := sc.client.ExecuteMultisearch(ms)
  173. So(err, ShouldBeNil)
  174. Convey("Should send correct request and payload", func() {
  175. So(sc.request, ShouldNotBeNil)
  176. So(sc.request.Method, ShouldEqual, http.MethodPost)
  177. So(sc.request.URL.Path, ShouldEqual, "/_msearch")
  178. So(sc.requestBody, ShouldNotBeNil)
  179. headerBytes, err := sc.requestBody.ReadBytes('\n')
  180. So(err, ShouldBeNil)
  181. bodyBytes := sc.requestBody.Bytes()
  182. jHeader, err := simplejson.NewJson(headerBytes)
  183. So(err, ShouldBeNil)
  184. jBody, err := simplejson.NewJson(bodyBytes)
  185. So(err, ShouldBeNil)
  186. So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15")
  187. So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true)
  188. So(jHeader.Get("search_type").MustString(), ShouldEqual, "query_then_fetch")
  189. So(jHeader.Get("max_concurrent_shard_requests").MustInt(10), ShouldEqual, 10)
  190. Convey("and replace $__interval variable", func() {
  191. So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname")
  192. })
  193. Convey("and replace $__interval_ms variable", func() {
  194. So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s")
  195. })
  196. })
  197. Convey("Should parse response", func() {
  198. So(res.Status, ShouldEqual, 200)
  199. So(res.Responses, ShouldHaveLength, 1)
  200. })
  201. })
  202. })
  203. httpClientScenario(t, "Given a fake http client and a v5.6 client with response", &models.DataSource{
  204. Database: "[metrics-]YYYY.MM.DD",
  205. JsonData: simplejson.NewFromAny(map[string]interface{}{
  206. "esVersion": 56,
  207. "maxConcurrentShardRequests": 100,
  208. "timeField": "@timestamp",
  209. "interval": "Daily",
  210. }),
  211. }, func(sc *scenarioContext) {
  212. sc.responseBody = `{
  213. "responses": [
  214. {
  215. "hits": { "hits": [], "max_score": 0, "total": 4656 },
  216. "status": 200
  217. }
  218. ]
  219. }`
  220. Convey("When executing multi search", func() {
  221. ms, err := createMultisearchForTest(sc.client)
  222. So(err, ShouldBeNil)
  223. res, err := sc.client.ExecuteMultisearch(ms)
  224. So(err, ShouldBeNil)
  225. Convey("Should send correct request and payload", func() {
  226. So(sc.request, ShouldNotBeNil)
  227. So(sc.request.Method, ShouldEqual, http.MethodPost)
  228. So(sc.request.URL.Path, ShouldEqual, "/_msearch")
  229. So(sc.requestBody, ShouldNotBeNil)
  230. headerBytes, err := sc.requestBody.ReadBytes('\n')
  231. So(err, ShouldBeNil)
  232. bodyBytes := sc.requestBody.Bytes()
  233. jHeader, err := simplejson.NewJson(headerBytes)
  234. So(err, ShouldBeNil)
  235. jBody, err := simplejson.NewJson(bodyBytes)
  236. So(err, ShouldBeNil)
  237. So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15")
  238. So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true)
  239. So(jHeader.Get("search_type").MustString(), ShouldEqual, "query_then_fetch")
  240. So(jHeader.Get("max_concurrent_shard_requests").MustInt(), ShouldEqual, 100)
  241. Convey("and replace $__interval variable", func() {
  242. So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname")
  243. })
  244. Convey("and replace $__interval_ms variable", func() {
  245. So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s")
  246. })
  247. })
  248. Convey("Should parse response", func() {
  249. So(res.Status, ShouldEqual, 200)
  250. So(res.Responses, ShouldHaveLength, 1)
  251. })
  252. })
  253. })
  254. httpClientScenario(t, "Given a fake http client and a v7.0 client with response", &models.DataSource{
  255. Database: "[metrics-]YYYY.MM.DD",
  256. JsonData: simplejson.NewFromAny(map[string]interface{}{
  257. "esVersion": 70,
  258. "maxConcurrentShardRequests": 6,
  259. "timeField": "@timestamp",
  260. "interval": "Daily",
  261. }),
  262. }, func(sc *scenarioContext) {
  263. sc.responseBody = `{
  264. "responses": [
  265. {
  266. "hits": { "hits": [], "max_score": 0, "total": { "value": 4656, "relation": "eq"} },
  267. "status": 200
  268. }
  269. ]
  270. }`
  271. Convey("When executing multi search", func() {
  272. ms, err := createMultisearchForTest(sc.client)
  273. So(err, ShouldBeNil)
  274. res, err := sc.client.ExecuteMultisearch(ms)
  275. So(err, ShouldBeNil)
  276. Convey("Should send correct request and payload", func() {
  277. So(sc.request, ShouldNotBeNil)
  278. So(sc.request.Method, ShouldEqual, http.MethodPost)
  279. So(sc.request.URL.Path, ShouldEqual, "/_msearch")
  280. So(sc.request.URL.RawQuery, ShouldEqual, "max_concurrent_shard_requests=6")
  281. So(sc.requestBody, ShouldNotBeNil)
  282. headerBytes, err := sc.requestBody.ReadBytes('\n')
  283. So(err, ShouldBeNil)
  284. bodyBytes := sc.requestBody.Bytes()
  285. jHeader, err := simplejson.NewJson(headerBytes)
  286. So(err, ShouldBeNil)
  287. jBody, err := simplejson.NewJson(bodyBytes)
  288. So(err, ShouldBeNil)
  289. So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15")
  290. So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true)
  291. So(jHeader.Get("search_type").MustString(), ShouldEqual, "query_then_fetch")
  292. Convey("and replace $__interval variable", func() {
  293. So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname")
  294. })
  295. Convey("and replace $__interval_ms variable", func() {
  296. So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s")
  297. })
  298. })
  299. Convey("Should parse response", func() {
  300. So(res.Status, ShouldEqual, 200)
  301. So(res.Responses, ShouldHaveLength, 1)
  302. })
  303. })
  304. })
  305. })
  306. }
  307. func createMultisearchForTest(c Client) (*MultiSearchRequest, error) {
  308. msb := c.MultiSearch()
  309. s := msb.Search(tsdb.Interval{Value: 15 * time.Second, Text: "15s"})
  310. s.Agg().DateHistogram("2", "@timestamp", func(a *DateHistogramAgg, ab AggBuilder) {
  311. a.Interval = "$__interval"
  312. ab.Metric("1", "avg", "@hostname", func(a *MetricAggregation) {
  313. a.Settings["script"] = "$__interval_ms*@hostname"
  314. })
  315. })
  316. return msb.Build()
  317. }
  318. type scenarioContext struct {
  319. client Client
  320. request *http.Request
  321. requestBody *bytes.Buffer
  322. responseStatus int
  323. responseBody string
  324. }
  325. type scenarioFunc func(*scenarioContext)
  326. func httpClientScenario(t *testing.T, desc string, ds *models.DataSource, fn scenarioFunc) {
  327. t.Helper()
  328. Convey(desc, func() {
  329. sc := &scenarioContext{
  330. responseStatus: 200,
  331. responseBody: `{ "responses": [] }`,
  332. }
  333. ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
  334. sc.request = r
  335. buf, err := ioutil.ReadAll(r.Body)
  336. if err != nil {
  337. t.Fatalf("Failed to read request body, err=%v", err)
  338. }
  339. sc.requestBody = bytes.NewBuffer(buf)
  340. rw.Header().Add("Content-Type", "application/json")
  341. rw.Write([]byte(sc.responseBody))
  342. rw.WriteHeader(sc.responseStatus)
  343. }))
  344. ds.Url = ts.URL
  345. from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC)
  346. to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC)
  347. fromStr := fmt.Sprintf("%d", from.UnixNano()/int64(time.Millisecond))
  348. toStr := fmt.Sprintf("%d", to.UnixNano()/int64(time.Millisecond))
  349. timeRange := tsdb.NewTimeRange(fromStr, toStr)
  350. c, err := NewClient(context.Background(), ds, timeRange)
  351. So(err, ShouldBeNil)
  352. So(c, ShouldNotBeNil)
  353. sc.client = c
  354. currentNewDatasourceHttpClient := newDatasourceHttpClient
  355. newDatasourceHttpClient = func(ds *models.DataSource) (*http.Client, error) {
  356. return ts.Client(), nil
  357. }
  358. defer func() {
  359. ts.Close()
  360. newDatasourceHttpClient = currentNewDatasourceHttpClient
  361. }()
  362. fn(sc)
  363. })
  364. }