time_series_query_test.go

package elasticsearch

import (
	"fmt"
	"testing"
	"time"

	"github.com/grafana/grafana/pkg/components/simplejson"
	"github.com/grafana/grafana/pkg/tsdb"
	es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"

	. "github.com/smartystreets/goconvey/convey"
)
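
// TestExecuteTimeSeriesQuery verifies that time series query models are
// translated into the expected Elasticsearch multisearch requests for
// different client versions and aggregation setups.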
func TestExecuteTimeSeriesQuery(t *testing.T) {
	from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC)
	to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC)
	fromStr := fmt.Sprintf("%d", from.UnixNano()/int64(time.Millisecond))
	toStr := fmt.Sprintf("%d", to.UnixNano()/int64(time.Millisecond))

	Convey("Test execute time series query", t, func() {
		Convey("With defaults on es 2", func() {
			c := newFakeClient(2)
			_, err := executeTsdbQuery(c, `{
				"timeField": "@timestamp",
				"bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "2" }],
				"metrics": [{"type": "count", "id": "0" }]
			}`, from, to, 15*time.Second)
			So(err, ShouldBeNil)

			sr := c.multisearchRequests[0].Requests[0]
			rangeFilter := sr.Query.Bool.Filters[0].(*es.RangeFilter)
			So(rangeFilter.Key, ShouldEqual, c.timeField)
			So(rangeFilter.Lte, ShouldEqual, toStr)
			So(rangeFilter.Gte, ShouldEqual, fromStr)
			So(rangeFilter.Format, ShouldEqual, es.DateFormatEpochMS)
			So(sr.Aggs[0].Key, ShouldEqual, "2")
			dateHistogramAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg)
			So(dateHistogramAgg.Field, ShouldEqual, "@timestamp")
			So(dateHistogramAgg.ExtendedBounds.Min, ShouldEqual, fromStr)
			So(dateHistogramAgg.ExtendedBounds.Max, ShouldEqual, toStr)
		})

		Convey("With defaults on es 5", func() {
			c := newFakeClient(5)
			_, err := executeTsdbQuery(c, `{
				"timeField": "@timestamp",
				"bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "2" }],
				"metrics": [{"type": "count", "id": "0" }]
			}`, from, to, 15*time.Second)
			So(err, ShouldBeNil)

			sr := c.multisearchRequests[0].Requests[0]
			So(sr.Query.Bool.Filters[0].(*es.RangeFilter).Key, ShouldEqual, c.timeField)
			So(sr.Aggs[0].Key, ShouldEqual, "2")
			So(sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg).ExtendedBounds.Min, ShouldEqual, fromStr)
			So(sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg).ExtendedBounds.Max, ShouldEqual, toStr)
		})
		Convey("With multiple bucket aggs", func() {
			c := newFakeClient(5)
			_, err := executeTsdbQuery(c, `{
				"timeField": "@timestamp",
				"bucketAggs": [
					{ "type": "terms", "field": "@host", "id": "2", "settings": { "size": "0", "order": "asc" } },
					{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
				],
				"metrics": [{"type": "count", "id": "1" }]
			}`, from, to, 15*time.Second)
			So(err, ShouldBeNil)

			sr := c.multisearchRequests[0].Requests[0]
			firstLevel := sr.Aggs[0]
			So(firstLevel.Key, ShouldEqual, "2")
			termsAgg := firstLevel.Aggregation.Aggregation.(*es.TermsAggregation)
			So(termsAgg.Field, ShouldEqual, "@host")
			// a terms size of "0" in the query model is expanded to 500 by the query builder
			So(termsAgg.Size, ShouldEqual, 500)
			secondLevel := firstLevel.Aggregation.Aggs[0]
			So(secondLevel.Key, ShouldEqual, "3")
			So(secondLevel.Aggregation.Aggregation.(*es.DateHistogramAgg).Field, ShouldEqual, "@timestamp")
		})

		Convey("With select field", func() {
			c := newFakeClient(5)
			_, err := executeTsdbQuery(c, `{
				"timeField": "@timestamp",
				"bucketAggs": [
					{ "type": "date_histogram", "field": "@timestamp", "id": "2" }
				],
				"metrics": [{"type": "avg", "field": "@value", "id": "1" }]
			}`, from, to, 15*time.Second)
			So(err, ShouldBeNil)

			sr := c.multisearchRequests[0].Requests[0]
			firstLevel := sr.Aggs[0]
			So(firstLevel.Key, ShouldEqual, "2")
			So(firstLevel.Aggregation.Aggregation.(*es.DateHistogramAgg).Field, ShouldEqual, "@timestamp")
			secondLevel := firstLevel.Aggregation.Aggs[0]
			So(secondLevel.Key, ShouldEqual, "1")
			So(secondLevel.Aggregation.Type, ShouldEqual, "avg")
			So(secondLevel.Aggregation.Aggregation.(*es.MetricAggregation).Field, ShouldEqual, "@value")
		})
		Convey("With term agg and order by metric agg", func() {
			c := newFakeClient(5)
			_, err := executeTsdbQuery(c, `{
				"timeField": "@timestamp",
				"bucketAggs": [
					{
						"type": "terms",
						"field": "@host",
						"id": "2",
						"settings": { "size": "5", "order": "asc", "orderBy": "5" }
					},
					{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
				],
				"metrics": [
					{"type": "count", "id": "1" },
					{"type": "avg", "field": "@value", "id": "5" }
				]
			}`, from, to, 15*time.Second)
			So(err, ShouldBeNil)

			sr := c.multisearchRequests[0].Requests[0]
			avgAggOrderBy := sr.Aggs[0].Aggregation.Aggs[0]
			So(avgAggOrderBy.Key, ShouldEqual, "5")
			So(avgAggOrderBy.Aggregation.Type, ShouldEqual, "avg")

			avgAgg := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggs[0]
			So(avgAgg.Key, ShouldEqual, "5")
			So(avgAgg.Aggregation.Type, ShouldEqual, "avg")
		})

		Convey("With term agg and order by term", func() {
			c := newFakeClient(5)
			_, err := executeTsdbQuery(c, `{
				"timeField": "@timestamp",
				"bucketAggs": [
					{
						"type": "terms",
						"field": "@host",
						"id": "2",
						"settings": { "size": "5", "order": "asc", "orderBy": "_term" }
					},
					{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
				],
				"metrics": [
					{"type": "count", "id": "1" },
					{"type": "avg", "field": "@value", "id": "5" }
				]
			}`, from, to, 15*time.Second)
			So(err, ShouldBeNil)

			sr := c.multisearchRequests[0].Requests[0]
			firstLevel := sr.Aggs[0]
			So(firstLevel.Key, ShouldEqual, "2")
			termsAgg := firstLevel.Aggregation.Aggregation.(*es.TermsAggregation)
			So(termsAgg.Order["_term"], ShouldEqual, "asc")
		})

		Convey("With term agg and order by term with es6.x", func() {
			c := newFakeClient(60)
			_, err := executeTsdbQuery(c, `{
				"timeField": "@timestamp",
				"bucketAggs": [
					{
						"type": "terms",
						"field": "@host",
						"id": "2",
						"settings": { "size": "5", "order": "asc", "orderBy": "_term" }
					},
					{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
				],
				"metrics": [
					{"type": "count", "id": "1" },
					{"type": "avg", "field": "@value", "id": "5" }
				]
			}`, from, to, 15*time.Second)
			So(err, ShouldBeNil)

			sr := c.multisearchRequests[0].Requests[0]
			firstLevel := sr.Aggs[0]
			So(firstLevel.Key, ShouldEqual, "2")
			termsAgg := firstLevel.Aggregation.Aggregation.(*es.TermsAggregation)
			// on Elasticsearch 6.x the "_term" order key is rewritten to "_key"
			So(termsAgg.Order["_key"], ShouldEqual, "asc")
		})
		Convey("With metric percentiles", func() {
			c := newFakeClient(5)
			_, err := executeTsdbQuery(c, `{
				"timeField": "@timestamp",
				"bucketAggs": [
					{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
				],
				"metrics": [
					{
						"id": "1",
						"type": "percentiles",
						"field": "@load_time",
						"settings": {
							"percents": [ "1", "2", "3", "4" ]
						}
					}
				]
			}`, from, to, 15*time.Second)
			So(err, ShouldBeNil)

			sr := c.multisearchRequests[0].Requests[0]
			percentilesAgg := sr.Aggs[0].Aggregation.Aggs[0]
			So(percentilesAgg.Key, ShouldEqual, "1")
			So(percentilesAgg.Aggregation.Type, ShouldEqual, "percentiles")
			metricAgg := percentilesAgg.Aggregation.Aggregation.(*es.MetricAggregation)
			percents := metricAgg.Settings["percents"].([]interface{})
			So(percents, ShouldHaveLength, 4)
			So(percents[0], ShouldEqual, "1")
			So(percents[1], ShouldEqual, "2")
			So(percents[2], ShouldEqual, "3")
			So(percents[3], ShouldEqual, "4")
		})
		Convey("With filters aggs on es 2", func() {
			c := newFakeClient(2)
			_, err := executeTsdbQuery(c, `{
				"timeField": "@timestamp",
				"bucketAggs": [
					{
						"id": "2",
						"type": "filters",
						"settings": {
							"filters": [ { "query": "@metric:cpu" }, { "query": "@metric:logins.count" } ]
						}
					},
					{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
				],
				"metrics": [{"type": "count", "id": "1" }]
			}`, from, to, 15*time.Second)
			So(err, ShouldBeNil)

			sr := c.multisearchRequests[0].Requests[0]
			filtersAgg := sr.Aggs[0]
			So(filtersAgg.Key, ShouldEqual, "2")
			So(filtersAgg.Aggregation.Type, ShouldEqual, "filters")
			fAgg := filtersAgg.Aggregation.Aggregation.(*es.FiltersAggregation)
			So(fAgg.Filters["@metric:cpu"].(*es.QueryStringFilter).Query, ShouldEqual, "@metric:cpu")
			So(fAgg.Filters["@metric:logins.count"].(*es.QueryStringFilter).Query, ShouldEqual, "@metric:logins.count")

			dateHistogramAgg := sr.Aggs[0].Aggregation.Aggs[0]
			So(dateHistogramAgg.Key, ShouldEqual, "4")
			So(dateHistogramAgg.Aggregation.Aggregation.(*es.DateHistogramAgg).Field, ShouldEqual, "@timestamp")
		})

		Convey("With filters aggs on es 5", func() {
			c := newFakeClient(5)
			_, err := executeTsdbQuery(c, `{
				"timeField": "@timestamp",
				"bucketAggs": [
					{
						"id": "2",
						"type": "filters",
						"settings": {
							"filters": [ { "query": "@metric:cpu" }, { "query": "@metric:logins.count" } ]
						}
					},
					{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
				],
				"metrics": [{"type": "count", "id": "1" }]
			}`, from, to, 15*time.Second)
			So(err, ShouldBeNil)

			sr := c.multisearchRequests[0].Requests[0]
			filtersAgg := sr.Aggs[0]
			So(filtersAgg.Key, ShouldEqual, "2")
			So(filtersAgg.Aggregation.Type, ShouldEqual, "filters")
			fAgg := filtersAgg.Aggregation.Aggregation.(*es.FiltersAggregation)
			So(fAgg.Filters["@metric:cpu"].(*es.QueryStringFilter).Query, ShouldEqual, "@metric:cpu")
			So(fAgg.Filters["@metric:logins.count"].(*es.QueryStringFilter).Query, ShouldEqual, "@metric:logins.count")

			dateHistogramAgg := sr.Aggs[0].Aggregation.Aggs[0]
			So(dateHistogramAgg.Key, ShouldEqual, "4")
			So(dateHistogramAgg.Aggregation.Aggregation.(*es.DateHistogramAgg).Field, ShouldEqual, "@timestamp")
		})
		Convey("With raw document metric", func() {
			c := newFakeClient(5)
			_, err := executeTsdbQuery(c, `{
				"timeField": "@timestamp",
				"bucketAggs": [],
				"metrics": [{ "id": "1", "type": "raw_document", "settings": {} }]
			}`, from, to, 15*time.Second)
			So(err, ShouldBeNil)

			sr := c.multisearchRequests[0].Requests[0]
			// the search size defaults to 500 when not set in the metric settings
			So(sr.Size, ShouldEqual, 500)
		})

		Convey("With raw document metric size set", func() {
			c := newFakeClient(5)
			_, err := executeTsdbQuery(c, `{
				"timeField": "@timestamp",
				"bucketAggs": [],
				"metrics": [{ "id": "1", "type": "raw_document", "settings": { "size": 1337 } }]
			}`, from, to, 15*time.Second)
			So(err, ShouldBeNil)

			sr := c.multisearchRequests[0].Requests[0]
			So(sr.Size, ShouldEqual, 1337)
		})
		Convey("With date histogram agg", func() {
			c := newFakeClient(5)
			_, err := executeTsdbQuery(c, `{
				"timeField": "@timestamp",
				"bucketAggs": [
					{
						"id": "2",
						"type": "date_histogram",
						"field": "@timestamp",
						"settings": { "interval": "auto", "min_doc_count": 2 }
					}
				],
				"metrics": [{"type": "count", "id": "1" }]
			}`, from, to, 15*time.Second)
			So(err, ShouldBeNil)

			sr := c.multisearchRequests[0].Requests[0]
			firstLevel := sr.Aggs[0]
			So(firstLevel.Key, ShouldEqual, "2")
			So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram")
			hAgg := firstLevel.Aggregation.Aggregation.(*es.DateHistogramAgg)
			So(hAgg.Field, ShouldEqual, "@timestamp")
			// an "auto" interval is emitted as the $__interval variable
			So(hAgg.Interval, ShouldEqual, "$__interval")
			So(hAgg.MinDocCount, ShouldEqual, 2)
		})

		Convey("With histogram agg", func() {
			c := newFakeClient(5)
			_, err := executeTsdbQuery(c, `{
				"timeField": "@timestamp",
				"bucketAggs": [
					{
						"id": "3",
						"type": "histogram",
						"field": "bytes",
						"settings": { "interval": 10, "min_doc_count": 2, "missing": 5 }
					}
				],
				"metrics": [{"type": "count", "id": "1" }]
			}`, from, to, 15*time.Second)
			So(err, ShouldBeNil)

			sr := c.multisearchRequests[0].Requests[0]
			firstLevel := sr.Aggs[0]
			So(firstLevel.Key, ShouldEqual, "3")
			So(firstLevel.Aggregation.Type, ShouldEqual, "histogram")
			hAgg := firstLevel.Aggregation.Aggregation.(*es.HistogramAgg)
			So(hAgg.Field, ShouldEqual, "bytes")
			So(hAgg.Interval, ShouldEqual, 10)
			So(hAgg.MinDocCount, ShouldEqual, 2)
			So(*hAgg.Missing, ShouldEqual, 5)
		})

		Convey("With geo hash grid agg", func() {
			c := newFakeClient(5)
			_, err := executeTsdbQuery(c, `{
				"timeField": "@timestamp",
				"bucketAggs": [
					{
						"id": "3",
						"type": "geohash_grid",
						"field": "@location",
						"settings": { "precision": 3 }
					}
				],
				"metrics": [{"type": "count", "id": "1" }]
			}`, from, to, 15*time.Second)
			So(err, ShouldBeNil)

			sr := c.multisearchRequests[0].Requests[0]
			firstLevel := sr.Aggs[0]
			So(firstLevel.Key, ShouldEqual, "3")
			So(firstLevel.Aggregation.Type, ShouldEqual, "geohash_grid")
			ghGridAgg := firstLevel.Aggregation.Aggregation.(*es.GeoHashGridAggregation)
			So(ghGridAgg.Field, ShouldEqual, "@location")
			So(ghGridAgg.Precision, ShouldEqual, 3)
		})
		Convey("With moving average", func() {
			c := newFakeClient(5)
			_, err := executeTsdbQuery(c, `{
				"timeField": "@timestamp",
				"bucketAggs": [
					{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
				],
				"metrics": [
					{ "id": "3", "type": "sum", "field": "@value" },
					{
						"id": "2",
						"type": "moving_avg",
						"field": "3",
						"pipelineAgg": "3"
					}
				]
			}`, from, to, 15*time.Second)
			So(err, ShouldBeNil)

			sr := c.multisearchRequests[0].Requests[0]
			firstLevel := sr.Aggs[0]
			So(firstLevel.Key, ShouldEqual, "4")
			So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram")
			So(firstLevel.Aggregation.Aggs, ShouldHaveLength, 2)

			sumAgg := firstLevel.Aggregation.Aggs[0]
			So(sumAgg.Key, ShouldEqual, "3")
			So(sumAgg.Aggregation.Type, ShouldEqual, "sum")
			mAgg := sumAgg.Aggregation.Aggregation.(*es.MetricAggregation)
			So(mAgg.Field, ShouldEqual, "@value")

			movingAvgAgg := firstLevel.Aggregation.Aggs[1]
			So(movingAvgAgg.Key, ShouldEqual, "2")
			So(movingAvgAgg.Aggregation.Type, ShouldEqual, "moving_avg")
			pl := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
			So(pl.BucketPath, ShouldEqual, "3")
		})

		Convey("With broken moving average", func() {
			c := newFakeClient(5)
			_, err := executeTsdbQuery(c, `{
				"timeField": "@timestamp",
				"bucketAggs": [
					{ "type": "date_histogram", "field": "@timestamp", "id": "5" }
				],
				"metrics": [
					{ "id": "3", "type": "sum", "field": "@value" },
					{
						"id": "2",
						"type": "moving_avg",
						"pipelineAgg": "3"
					},
					{
						"id": "4",
						"type": "moving_avg",
						"pipelineAgg": "Metric to apply moving average"
					}
				]
			}`, from, to, 15*time.Second)
			So(err, ShouldBeNil)

			sr := c.multisearchRequests[0].Requests[0]
			firstLevel := sr.Aggs[0]
			So(firstLevel.Key, ShouldEqual, "5")
			So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram")
			// the moving average with an invalid pipelineAgg reference is dropped,
			// leaving only the sum and the valid moving average
			So(firstLevel.Aggregation.Aggs, ShouldHaveLength, 2)

			movingAvgAgg := firstLevel.Aggregation.Aggs[1]
			So(movingAvgAgg.Key, ShouldEqual, "2")
			plAgg := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
			So(plAgg.BucketPath, ShouldEqual, "3")
		})

		Convey("With derivative", func() {
			c := newFakeClient(5)
			_, err := executeTsdbQuery(c, `{
				"timeField": "@timestamp",
				"bucketAggs": [
					{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
				],
				"metrics": [
					{ "id": "3", "type": "sum", "field": "@value" },
					{
						"id": "2",
						"type": "derivative",
						"pipelineAgg": "3"
					}
				]
			}`, from, to, 15*time.Second)
			So(err, ShouldBeNil)

			sr := c.multisearchRequests[0].Requests[0]
			firstLevel := sr.Aggs[0]
			So(firstLevel.Key, ShouldEqual, "4")
			So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram")

			derivativeAgg := firstLevel.Aggregation.Aggs[1]
			So(derivativeAgg.Key, ShouldEqual, "2")
			plAgg := derivativeAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
			So(plAgg.BucketPath, ShouldEqual, "3")
		})
	})
}
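
// fakeClient is a stub es.Client that records every multisearch request it
// receives so tests can inspect the generated queries.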
type fakeClient struct {
	version             int
	timeField           string
	multiSearchResponse *es.MultiSearchResponse
	multiSearchError    error
	builder             *es.MultiSearchRequestBuilder
	multisearchRequests []*es.MultiSearchRequest
}

func newFakeClient(version int) *fakeClient {
	return &fakeClient{
		version:             version,
		timeField:           "@timestamp",
		multisearchRequests: make([]*es.MultiSearchRequest, 0),
		multiSearchResponse: &es.MultiSearchResponse{},
	}
}

func (c *fakeClient) GetVersion() int {
	return c.version
}

func (c *fakeClient) GetTimeField() string {
	return c.timeField
}

func (c *fakeClient) GetMinInterval(queryInterval string) (time.Duration, error) {
	return 15 * time.Second, nil
}

func (c *fakeClient) ExecuteMultisearch(r *es.MultiSearchRequest) (*es.MultiSearchResponse, error) {
	c.multisearchRequests = append(c.multisearchRequests, r)
	return c.multiSearchResponse, c.multiSearchError
}

func (c *fakeClient) MultiSearch() *es.MultiSearchRequestBuilder {
	c.builder = es.NewMultiSearchRequestBuilder(c.version)
	return c.builder
}
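
// newTsdbQuery wraps a raw JSON query model in a tsdb.TsdbQuery so it can be
// passed to the time series query parser.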
func newTsdbQuery(body string) (*tsdb.TsdbQuery, error) {
	json, err := simplejson.NewJson([]byte(body))
	if err != nil {
		return nil, err
	}

	return &tsdb.TsdbQuery{
		Queries: []*tsdb.Query{
			{
				Model: json,
			},
		},
	}, nil
}
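
// executeTsdbQuery builds a tsdb query from the given JSON model and time
// range, then executes it against the supplied client.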
func executeTsdbQuery(c es.Client, body string, from, to time.Time, minInterval time.Duration) (*tsdb.Response, error) {
	json, err := simplejson.NewJson([]byte(body))
	if err != nil {
		return nil, err
	}

	fromStr := fmt.Sprintf("%d", from.UnixNano()/int64(time.Millisecond))
	toStr := fmt.Sprintf("%d", to.UnixNano()/int64(time.Millisecond))
	tsdbQuery := &tsdb.TsdbQuery{
		Queries: []*tsdb.Query{
			{
				Model: json,
			},
		},
		TimeRange: tsdb.NewTimeRange(fromStr, toStr),
	}
	query := newTimeSeriesQuery(c, tsdbQuery, tsdb.NewIntervalCalculator(&tsdb.IntervalOptions{MinInterval: minInterval}))
	return query.execute()
}
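
// TestTimeSeriesQueryParser verifies that a raw query model is parsed into the
// expected query fields, metrics and bucket aggregations.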
func TestTimeSeriesQueryParser(t *testing.T) {
	Convey("Test time series query parser", t, func() {
		p := newTimeSeriesQueryParser()

		Convey("Should be able to parse query", func() {
			body := `{
				"timeField": "@timestamp",
				"query": "@metric:cpu",
				"alias": "{{@hostname}} {{metric}}",
				"metrics": [
					{
						"field": "@value",
						"id": "1",
						"meta": {},
						"settings": {
							"percents": [
								"90"
							]
						},
						"type": "percentiles"
					},
					{
						"type": "count",
						"field": "select field",
						"id": "4",
						"settings": {},
						"meta": {}
					}
				],
				"bucketAggs": [
					{
						"fake": true,
						"field": "@hostname",
						"id": "3",
						"settings": {
							"min_doc_count": 1,
							"order": "desc",
							"orderBy": "_term",
							"size": "10"
						},
						"type": "terms"
					},
					{
						"field": "@timestamp",
						"id": "2",
						"settings": {
							"interval": "5m",
							"min_doc_count": 0,
							"trimEdges": 0
						},
						"type": "date_histogram"
					}
				]
			}`
			tsdbQuery, err := newTsdbQuery(body)
			So(err, ShouldBeNil)

			queries, err := p.parse(tsdbQuery)
			So(err, ShouldBeNil)
			So(queries, ShouldHaveLength, 1)

			q := queries[0]
			So(q.TimeField, ShouldEqual, "@timestamp")
			So(q.RawQuery, ShouldEqual, "@metric:cpu")
			So(q.Alias, ShouldEqual, "{{@hostname}} {{metric}}")

			So(q.Metrics, ShouldHaveLength, 2)
			So(q.Metrics[0].Field, ShouldEqual, "@value")
			So(q.Metrics[0].ID, ShouldEqual, "1")
			So(q.Metrics[0].Type, ShouldEqual, "percentiles")
			So(q.Metrics[0].Hide, ShouldBeFalse)
			So(q.Metrics[0].PipelineAggregate, ShouldEqual, "")
			So(q.Metrics[0].Settings.Get("percents").MustStringArray()[0], ShouldEqual, "90")

			So(q.Metrics[1].Field, ShouldEqual, "select field")
			So(q.Metrics[1].ID, ShouldEqual, "4")
			So(q.Metrics[1].Type, ShouldEqual, "count")
			So(q.Metrics[1].Hide, ShouldBeFalse)
			So(q.Metrics[1].PipelineAggregate, ShouldEqual, "")
			So(q.Metrics[1].Settings.MustMap(), ShouldBeEmpty)

			So(q.BucketAggs, ShouldHaveLength, 2)
			So(q.BucketAggs[0].Field, ShouldEqual, "@hostname")
			So(q.BucketAggs[0].ID, ShouldEqual, "3")
			So(q.BucketAggs[0].Type, ShouldEqual, "terms")
			So(q.BucketAggs[0].Settings.Get("min_doc_count").MustInt64(), ShouldEqual, 1)
			So(q.BucketAggs[0].Settings.Get("order").MustString(), ShouldEqual, "desc")
			So(q.BucketAggs[0].Settings.Get("orderBy").MustString(), ShouldEqual, "_term")
			So(q.BucketAggs[0].Settings.Get("size").MustString(), ShouldEqual, "10")

			So(q.BucketAggs[1].Field, ShouldEqual, "@timestamp")
			So(q.BucketAggs[1].ID, ShouldEqual, "2")
			So(q.BucketAggs[1].Type, ShouldEqual, "date_histogram")
			So(q.BucketAggs[1].Settings.Get("interval").MustString(), ShouldEqual, "5m")
			So(q.BucketAggs[1].Settings.Get("min_doc_count").MustInt64(), ShouldEqual, 0)
			So(q.BucketAggs[1].Settings.Get("trimEdges").MustInt64(), ShouldEqual, 0)
		})
	})
}