time_series_query_test.go 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793
  1. package elasticsearch
  2. import (
  3. "fmt"
  4. "testing"
  5. "time"
  6. "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
  7. "github.com/grafana/grafana/pkg/components/simplejson"
  8. "github.com/grafana/grafana/pkg/tsdb"
  9. . "github.com/smartystreets/goconvey/convey"
  10. )
  // TestExecuteTimeSeriesQuery feeds a series of query models through
  // executeTsdbQuery using a fakeClient, then inspects the first search request
  // of the first multisearch request the fake recorded. The response returned by
  // executeTsdbQuery is discarded in every case; only the error and the shape of
  // the built request (filters and aggregations) are asserted.
  11. func TestExecuteTimeSeriesQuery(t *testing.T) {
  // Fixed five-minute query window shared by all cases. fromStr and toStr are
  // the window bounds rendered as epoch-millisecond strings.
  12. from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC)
  13. to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC)
  14. fromStr := fmt.Sprintf("%d", from.UnixNano()/int64(time.Millisecond))
  15. toStr := fmt.Sprintf("%d", to.UnixNano()/int64(time.Millisecond))
  16. Convey("Test execute time series query", t, func() {
  // Defaults, client version 2: the first filter is a range filter on the
  // client's time field with epoch-ms bounds, and the date histogram's
  // extended bounds equal the query window.
  17. Convey("With defaults on es 2", func() {
  18. c := newFakeClient(2)
  19. _, err := executeTsdbQuery(c, `{
  20. "timeField": "@timestamp",
  21. "bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "2" }],
  22. "metrics": [{"type": "count", "id": "0" }]
  23. }`, from, to, 15*time.Second)
  24. So(err, ShouldBeNil)
  25. sr := c.multisearchRequests[0].Requests[0]
  26. rangeFilter := sr.Query.Bool.Filters[0].(*es.RangeFilter)
  27. So(rangeFilter.Key, ShouldEqual, c.timeField)
  28. So(rangeFilter.Lte, ShouldEqual, toStr)
  29. So(rangeFilter.Gte, ShouldEqual, fromStr)
  30. So(rangeFilter.Format, ShouldEqual, es.DateFormatEpochMS)
  31. So(sr.Aggs[0].Key, ShouldEqual, "2")
  32. dateHistogramAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg)
  33. So(dateHistogramAgg.Field, ShouldEqual, "@timestamp")
  34. So(dateHistogramAgg.ExtendedBounds.Min, ShouldEqual, fromStr)
  35. So(dateHistogramAgg.ExtendedBounds.Max, ShouldEqual, toStr)
  36. })
  // Same default model with client version 5; a shorter set of the same checks.
  37. Convey("With defaults on es 5", func() {
  38. c := newFakeClient(5)
  39. _, err := executeTsdbQuery(c, `{
  40. "timeField": "@timestamp",
  41. "bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "2" }],
  42. "metrics": [{"type": "count", "id": "0" }]
  43. }`, from, to, 15*time.Second)
  44. So(err, ShouldBeNil)
  45. sr := c.multisearchRequests[0].Requests[0]
  46. So(sr.Query.Bool.Filters[0].(*es.RangeFilter).Key, ShouldEqual, c.timeField)
  47. So(sr.Aggs[0].Key, ShouldEqual, "2")
  48. So(sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg).ExtendedBounds.Min, ShouldEqual, fromStr)
  49. So(sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg).ExtendedBounds.Max, ShouldEqual, toStr)
  50. })
  // Terms agg followed by a date histogram: the date histogram is expected
  // nested under the terms agg, and a terms size setting of 0 is expected to
  // come out as size 500.
  51. Convey("With multiple bucket aggs", func() {
  52. c := newFakeClient(5)
  53. _, err := executeTsdbQuery(c, `{
  54. "timeField": "@timestamp",
  55. "bucketAggs": [
  56. { "type": "terms", "field": "@host", "id": "2", "settings": { "size": "0", "order": "asc" } },
  57. { "type": "date_histogram", "field": "@timestamp", "id": "3" }
  58. ],
  59. "metrics": [{"type": "count", "id": "1" }]
  60. }`, from, to, 15*time.Second)
  61. So(err, ShouldBeNil)
  62. sr := c.multisearchRequests[0].Requests[0]
  63. firstLevel := sr.Aggs[0]
  64. So(firstLevel.Key, ShouldEqual, "2")
  65. termsAgg := firstLevel.Aggregation.Aggregation.(*es.TermsAggregation)
  66. So(termsAgg.Field, ShouldEqual, "@host")
  67. So(termsAgg.Size, ShouldEqual, 500)
  68. secondLevel := firstLevel.Aggregation.Aggs[0]
  69. So(secondLevel.Key, ShouldEqual, "3")
  70. So(secondLevel.Aggregation.Aggregation.(*es.DateHistogramAgg).Field, ShouldEqual, "@timestamp")
  71. })
  // A metric with a field (avg on @value) is expected as a child agg of the
  // date histogram, keyed by the metric id.
  72. Convey("With select field", func() {
  73. c := newFakeClient(5)
  74. _, err := executeTsdbQuery(c, `{
  75. "timeField": "@timestamp",
  76. "bucketAggs": [
  77. { "type": "date_histogram", "field": "@timestamp", "id": "2" }
  78. ],
  79. "metrics": [{"type": "avg", "field": "@value", "id": "1" }]
  80. }`, from, to, 15*time.Second)
  81. So(err, ShouldBeNil)
  82. sr := c.multisearchRequests[0].Requests[0]
  83. firstLevel := sr.Aggs[0]
  84. So(firstLevel.Key, ShouldEqual, "2")
  85. So(firstLevel.Aggregation.Aggregation.(*es.DateHistogramAgg).Field, ShouldEqual, "@timestamp")
  86. secondLevel := firstLevel.Aggregation.Aggs[0]
  87. So(secondLevel.Key, ShouldEqual, "1")
  88. So(secondLevel.Aggregation.Type, ShouldEqual, "avg")
  89. So(secondLevel.Aggregation.Aggregation.(*es.MetricAggregation).Field, ShouldEqual, "@value")
  90. })
  // Terms agg ordered by metric id 5: the avg metric is expected twice, once
  // as the first child of the terms agg and once as the first child of the
  // terms agg's second child (presumably the date histogram; its type is not
  // asserted here).
  91. Convey("With term agg and order by metric agg", func() {
  92. c := newFakeClient(5)
  93. _, err := executeTsdbQuery(c, `{
  94. "timeField": "@timestamp",
  95. "bucketAggs": [
  96. {
  97. "type": "terms",
  98. "field": "@host",
  99. "id": "2",
  100. "settings": { "size": "5", "order": "asc", "orderBy": "5" }
  101. },
  102. { "type": "date_histogram", "field": "@timestamp", "id": "3" }
  103. ],
  104. "metrics": [
  105. {"type": "count", "id": "1" },
  106. {"type": "avg", "field": "@value", "id": "5" }
  107. ]
  108. }`, from, to, 15*time.Second)
  109. So(err, ShouldBeNil)
  110. sr := c.multisearchRequests[0].Requests[0]
  111. avgAggOrderBy := sr.Aggs[0].Aggregation.Aggs[0]
  112. So(avgAggOrderBy.Key, ShouldEqual, "5")
  113. So(avgAggOrderBy.Aggregation.Type, ShouldEqual, "avg")
  114. avgAgg := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggs[0]
  115. So(avgAgg.Key, ShouldEqual, "5")
  116. So(avgAgg.Aggregation.Type, ShouldEqual, "avg")
  117. })
  // Terms agg ordered by _term with client version 5: the order map is
  // expected to keep the _term key.
  118. Convey("With term agg and order by term", func() {
  119. c := newFakeClient(5)
  120. _, err := executeTsdbQuery(c, `{
  121. "timeField": "@timestamp",
  122. "bucketAggs": [
  123. {
  124. "type": "terms",
  125. "field": "@host",
  126. "id": "2",
  127. "settings": { "size": "5", "order": "asc", "orderBy": "_term" }
  128. },
  129. { "type": "date_histogram", "field": "@timestamp", "id": "3" }
  130. ],
  131. "metrics": [
  132. {"type": "count", "id": "1" },
  133. {"type": "avg", "field": "@value", "id": "5" }
  134. ]
  135. }`, from, to, 15*time.Second)
  136. So(err, ShouldBeNil)
  137. sr := c.multisearchRequests[0].Requests[0]
  138. firstLevel := sr.Aggs[0]
  139. So(firstLevel.Key, ShouldEqual, "2")
  140. termsAgg := firstLevel.Aggregation.Aggregation.(*es.TermsAggregation)
  141. So(termsAgg.Order["_term"], ShouldEqual, "asc")
  142. })
  // Same model with client version 60: the ordering is expected under the
  // _key key instead of _term.
  143. Convey("With term agg and order by term with es6.x", func() {
  144. c := newFakeClient(60)
  145. _, err := executeTsdbQuery(c, `{
  146. "timeField": "@timestamp",
  147. "bucketAggs": [
  148. {
  149. "type": "terms",
  150. "field": "@host",
  151. "id": "2",
  152. "settings": { "size": "5", "order": "asc", "orderBy": "_term" }
  153. },
  154. { "type": "date_histogram", "field": "@timestamp", "id": "3" }
  155. ],
  156. "metrics": [
  157. {"type": "count", "id": "1" },
  158. {"type": "avg", "field": "@value", "id": "5" }
  159. ]
  160. }`, from, to, 15*time.Second)
  161. So(err, ShouldBeNil)
  162. sr := c.multisearchRequests[0].Requests[0]
  163. firstLevel := sr.Aggs[0]
  164. So(firstLevel.Key, ShouldEqual, "2")
  165. termsAgg := firstLevel.Aggregation.Aggregation.(*es.TermsAggregation)
  166. So(termsAgg.Order["_key"], ShouldEqual, "asc")
  167. })
  // Percentiles metric: the percents setting is expected in the metric agg's
  // settings with the same four string values, in the same order.
  168. Convey("With metric percentiles", func() {
  169. c := newFakeClient(5)
  170. _, err := executeTsdbQuery(c, `{
  171. "timeField": "@timestamp",
  172. "bucketAggs": [
  173. { "type": "date_histogram", "field": "@timestamp", "id": "3" }
  174. ],
  175. "metrics": [
  176. {
  177. "id": "1",
  178. "type": "percentiles",
  179. "field": "@load_time",
  180. "settings": {
  181. "percents": [ "1", "2", "3", "4" ]
  182. }
  183. }
  184. ]
  185. }`, from, to, 15*time.Second)
  186. So(err, ShouldBeNil)
  187. sr := c.multisearchRequests[0].Requests[0]
  188. percentilesAgg := sr.Aggs[0].Aggregation.Aggs[0]
  189. So(percentilesAgg.Key, ShouldEqual, "1")
  190. So(percentilesAgg.Aggregation.Type, ShouldEqual, "percentiles")
  191. metricAgg := percentilesAgg.Aggregation.Aggregation.(*es.MetricAggregation)
  192. percents := metricAgg.Settings["percents"].([]interface{})
  193. So(percents, ShouldHaveLength, 4)
  194. So(percents[0], ShouldEqual, "1")
  195. So(percents[1], ShouldEqual, "2")
  196. So(percents[2], ShouldEqual, "3")
  197. So(percents[3], ShouldEqual, "4")
  198. })
  // Filters agg, client version 2: each configured query is expected as a
  // query-string filter keyed by its own query text, with the date histogram
  // nested under the filters agg.
  199. Convey("With filters aggs on es 2", func() {
  200. c := newFakeClient(2)
  201. _, err := executeTsdbQuery(c, `{
  202. "timeField": "@timestamp",
  203. "bucketAggs": [
  204. {
  205. "id": "2",
  206. "type": "filters",
  207. "settings": {
  208. "filters": [ { "query": "@metric:cpu" }, { "query": "@metric:logins.count" } ]
  209. }
  210. },
  211. { "type": "date_histogram", "field": "@timestamp", "id": "4" }
  212. ],
  213. "metrics": [{"type": "count", "id": "1" }]
  214. }`, from, to, 15*time.Second)
  215. So(err, ShouldBeNil)
  216. sr := c.multisearchRequests[0].Requests[0]
  217. filtersAgg := sr.Aggs[0]
  218. So(filtersAgg.Key, ShouldEqual, "2")
  219. So(filtersAgg.Aggregation.Type, ShouldEqual, "filters")
  220. fAgg := filtersAgg.Aggregation.Aggregation.(*es.FiltersAggregation)
  221. So(fAgg.Filters["@metric:cpu"].(*es.QueryStringFilter).Query, ShouldEqual, "@metric:cpu")
  222. So(fAgg.Filters["@metric:logins.count"].(*es.QueryStringFilter).Query, ShouldEqual, "@metric:logins.count")
  223. dateHistogramAgg := sr.Aggs[0].Aggregation.Aggs[0]
  224. So(dateHistogramAgg.Key, ShouldEqual, "4")
  225. So(dateHistogramAgg.Aggregation.Aggregation.(*es.DateHistogramAgg).Field, ShouldEqual, "@timestamp")
  226. })
  // Filters agg, client version 5: same model and same expectations as the
  // version 2 case.
  227. Convey("With filters aggs on es 5", func() {
  228. c := newFakeClient(5)
  229. _, err := executeTsdbQuery(c, `{
  230. "timeField": "@timestamp",
  231. "bucketAggs": [
  232. {
  233. "id": "2",
  234. "type": "filters",
  235. "settings": {
  236. "filters": [ { "query": "@metric:cpu" }, { "query": "@metric:logins.count" } ]
  237. }
  238. },
  239. { "type": "date_histogram", "field": "@timestamp", "id": "4" }
  240. ],
  241. "metrics": [{"type": "count", "id": "1" }]
  242. }`, from, to, 15*time.Second)
  243. So(err, ShouldBeNil)
  244. sr := c.multisearchRequests[0].Requests[0]
  245. filtersAgg := sr.Aggs[0]
  246. So(filtersAgg.Key, ShouldEqual, "2")
  247. So(filtersAgg.Aggregation.Type, ShouldEqual, "filters")
  248. fAgg := filtersAgg.Aggregation.Aggregation.(*es.FiltersAggregation)
  249. So(fAgg.Filters["@metric:cpu"].(*es.QueryStringFilter).Query, ShouldEqual, "@metric:cpu")
  250. So(fAgg.Filters["@metric:logins.count"].(*es.QueryStringFilter).Query, ShouldEqual, "@metric:logins.count")
  251. dateHistogramAgg := sr.Aggs[0].Aggregation.Aggs[0]
  252. So(dateHistogramAgg.Key, ShouldEqual, "4")
  253. So(dateHistogramAgg.Aggregation.Aggregation.(*es.DateHistogramAgg).Field, ShouldEqual, "@timestamp")
  254. })
  // raw_document metric with empty settings: the search request size is
  // expected to be 500.
  255. Convey("With raw document metric", func() {
  256. c := newFakeClient(5)
  257. _, err := executeTsdbQuery(c, `{
  258. "timeField": "@timestamp",
  259. "bucketAggs": [],
  260. "metrics": [{ "id": "1", "type": "raw_document", "settings": {} }]
  261. }`, from, to, 15*time.Second)
  262. So(err, ShouldBeNil)
  263. sr := c.multisearchRequests[0].Requests[0]
  264. So(sr.Size, ShouldEqual, 500)
  265. })
  // raw_document metric with an explicit size setting: that value is expected
  // as the search request size.
  266. Convey("With raw document metric size set", func() {
  267. c := newFakeClient(5)
  268. _, err := executeTsdbQuery(c, `{
  269. "timeField": "@timestamp",
  270. "bucketAggs": [],
  271. "metrics": [{ "id": "1", "type": "raw_document", "settings": { "size": 1337 } }]
  272. }`, from, to, 15*time.Second)
  273. So(err, ShouldBeNil)
  274. sr := c.multisearchRequests[0].Requests[0]
  275. So(sr.Size, ShouldEqual, 1337)
  276. })
  // Date histogram with interval auto: the interval is expected to be emitted
  // as the $__interval placeholder, and min_doc_count is carried over.
  277. Convey("With date histogram agg", func() {
  278. c := newFakeClient(5)
  279. _, err := executeTsdbQuery(c, `{
  280. "timeField": "@timestamp",
  281. "bucketAggs": [
  282. {
  283. "id": "2",
  284. "type": "date_histogram",
  285. "field": "@timestamp",
  286. "settings": { "interval": "auto", "min_doc_count": 2 }
  287. }
  288. ],
  289. "metrics": [{"type": "count", "id": "1" }]
  290. }`, from, to, 15*time.Second)
  291. So(err, ShouldBeNil)
  292. sr := c.multisearchRequests[0].Requests[0]
  293. firstLevel := sr.Aggs[0]
  294. So(firstLevel.Key, ShouldEqual, "2")
  295. So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram")
  296. hAgg := firstLevel.Aggregation.Aggregation.(*es.DateHistogramAgg)
  297. So(hAgg.Field, ShouldEqual, "@timestamp")
  298. So(hAgg.Interval, ShouldEqual, "$__interval")
  299. So(hAgg.MinDocCount, ShouldEqual, 2)
  300. })
  // Numeric histogram: interval, min_doc_count and missing settings are
  // expected on the resulting agg. Missing is dereferenced, so it is a pointer.
  301. Convey("With histogram agg", func() {
  302. c := newFakeClient(5)
  303. _, err := executeTsdbQuery(c, `{
  304. "timeField": "@timestamp",
  305. "bucketAggs": [
  306. {
  307. "id": "3",
  308. "type": "histogram",
  309. "field": "bytes",
  310. "settings": { "interval": 10, "min_doc_count": 2, "missing": 5 }
  311. }
  312. ],
  313. "metrics": [{"type": "count", "id": "1" }]
  314. }`, from, to, 15*time.Second)
  315. So(err, ShouldBeNil)
  316. sr := c.multisearchRequests[0].Requests[0]
  317. firstLevel := sr.Aggs[0]
  318. So(firstLevel.Key, ShouldEqual, "3")
  319. So(firstLevel.Aggregation.Type, ShouldEqual, "histogram")
  320. hAgg := firstLevel.Aggregation.Aggregation.(*es.HistogramAgg)
  321. So(hAgg.Field, ShouldEqual, "bytes")
  322. So(hAgg.Interval, ShouldEqual, 10)
  323. So(hAgg.MinDocCount, ShouldEqual, 2)
  324. So(*hAgg.Missing, ShouldEqual, 5)
  325. })
  // Geohash grid: field and precision setting are expected on the agg.
  326. Convey("With geo hash grid agg", func() {
  327. c := newFakeClient(5)
  328. _, err := executeTsdbQuery(c, `{
  329. "timeField": "@timestamp",
  330. "bucketAggs": [
  331. {
  332. "id": "3",
  333. "type": "geohash_grid",
  334. "field": "@location",
  335. "settings": { "precision": 3 }
  336. }
  337. ],
  338. "metrics": [{"type": "count", "id": "1" }]
  339. }`, from, to, 15*time.Second)
  340. So(err, ShouldBeNil)
  341. sr := c.multisearchRequests[0].Requests[0]
  342. firstLevel := sr.Aggs[0]
  343. So(firstLevel.Key, ShouldEqual, "3")
  344. So(firstLevel.Aggregation.Type, ShouldEqual, "geohash_grid")
  345. ghGridAgg := firstLevel.Aggregation.Aggregation.(*es.GeoHashGridAggregation)
  346. So(ghGridAgg.Field, ShouldEqual, "@location")
  347. So(ghGridAgg.Precision, ShouldEqual, 3)
  348. })
  // Moving average over a sum metric: both the sum agg and the moving_avg
  // pipeline agg are expected as children of the date histogram, and the
  // pipeline's bucket path is the id of the referenced metric.
  349. Convey("With moving average", func() {
  350. c := newFakeClient(5)
  351. _, err := executeTsdbQuery(c, `{
  352. "timeField": "@timestamp",
  353. "bucketAggs": [
  354. { "type": "date_histogram", "field": "@timestamp", "id": "4" }
  355. ],
  356. "metrics": [
  357. { "id": "3", "type": "sum", "field": "@value" },
  358. {
  359. "id": "2",
  360. "type": "moving_avg",
  361. "field": "3",
  362. "pipelineAgg": "3"
  363. }
  364. ]
  365. }`, from, to, 15*time.Second)
  366. So(err, ShouldBeNil)
  367. sr := c.multisearchRequests[0].Requests[0]
  368. firstLevel := sr.Aggs[0]
  369. So(firstLevel.Key, ShouldEqual, "4")
  370. So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram")
  371. So(firstLevel.Aggregation.Aggs, ShouldHaveLength, 2)
  372. sumAgg := firstLevel.Aggregation.Aggs[0]
  373. So(sumAgg.Key, ShouldEqual, "3")
  374. So(sumAgg.Aggregation.Type, ShouldEqual, "sum")
  375. mAgg := sumAgg.Aggregation.Aggregation.(*es.MetricAggregation)
  376. So(mAgg.Field, ShouldEqual, "@value")
  377. movingAvgAgg := firstLevel.Aggregation.Aggs[1]
  378. So(movingAvgAgg.Key, ShouldEqual, "2")
  379. So(movingAvgAgg.Aggregation.Type, ShouldEqual, "moving_avg")
  380. pl := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
  381. So(pl.BucketPath, ShouldEqual, "3")
  382. })
  // Moving average over a count metric: only one child agg is expected (the
  // pipeline agg), and its bucket path is _count rather than the metric id.
  383. Convey("With moving average doc count", func() {
  384. c := newFakeClient(5)
  385. _, err := executeTsdbQuery(c, `{
  386. "timeField": "@timestamp",
  387. "bucketAggs": [
  388. { "type": "date_histogram", "field": "@timestamp", "id": "4" }
  389. ],
  390. "metrics": [
  391. { "id": "3", "type": "count", "field": "select field" },
  392. {
  393. "id": "2",
  394. "type": "moving_avg",
  395. "field": "3",
  396. "pipelineAgg": "3"
  397. }
  398. ]
  399. }`, from, to, 15*time.Second)
  400. So(err, ShouldBeNil)
  401. sr := c.multisearchRequests[0].Requests[0]
  402. firstLevel := sr.Aggs[0]
  403. So(firstLevel.Key, ShouldEqual, "4")
  404. So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram")
  405. So(firstLevel.Aggregation.Aggs, ShouldHaveLength, 1)
  406. movingAvgAgg := firstLevel.Aggregation.Aggs[0]
  407. So(movingAvgAgg.Key, ShouldEqual, "2")
  408. So(movingAvgAgg.Aggregation.Type, ShouldEqual, "moving_avg")
  409. pl := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
  410. So(pl.BucketPath, ShouldEqual, "3")
  411. })
  // Three metrics go in but only two child aggs are expected, with the valid
  // moving_avg (id 2) second. Presumably the moving_avg whose pipelineAgg is
  // placeholder text rather than a metric id is the one left out; the first
  // child is not asserted here.
  412. Convey("With broken moving average", func() {
  413. c := newFakeClient(5)
  414. _, err := executeTsdbQuery(c, `{
  415. "timeField": "@timestamp",
  416. "bucketAggs": [
  417. { "type": "date_histogram", "field": "@timestamp", "id": "5" }
  418. ],
  419. "metrics": [
  420. { "id": "3", "type": "sum", "field": "@value" },
  421. {
  422. "id": "2",
  423. "type": "moving_avg",
  424. "pipelineAgg": "3"
  425. },
  426. {
  427. "id": "4",
  428. "type": "moving_avg",
  429. "pipelineAgg": "Metric to apply moving average"
  430. }
  431. ]
  432. }`, from, to, 15*time.Second)
  433. So(err, ShouldBeNil)
  434. sr := c.multisearchRequests[0].Requests[0]
  435. firstLevel := sr.Aggs[0]
  436. So(firstLevel.Key, ShouldEqual, "5")
  437. So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram")
  438. So(firstLevel.Aggregation.Aggs, ShouldHaveLength, 2)
  439. movingAvgAgg := firstLevel.Aggregation.Aggs[1]
  440. So(movingAvgAgg.Key, ShouldEqual, "2")
  441. plAgg := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
  442. So(plAgg.BucketPath, ShouldEqual, "3")
  443. })
  // Derivative over a sum metric: the pipeline agg is expected as the second
  // child of the date histogram with the metric id as bucket path.
  444. Convey("With derivative", func() {
  445. c := newFakeClient(5)
  446. _, err := executeTsdbQuery(c, `{
  447. "timeField": "@timestamp",
  448. "bucketAggs": [
  449. { "type": "date_histogram", "field": "@timestamp", "id": "4" }
  450. ],
  451. "metrics": [
  452. { "id": "3", "type": "sum", "field": "@value" },
  453. {
  454. "id": "2",
  455. "type": "derivative",
  456. "pipelineAgg": "3"
  457. }
  458. ]
  459. }`, from, to, 15*time.Second)
  460. So(err, ShouldBeNil)
  461. sr := c.multisearchRequests[0].Requests[0]
  462. firstLevel := sr.Aggs[0]
  463. So(firstLevel.Key, ShouldEqual, "4")
  464. So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram")
  465. derivativeAgg := firstLevel.Aggregation.Aggs[1]
  466. So(derivativeAgg.Key, ShouldEqual, "2")
  467. plAgg := derivativeAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
  468. So(plAgg.BucketPath, ShouldEqual, "3")
  469. })
  // Derivative over a count metric: the pipeline agg is expected as the first
  // child and its bucket path is _count.
  470. Convey("With derivative doc count", func() {
  471. c := newFakeClient(5)
  472. _, err := executeTsdbQuery(c, `{
  473. "timeField": "@timestamp",
  474. "bucketAggs": [
  475. { "type": "date_histogram", "field": "@timestamp", "id": "4" }
  476. ],
  477. "metrics": [
  478. { "id": "3", "type": "count", "field": "select field" },
  479. {
  480. "id": "2",
  481. "type": "derivative",
  482. "pipelineAgg": "3"
  483. }
  484. ]
  485. }`, from, to, 15*time.Second)
  486. So(err, ShouldBeNil)
  487. sr := c.multisearchRequests[0].Requests[0]
  488. firstLevel := sr.Aggs[0]
  489. So(firstLevel.Key, ShouldEqual, "4")
  490. So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram")
  491. derivativeAgg := firstLevel.Aggregation.Aggs[0]
  492. So(derivativeAgg.Key, ShouldEqual, "2")
  493. plAgg := derivativeAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
  494. So(plAgg.BucketPath, ShouldEqual, "_count")
  495. })
  // bucket_script over two metrics: the pipeline agg is expected as the third
  // child, and its bucket path is a map from variable name to metric id.
  496. Convey("With bucket_script", func() {
  497. c := newFakeClient(5)
  498. _, err := executeTsdbQuery(c, `{
  499. "timeField": "@timestamp",
  500. "bucketAggs": [
  501. { "type": "date_histogram", "field": "@timestamp", "id": "4" }
  502. ],
  503. "metrics": [
  504. { "id": "3", "type": "sum", "field": "@value" },
  505. { "id": "5", "type": "max", "field": "@value" },
  506. {
  507. "id": "2",
  508. "type": "bucket_script",
  509. "pipelineVariables": [
  510. { "name": "var1", "pipelineAgg": "3" },
  511. { "name": "var2", "pipelineAgg": "5" }
  512. ],
  513. "settings": { "script": "params.var1 * params.var2" }
  514. }
  515. ]
  516. }`, from, to, 15*time.Second)
  517. So(err, ShouldBeNil)
  518. sr := c.multisearchRequests[0].Requests[0]
  519. firstLevel := sr.Aggs[0]
  520. So(firstLevel.Key, ShouldEqual, "4")
  521. So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram")
  522. bucketScriptAgg := firstLevel.Aggregation.Aggs[2]
  523. So(bucketScriptAgg.Key, ShouldEqual, "2")
  524. plAgg := bucketScriptAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
  525. So(plAgg.BucketPath.(map[string]interface{}), ShouldResemble, map[string]interface{}{
  526. "var1": "3",
  527. "var2": "5",
  528. })
  529. })
  // bucket_script whose only variable points at a count metric: the pipeline
  // agg is expected as the first child and the variable maps to _count.
  530. Convey("With bucket_script doc count", func() {
  531. c := newFakeClient(5)
  532. _, err := executeTsdbQuery(c, `{
  533. "timeField": "@timestamp",
  534. "bucketAggs": [
  535. { "type": "date_histogram", "field": "@timestamp", "id": "4" }
  536. ],
  537. "metrics": [
  538. { "id": "3", "type": "count", "field": "select field" },
  539. {
  540. "id": "2",
  541. "type": "bucket_script",
  542. "pipelineVariables": [
  543. { "name": "var1", "pipelineAgg": "3" }
  544. ],
  545. "settings": { "script": "params.var1 * 1000" }
  546. }
  547. ]
  548. }`, from, to, 15*time.Second)
  549. So(err, ShouldBeNil)
  550. sr := c.multisearchRequests[0].Requests[0]
  551. firstLevel := sr.Aggs[0]
  552. So(firstLevel.Key, ShouldEqual, "4")
  553. So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram")
  554. bucketScriptAgg := firstLevel.Aggregation.Aggs[0]
  555. So(bucketScriptAgg.Key, ShouldEqual, "2")
  556. plAgg := bucketScriptAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
  557. So(plAgg.BucketPath.(map[string]interface{}), ShouldResemble, map[string]interface{}{
  558. "var1": "_count",
  559. })
  560. })
  561. })
  562. }
  563. type fakeClient struct {
  564. version int
  565. timeField string
  566. multiSearchResponse *es.MultiSearchResponse
  567. multiSearchError error
  568. builder *es.MultiSearchRequestBuilder
  569. multisearchRequests []*es.MultiSearchRequest
  570. }
  571. func newFakeClient(version int) *fakeClient {
  572. return &fakeClient{
  573. version: version,
  574. timeField: "@timestamp",
  575. multisearchRequests: make([]*es.MultiSearchRequest, 0),
  576. multiSearchResponse: &es.MultiSearchResponse{},
  577. }
  578. }
  579. func (c *fakeClient) EnableDebug() {}
  580. func (c *fakeClient) GetVersion() int {
  581. return c.version
  582. }
  583. func (c *fakeClient) GetTimeField() string {
  584. return c.timeField
  585. }
  586. func (c *fakeClient) GetMinInterval(queryInterval string) (time.Duration, error) {
  587. return 15 * time.Second, nil
  588. }
  589. func (c *fakeClient) ExecuteMultisearch(r *es.MultiSearchRequest) (*es.MultiSearchResponse, error) {
  590. c.multisearchRequests = append(c.multisearchRequests, r)
  591. return c.multiSearchResponse, c.multiSearchError
  592. }
  593. func (c *fakeClient) MultiSearch() *es.MultiSearchRequestBuilder {
  594. c.builder = es.NewMultiSearchRequestBuilder(c.version)
  595. return c.builder
  596. }
  597. func newTsdbQuery(body string) (*tsdb.TsdbQuery, error) {
  598. json, err := simplejson.NewJson([]byte(body))
  599. if err != nil {
  600. return nil, err
  601. }
  602. return &tsdb.TsdbQuery{
  603. Queries: []*tsdb.Query{
  604. {
  605. Model: json,
  606. },
  607. },
  608. }, nil
  609. }
  610. func executeTsdbQuery(c es.Client, body string, from, to time.Time, minInterval time.Duration) (*tsdb.Response, error) {
  611. json, err := simplejson.NewJson([]byte(body))
  612. if err != nil {
  613. return nil, err
  614. }
  615. fromStr := fmt.Sprintf("%d", from.UnixNano()/int64(time.Millisecond))
  616. toStr := fmt.Sprintf("%d", to.UnixNano()/int64(time.Millisecond))
  617. tsdbQuery := &tsdb.TsdbQuery{
  618. Queries: []*tsdb.Query{
  619. {
  620. Model: json,
  621. },
  622. },
  623. TimeRange: tsdb.NewTimeRange(fromStr, toStr),
  624. }
  625. query := newTimeSeriesQuery(c, tsdbQuery, tsdb.NewIntervalCalculator(&tsdb.IntervalOptions{MinInterval: minInterval}))
  626. return query.execute()
  627. }
  // TestTimeSeriesQueryParser parses one query model with the parser returned
  // by newTimeSeriesQueryParser and checks that the top-level fields, both
  // metrics and both bucket aggs come back with the values from the model.
  628. func TestTimeSeriesQueryParser(t *testing.T) {
  629. Convey("Test time series query parser", t, func() {
  630. p := newTimeSeriesQueryParser()
  631. Convey("Should be able to parse query", func() {
  // Model under test: two metrics (percentiles, count) and two bucket aggs
  // (terms, date_histogram). It also carries keys that are never asserted
  // on below (meta on the metrics, fake on the terms agg).
  632. body := `{
  633. "timeField": "@timestamp",
  634. "query": "@metric:cpu",
  635. "alias": "{{@hostname}} {{metric}}",
  636. "metrics": [
  637. {
  638. "field": "@value",
  639. "id": "1",
  640. "meta": {},
  641. "settings": {
  642. "percents": [
  643. "90"
  644. ]
  645. },
  646. "type": "percentiles"
  647. },
  648. {
  649. "type": "count",
  650. "field": "select field",
  651. "id": "4",
  652. "settings": {},
  653. "meta": {}
  654. }
  655. ],
  656. "bucketAggs": [
  657. {
  658. "fake": true,
  659. "field": "@hostname",
  660. "id": "3",
  661. "settings": {
  662. "min_doc_count": 1,
  663. "order": "desc",
  664. "orderBy": "_term",
  665. "size": "10"
  666. },
  667. "type": "terms"
  668. },
  669. {
  670. "field": "@timestamp",
  671. "id": "2",
  672. "settings": {
  673. "interval": "5m",
  674. "min_doc_count": 0,
  675. "trimEdges": 0
  676. },
  677. "type": "date_histogram"
  678. }
  679. ]
  680. }`
  681. tsdbQuery, err := newTsdbQuery(body)
  682. So(err, ShouldBeNil)
  683. queries, err := p.parse(tsdbQuery)
  684. So(err, ShouldBeNil)
  685. So(queries, ShouldHaveLength, 1)
  686. q := queries[0]
  // Top-level fields.
  687. So(q.TimeField, ShouldEqual, "@timestamp")
  688. So(q.RawQuery, ShouldEqual, "@metric:cpu")
  689. So(q.Alias, ShouldEqual, "{{@hostname}} {{metric}}")
  // Metrics keep the order of the model. The model has no hide or
  // pipelineAgg keys, so Hide and PipelineAggregate are expected to be
  // false and empty.
  690. So(q.Metrics, ShouldHaveLength, 2)
  691. So(q.Metrics[0].Field, ShouldEqual, "@value")
  692. So(q.Metrics[0].ID, ShouldEqual, "1")
  693. So(q.Metrics[0].Type, ShouldEqual, "percentiles")
  694. So(q.Metrics[0].Hide, ShouldBeFalse)
  695. So(q.Metrics[0].PipelineAggregate, ShouldEqual, "")
  696. So(q.Metrics[0].Settings.Get("percents").MustStringArray()[0], ShouldEqual, "90")
  697. So(q.Metrics[1].Field, ShouldEqual, "select field")
  698. So(q.Metrics[1].ID, ShouldEqual, "4")
  699. So(q.Metrics[1].Type, ShouldEqual, "count")
  700. So(q.Metrics[1].Hide, ShouldBeFalse)
  701. So(q.Metrics[1].PipelineAggregate, ShouldEqual, "")
  702. So(q.Metrics[1].Settings.MustMap(), ShouldBeEmpty)
  // Bucket aggs keep the order of the model; settings are read back
  // through the simplejson accessors with the types used in the model.
  703. So(q.BucketAggs, ShouldHaveLength, 2)
  704. So(q.BucketAggs[0].Field, ShouldEqual, "@hostname")
  705. So(q.BucketAggs[0].ID, ShouldEqual, "3")
  706. So(q.BucketAggs[0].Type, ShouldEqual, "terms")
  707. So(q.BucketAggs[0].Settings.Get("min_doc_count").MustInt64(), ShouldEqual, 1)
  708. So(q.BucketAggs[0].Settings.Get("order").MustString(), ShouldEqual, "desc")
  709. So(q.BucketAggs[0].Settings.Get("orderBy").MustString(), ShouldEqual, "_term")
  710. So(q.BucketAggs[0].Settings.Get("size").MustString(), ShouldEqual, "10")
  711. So(q.BucketAggs[1].Field, ShouldEqual, "@timestamp")
  712. So(q.BucketAggs[1].ID, ShouldEqual, "2")
  713. So(q.BucketAggs[1].Type, ShouldEqual, "date_histogram")
  714. So(q.BucketAggs[1].Settings.Get("interval").MustString(), ShouldEqual, "5m")
  715. So(q.BucketAggs[1].Settings.Get("min_doc_count").MustInt64(), ShouldEqual, 0)
  716. So(q.BucketAggs[1].Settings.Get("trimEdges").MustInt64(), ShouldEqual, 0)
  717. })
  718. })
  719. }