client_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. package es
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io/ioutil"
  7. "net/http"
  8. "net/http/httptest"
  9. "testing"
  10. "time"
  11. "github.com/grafana/grafana/pkg/components/simplejson"
  12. "github.com/grafana/grafana/pkg/tsdb"
  13. "github.com/grafana/grafana/pkg/models"
  14. . "github.com/smartystreets/goconvey/convey"
  15. )
  16. func TestClient(t *testing.T) {
  17. Convey("Test elasticsearch client", t, func() {
  18. Convey("NewClient", func() {
  19. Convey("When no version set should return error", func() {
  20. ds := &models.DataSource{
  21. JsonData: simplejson.NewFromAny(make(map[string]interface{})),
  22. }
  23. _, err := NewClient(context.Background(), ds, nil)
  24. So(err, ShouldNotBeNil)
  25. })
  26. Convey("When no time field name set should return error", func() {
  27. ds := &models.DataSource{
  28. JsonData: simplejson.NewFromAny(map[string]interface{}{
  29. "esVersion": 5,
  30. }),
  31. }
  32. _, err := NewClient(context.Background(), ds, nil)
  33. So(err, ShouldNotBeNil)
  34. })
  35. Convey("When unsupported version set should return error", func() {
  36. ds := &models.DataSource{
  37. JsonData: simplejson.NewFromAny(map[string]interface{}{
  38. "esVersion": 6,
  39. "timeField": "@timestamp",
  40. }),
  41. }
  42. _, err := NewClient(context.Background(), ds, nil)
  43. So(err, ShouldNotBeNil)
  44. })
  45. Convey("When version 2 should return v2 client", func() {
  46. ds := &models.DataSource{
  47. JsonData: simplejson.NewFromAny(map[string]interface{}{
  48. "esVersion": 2,
  49. "timeField": "@timestamp",
  50. }),
  51. }
  52. c, err := NewClient(context.Background(), ds, nil)
  53. So(err, ShouldBeNil)
  54. So(c.GetVersion(), ShouldEqual, 2)
  55. })
  56. Convey("When version 5 should return v5 client", func() {
  57. ds := &models.DataSource{
  58. JsonData: simplejson.NewFromAny(map[string]interface{}{
  59. "esVersion": 5,
  60. "timeField": "@timestamp",
  61. }),
  62. }
  63. c, err := NewClient(context.Background(), ds, nil)
  64. So(err, ShouldBeNil)
  65. So(c.GetVersion(), ShouldEqual, 5)
  66. })
  67. Convey("When version 56 should return v5.6 client", func() {
  68. ds := &models.DataSource{
  69. JsonData: simplejson.NewFromAny(map[string]interface{}{
  70. "esVersion": 56,
  71. "timeField": "@timestamp",
  72. }),
  73. }
  74. c, err := NewClient(context.Background(), ds, nil)
  75. So(err, ShouldBeNil)
  76. So(c.GetVersion(), ShouldEqual, 56)
  77. })
  78. Convey("When version 60 should return v6.0 client", func() {
  79. ds := &models.DataSource{
  80. JsonData: simplejson.NewFromAny(map[string]interface{}{
  81. "esVersion": 60,
  82. "timeField": "@timestamp",
  83. }),
  84. }
  85. c, err := NewClient(context.Background(), ds, nil)
  86. So(err, ShouldBeNil)
  87. So(c.GetVersion(), ShouldEqual, 60)
  88. })
  89. Convey("When version 70 should return v7.0 client", func() {
  90. ds := &models.DataSource{
  91. JsonData: simplejson.NewFromAny(map[string]interface{}{
  92. "esVersion": 70,
  93. "timeField": "@timestamp",
  94. }),
  95. }
  96. c, err := NewClient(context.Background(), ds, nil)
  97. So(err, ShouldBeNil)
  98. So(c.GetVersion(), ShouldEqual, 70)
  99. })
  100. })
  101. httpClientScenario(t, "Given a fake http client and a v2.x client with response", &models.DataSource{
  102. Database: "[metrics-]YYYY.MM.DD",
  103. JsonData: simplejson.NewFromAny(map[string]interface{}{
  104. "esVersion": 2,
  105. "timeField": "@timestamp",
  106. "interval": "Daily",
  107. }),
  108. }, func(sc *scenarioContext) {
  109. sc.responseBody = `{
  110. "responses": [
  111. {
  112. "hits": { "hits": [], "max_score": 0, "total": 4656 },
  113. "status": 200
  114. }
  115. ]
  116. }`
  117. Convey("When executing multi search", func() {
  118. ms, err := createMultisearchForTest(sc.client)
  119. So(err, ShouldBeNil)
  120. res, err := sc.client.ExecuteMultisearch(ms)
  121. So(err, ShouldBeNil)
  122. Convey("Should send correct request and payload", func() {
  123. So(sc.request, ShouldNotBeNil)
  124. So(sc.request.Method, ShouldEqual, http.MethodPost)
  125. So(sc.request.URL.Path, ShouldEqual, "/_msearch")
  126. So(sc.requestBody, ShouldNotBeNil)
  127. headerBytes, err := sc.requestBody.ReadBytes('\n')
  128. So(err, ShouldBeNil)
  129. bodyBytes := sc.requestBody.Bytes()
  130. jHeader, err := simplejson.NewJson(headerBytes)
  131. So(err, ShouldBeNil)
  132. jBody, err := simplejson.NewJson(bodyBytes)
  133. So(err, ShouldBeNil)
  134. So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15")
  135. So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true)
  136. So(jHeader.Get("search_type").MustString(), ShouldEqual, "count")
  137. So(jHeader.Get("max_concurrent_shard_requests").MustInt(10), ShouldEqual, 10)
  138. Convey("and replace $__interval variable", func() {
  139. So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname")
  140. })
  141. Convey("and replace $__interval_ms variable", func() {
  142. So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s")
  143. })
  144. })
  145. Convey("Should parse response", func() {
  146. So(res.Status, ShouldEqual, 200)
  147. So(res.Responses, ShouldHaveLength, 1)
  148. })
  149. })
  150. })
  151. httpClientScenario(t, "Given a fake http client and a v5.x client with response", &models.DataSource{
  152. Database: "[metrics-]YYYY.MM.DD",
  153. JsonData: simplejson.NewFromAny(map[string]interface{}{
  154. "esVersion": 5,
  155. "maxConcurrentShardRequests": 100,
  156. "timeField": "@timestamp",
  157. "interval": "Daily",
  158. }),
  159. }, func(sc *scenarioContext) {
  160. sc.responseBody = `{
  161. "responses": [
  162. {
  163. "hits": { "hits": [], "max_score": 0, "total": 4656 },
  164. "status": 200
  165. }
  166. ]
  167. }`
  168. Convey("When executing multi search", func() {
  169. ms, err := createMultisearchForTest(sc.client)
  170. So(err, ShouldBeNil)
  171. res, err := sc.client.ExecuteMultisearch(ms)
  172. So(err, ShouldBeNil)
  173. Convey("Should send correct request and payload", func() {
  174. So(sc.request, ShouldNotBeNil)
  175. So(sc.request.Method, ShouldEqual, http.MethodPost)
  176. So(sc.request.URL.Path, ShouldEqual, "/_msearch")
  177. So(sc.requestBody, ShouldNotBeNil)
  178. headerBytes, err := sc.requestBody.ReadBytes('\n')
  179. So(err, ShouldBeNil)
  180. bodyBytes := sc.requestBody.Bytes()
  181. jHeader, err := simplejson.NewJson(headerBytes)
  182. So(err, ShouldBeNil)
  183. jBody, err := simplejson.NewJson(bodyBytes)
  184. So(err, ShouldBeNil)
  185. So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15")
  186. So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true)
  187. So(jHeader.Get("search_type").MustString(), ShouldEqual, "query_then_fetch")
  188. So(jHeader.Get("max_concurrent_shard_requests").MustInt(10), ShouldEqual, 10)
  189. Convey("and replace $__interval variable", func() {
  190. So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname")
  191. })
  192. Convey("and replace $__interval_ms variable", func() {
  193. So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s")
  194. })
  195. })
  196. Convey("Should parse response", func() {
  197. So(res.Status, ShouldEqual, 200)
  198. So(res.Responses, ShouldHaveLength, 1)
  199. })
  200. })
  201. })
  202. httpClientScenario(t, "Given a fake http client and a v5.6 client with response", &models.DataSource{
  203. Database: "[metrics-]YYYY.MM.DD",
  204. JsonData: simplejson.NewFromAny(map[string]interface{}{
  205. "esVersion": 56,
  206. "maxConcurrentShardRequests": 100,
  207. "timeField": "@timestamp",
  208. "interval": "Daily",
  209. }),
  210. }, func(sc *scenarioContext) {
  211. sc.responseBody = `{
  212. "responses": [
  213. {
  214. "hits": { "hits": [], "max_score": 0, "total": 4656 },
  215. "status": 200
  216. }
  217. ]
  218. }`
  219. Convey("When executing multi search", func() {
  220. ms, err := createMultisearchForTest(sc.client)
  221. So(err, ShouldBeNil)
  222. res, err := sc.client.ExecuteMultisearch(ms)
  223. So(err, ShouldBeNil)
  224. Convey("Should send correct request and payload", func() {
  225. So(sc.request, ShouldNotBeNil)
  226. So(sc.request.Method, ShouldEqual, http.MethodPost)
  227. So(sc.request.URL.Path, ShouldEqual, "/_msearch")
  228. So(sc.requestBody, ShouldNotBeNil)
  229. headerBytes, err := sc.requestBody.ReadBytes('\n')
  230. So(err, ShouldBeNil)
  231. bodyBytes := sc.requestBody.Bytes()
  232. jHeader, err := simplejson.NewJson(headerBytes)
  233. So(err, ShouldBeNil)
  234. jBody, err := simplejson.NewJson(bodyBytes)
  235. So(err, ShouldBeNil)
  236. So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15")
  237. So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true)
  238. So(jHeader.Get("search_type").MustString(), ShouldEqual, "query_then_fetch")
  239. So(jHeader.Get("max_concurrent_shard_requests").MustInt(), ShouldEqual, 100)
  240. Convey("and replace $__interval variable", func() {
  241. So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname")
  242. })
  243. Convey("and replace $__interval_ms variable", func() {
  244. So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s")
  245. })
  246. })
  247. Convey("Should parse response", func() {
  248. So(res.Status, ShouldEqual, 200)
  249. So(res.Responses, ShouldHaveLength, 1)
  250. })
  251. })
  252. })
  253. httpClientScenario(t, "Given a fake http client and a v7.0 client with response", &models.DataSource{
  254. Database: "[metrics-]YYYY.MM.DD",
  255. JsonData: simplejson.NewFromAny(map[string]interface{}{
  256. "esVersion": 70,
  257. "maxConcurrentShardRequests": 6,
  258. "timeField": "@timestamp",
  259. "interval": "Daily",
  260. }),
  261. }, func(sc *scenarioContext) {
  262. sc.responseBody = `{
  263. "responses": [
  264. {
  265. "hits": { "hits": [], "max_score": 0, "total": { "value": 4656, "relation": "eq"} },
  266. "status": 200
  267. }
  268. ]
  269. }`
  270. Convey("When executing multi search", func() {
  271. ms, err := createMultisearchForTest(sc.client)
  272. So(err, ShouldBeNil)
  273. res, err := sc.client.ExecuteMultisearch(ms)
  274. So(err, ShouldBeNil)
  275. Convey("Should send correct request and payload", func() {
  276. So(sc.request, ShouldNotBeNil)
  277. So(sc.request.Method, ShouldEqual, http.MethodPost)
  278. So(sc.request.URL.Path, ShouldEqual, "/_msearch")
  279. So(sc.request.URL.RawQuery, ShouldEqual, "max_concurrent_shard_requests=6")
  280. So(sc.requestBody, ShouldNotBeNil)
  281. headerBytes, err := sc.requestBody.ReadBytes('\n')
  282. So(err, ShouldBeNil)
  283. bodyBytes := sc.requestBody.Bytes()
  284. jHeader, err := simplejson.NewJson(headerBytes)
  285. So(err, ShouldBeNil)
  286. jBody, err := simplejson.NewJson(bodyBytes)
  287. So(err, ShouldBeNil)
  288. So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15")
  289. So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true)
  290. So(jHeader.Get("search_type").MustString(), ShouldEqual, "query_then_fetch")
  291. Convey("and replace $__interval variable", func() {
  292. So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname")
  293. })
  294. Convey("and replace $__interval_ms variable", func() {
  295. So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s")
  296. })
  297. })
  298. Convey("Should parse response", func() {
  299. So(res.Status, ShouldEqual, 200)
  300. So(res.Responses, ShouldHaveLength, 1)
  301. })
  302. })
  303. })
  304. })
  305. }
  306. func createMultisearchForTest(c Client) (*MultiSearchRequest, error) {
  307. msb := c.MultiSearch()
  308. s := msb.Search(tsdb.Interval{Value: 15 * time.Second, Text: "15s"})
  309. s.Agg().DateHistogram("2", "@timestamp", func(a *DateHistogramAgg, ab AggBuilder) {
  310. a.Interval = "$__interval"
  311. ab.Metric("1", "avg", "@hostname", func(a *MetricAggregation) {
  312. a.Settings["script"] = "$__interval_ms*@hostname"
  313. })
  314. })
  315. return msb.Build()
  316. }
  317. type scenarioContext struct {
  318. client Client
  319. request *http.Request
  320. requestBody *bytes.Buffer
  321. responseStatus int
  322. responseBody string
  323. }
  324. type scenarioFunc func(*scenarioContext)
  325. func httpClientScenario(t *testing.T, desc string, ds *models.DataSource, fn scenarioFunc) {
  326. t.Helper()
  327. Convey(desc, func() {
  328. sc := &scenarioContext{
  329. responseStatus: 200,
  330. responseBody: `{ "responses": [] }`,
  331. }
  332. ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
  333. sc.request = r
  334. buf, err := ioutil.ReadAll(r.Body)
  335. if err != nil {
  336. t.Fatalf("Failed to read request body, err=%v", err)
  337. }
  338. sc.requestBody = bytes.NewBuffer(buf)
  339. rw.Header().Add("Content-Type", "application/json")
  340. rw.Write([]byte(sc.responseBody))
  341. rw.WriteHeader(sc.responseStatus)
  342. }))
  343. ds.Url = ts.URL
  344. from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC)
  345. to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC)
  346. fromStr := fmt.Sprintf("%d", from.UnixNano()/int64(time.Millisecond))
  347. toStr := fmt.Sprintf("%d", to.UnixNano()/int64(time.Millisecond))
  348. timeRange := tsdb.NewTimeRange(fromStr, toStr)
  349. c, err := NewClient(context.Background(), ds, timeRange)
  350. So(err, ShouldBeNil)
  351. So(c, ShouldNotBeNil)
  352. sc.client = c
  353. currentNewDatasourceHttpClient := newDatasourceHttpClient
  354. newDatasourceHttpClient = func(ds *models.DataSource) (*http.Client, error) {
  355. return ts.Client(), nil
  356. }
  357. defer func() {
  358. ts.Close()
  359. newDatasourceHttpClient = currentNewDatasourceHttpClient
  360. }()
  361. fn(sc)
  362. })
  363. }