response_parser_test.go

package elasticsearch

import (
	"encoding/json"
	"fmt"
	"testing"
	"time"

	"github.com/grafana/grafana/pkg/components/null"
	"github.com/grafana/grafana/pkg/components/simplejson"
	"github.com/grafana/grafana/pkg/tsdb"
	es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
	. "github.com/smartystreets/goconvey/convey"
)
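
// TestResponseParser feeds canned query models and raw Elasticsearch
// responses into the response parser and asserts on the series and tables it
// produces. Each entry in Series.Points is a [value, timestamp] pair:
// Points[i][0] holds the metric value and Points[i][1] the bucket key
// (epoch milliseconds).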
func TestResponseParser(t *testing.T) {
	Convey("Elasticsearch response parser test", t, func() {
		Convey("Simple query and count", func() {
			targets := map[string]string{
				"A": `{
					"timeField": "@timestamp",
					"metrics": [{ "type": "count", "id": "1" }],
					"bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "2" }]
				}`,
			}
			response := `{
				"responses": [
					{
						"aggregations": {
							"2": {
								"buckets": [
									{
										"doc_count": 10,
										"key": 1000
									},
									{
										"doc_count": 15,
										"key": 2000
									}
								]
							}
						}
					}
				]
			}`
			rp, err := newResponseParserForTest(targets, response)
			So(err, ShouldBeNil)
			result, err := rp.getTimeSeries()
			So(err, ShouldBeNil)
			So(result.Results, ShouldHaveLength, 1)

			queryRes := result.Results["A"]
			So(queryRes, ShouldNotBeNil)
			So(queryRes.Series, ShouldHaveLength, 1)

			series := queryRes.Series[0]
			So(series.Name, ShouldEqual, "Count")
			So(series.Points, ShouldHaveLength, 2)
			So(series.Points[0][0].Float64, ShouldEqual, 10)
			So(series.Points[0][1].Float64, ShouldEqual, 1000)
			So(series.Points[1][0].Float64, ShouldEqual, 15)
			So(series.Points[1][1].Float64, ShouldEqual, 2000)
		})

		Convey("Simple query count & avg aggregation", func() {
			targets := map[string]string{
				"A": `{
					"timeField": "@timestamp",
					"metrics": [{ "type": "count", "id": "1" }, { "type": "avg", "field": "value", "id": "2" }],
					"bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "3" }]
				}`,
			}
			response := `{
				"responses": [
					{
						"aggregations": {
							"3": {
								"buckets": [
									{
										"2": { "value": 88 },
										"doc_count": 10,
										"key": 1000
									},
									{
										"2": { "value": 99 },
										"doc_count": 15,
										"key": 2000
									}
								]
							}
						}
					}
				]
			}`
			rp, err := newResponseParserForTest(targets, response)
			So(err, ShouldBeNil)
			result, err := rp.getTimeSeries()
			So(err, ShouldBeNil)
			So(result.Results, ShouldHaveLength, 1)

			queryRes := result.Results["A"]
			So(queryRes, ShouldNotBeNil)
			So(queryRes.Series, ShouldHaveLength, 2)

			seriesOne := queryRes.Series[0]
			So(seriesOne.Name, ShouldEqual, "Count")
			So(seriesOne.Points, ShouldHaveLength, 2)
			So(seriesOne.Points[0][0].Float64, ShouldEqual, 10)
			So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000)
			So(seriesOne.Points[1][0].Float64, ShouldEqual, 15)
			So(seriesOne.Points[1][1].Float64, ShouldEqual, 2000)

			seriesTwo := queryRes.Series[1]
			So(seriesTwo.Name, ShouldEqual, "Average value")
			So(seriesTwo.Points, ShouldHaveLength, 2)
			So(seriesTwo.Points[0][0].Float64, ShouldEqual, 88)
			So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000)
			So(seriesTwo.Points[1][0].Float64, ShouldEqual, 99)
			So(seriesTwo.Points[1][1].Float64, ShouldEqual, 2000)
		})

		Convey("Single group by query one metric", func() {
			targets := map[string]string{
				"A": `{
					"timeField": "@timestamp",
					"metrics": [{ "type": "count", "id": "1" }],
					"bucketAggs": [
						{ "type": "terms", "field": "host", "id": "2" },
						{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
					]
				}`,
			}
			response := `{
				"responses": [
					{
						"aggregations": {
							"2": {
								"buckets": [
									{
										"3": {
											"buckets": [{ "doc_count": 1, "key": 1000 }, { "doc_count": 3, "key": 2000 }]
										},
										"doc_count": 4,
										"key": "server1"
									},
									{
										"3": {
											"buckets": [{ "doc_count": 2, "key": 1000 }, { "doc_count": 8, "key": 2000 }]
										},
										"doc_count": 10,
										"key": "server2"
									}
								]
							}
						}
					}
				]
			}`
			rp, err := newResponseParserForTest(targets, response)
			So(err, ShouldBeNil)
			result, err := rp.getTimeSeries()
			So(err, ShouldBeNil)
			So(result.Results, ShouldHaveLength, 1)

			queryRes := result.Results["A"]
			So(queryRes, ShouldNotBeNil)
			So(queryRes.Series, ShouldHaveLength, 2)

			seriesOne := queryRes.Series[0]
			So(seriesOne.Name, ShouldEqual, "server1")
			So(seriesOne.Points, ShouldHaveLength, 2)
			So(seriesOne.Points[0][0].Float64, ShouldEqual, 1)
			So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000)
			So(seriesOne.Points[1][0].Float64, ShouldEqual, 3)
			So(seriesOne.Points[1][1].Float64, ShouldEqual, 2000)

			seriesTwo := queryRes.Series[1]
			So(seriesTwo.Name, ShouldEqual, "server2")
			So(seriesTwo.Points, ShouldHaveLength, 2)
			So(seriesTwo.Points[0][0].Float64, ShouldEqual, 2)
			So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000)
			So(seriesTwo.Points[1][0].Float64, ShouldEqual, 8)
			So(seriesTwo.Points[1][1].Float64, ShouldEqual, 2000)
		})

		Convey("Single group by query two metrics", func() {
			targets := map[string]string{
				"A": `{
					"timeField": "@timestamp",
					"metrics": [{ "type": "count", "id": "1" }, { "type": "avg", "field": "@value", "id": "4" }],
					"bucketAggs": [
						{ "type": "terms", "field": "host", "id": "2" },
						{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
					]
				}`,
			}
			response := `{
				"responses": [
					{
						"aggregations": {
							"2": {
								"buckets": [
									{
										"3": {
											"buckets": [
												{ "4": { "value": 10 }, "doc_count": 1, "key": 1000 },
												{ "4": { "value": 12 }, "doc_count": 3, "key": 2000 }
											]
										},
										"doc_count": 4,
										"key": "server1"
									},
									{
										"3": {
											"buckets": [
												{ "4": { "value": 20 }, "doc_count": 1, "key": 1000 },
												{ "4": { "value": 32 }, "doc_count": 3, "key": 2000 }
											]
										},
										"doc_count": 10,
										"key": "server2"
									}
								]
							}
						}
					}
				]
			}`
			rp, err := newResponseParserForTest(targets, response)
			So(err, ShouldBeNil)
			result, err := rp.getTimeSeries()
			So(err, ShouldBeNil)
			So(result.Results, ShouldHaveLength, 1)

			queryRes := result.Results["A"]
			So(queryRes, ShouldNotBeNil)
			So(queryRes.Series, ShouldHaveLength, 4)

			seriesOne := queryRes.Series[0]
			So(seriesOne.Name, ShouldEqual, "server1 Count")
			So(seriesOne.Points, ShouldHaveLength, 2)
			So(seriesOne.Points[0][0].Float64, ShouldEqual, 1)
			So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000)
			So(seriesOne.Points[1][0].Float64, ShouldEqual, 3)
			So(seriesOne.Points[1][1].Float64, ShouldEqual, 2000)

			seriesTwo := queryRes.Series[1]
			So(seriesTwo.Name, ShouldEqual, "server1 Average @value")
			So(seriesTwo.Points, ShouldHaveLength, 2)
			So(seriesTwo.Points[0][0].Float64, ShouldEqual, 10)
			So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000)
			So(seriesTwo.Points[1][0].Float64, ShouldEqual, 12)
			So(seriesTwo.Points[1][1].Float64, ShouldEqual, 2000)

			seriesThree := queryRes.Series[2]
			So(seriesThree.Name, ShouldEqual, "server2 Count")
			So(seriesThree.Points, ShouldHaveLength, 2)
			So(seriesThree.Points[0][0].Float64, ShouldEqual, 1)
			So(seriesThree.Points[0][1].Float64, ShouldEqual, 1000)
			So(seriesThree.Points[1][0].Float64, ShouldEqual, 3)
			So(seriesThree.Points[1][1].Float64, ShouldEqual, 2000)

			seriesFour := queryRes.Series[3]
			So(seriesFour.Name, ShouldEqual, "server2 Average @value")
			So(seriesFour.Points, ShouldHaveLength, 2)
			So(seriesFour.Points[0][0].Float64, ShouldEqual, 20)
			So(seriesFour.Points[0][1].Float64, ShouldEqual, 1000)
			So(seriesFour.Points[1][0].Float64, ShouldEqual, 32)
			So(seriesFour.Points[1][1].Float64, ShouldEqual, 2000)
		})

		Convey("With percentiles", func() {
			targets := map[string]string{
				"A": `{
					"timeField": "@timestamp",
					"metrics": [{ "type": "percentiles", "settings": { "percents": [75, 90] }, "id": "1" }],
					"bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "3" }]
				}`,
			}
			response := `{
				"responses": [
					{
						"aggregations": {
							"3": {
								"buckets": [
									{
										"1": { "values": { "75": 3.3, "90": 5.5 } },
										"doc_count": 10,
										"key": 1000
									},
									{
										"1": { "values": { "75": 2.3, "90": 4.5 } },
										"doc_count": 15,
										"key": 2000
									}
								]
							}
						}
					}
				]
			}`
			rp, err := newResponseParserForTest(targets, response)
			So(err, ShouldBeNil)
			result, err := rp.getTimeSeries()
			So(err, ShouldBeNil)
			So(result.Results, ShouldHaveLength, 1)

			queryRes := result.Results["A"]
			So(queryRes, ShouldNotBeNil)
			So(queryRes.Series, ShouldHaveLength, 2)

			seriesOne := queryRes.Series[0]
			So(seriesOne.Name, ShouldEqual, "p75")
			So(seriesOne.Points, ShouldHaveLength, 2)
			So(seriesOne.Points[0][0].Float64, ShouldEqual, 3.3)
			So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000)
			So(seriesOne.Points[1][0].Float64, ShouldEqual, 2.3)
			So(seriesOne.Points[1][1].Float64, ShouldEqual, 2000)

			seriesTwo := queryRes.Series[1]
			So(seriesTwo.Name, ShouldEqual, "p90")
			So(seriesTwo.Points, ShouldHaveLength, 2)
			So(seriesTwo.Points[0][0].Float64, ShouldEqual, 5.5)
			So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000)
			So(seriesTwo.Points[1][0].Float64, ShouldEqual, 4.5)
			So(seriesTwo.Points[1][1].Float64, ShouldEqual, 2000)
		})

		Convey("With extended stats", func() {
			targets := map[string]string{
				"A": `{
					"timeField": "@timestamp",
					"metrics": [{ "type": "extended_stats", "meta": { "max": true, "std_deviation_bounds_upper": true, "std_deviation_bounds_lower": true }, "id": "1" }],
					"bucketAggs": [
						{ "type": "terms", "field": "host", "id": "3" },
						{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
					]
				}`,
			}
			response := `{
				"responses": [
					{
						"aggregations": {
							"3": {
								"buckets": [
									{
										"key": "server1",
										"4": {
											"buckets": [
												{
													"1": {
														"max": 10.2,
														"min": 5.5,
														"std_deviation_bounds": { "upper": 3, "lower": -2 }
													},
													"doc_count": 10,
													"key": 1000
												}
											]
										}
									},
									{
										"key": "server2",
										"4": {
											"buckets": [
												{
													"1": {
														"max": 15.5,
														"min": 3.4,
														"std_deviation_bounds": { "upper": 4, "lower": -1 }
													},
													"doc_count": 10,
													"key": 1000
												}
											]
										}
									}
								]
							}
						}
					}
				]
			}`
			rp, err := newResponseParserForTest(targets, response)
			So(err, ShouldBeNil)
			result, err := rp.getTimeSeries()
			So(err, ShouldBeNil)
			So(result.Results, ShouldHaveLength, 1)

			queryRes := result.Results["A"]
			So(queryRes, ShouldNotBeNil)
			So(queryRes.Series, ShouldHaveLength, 6)

			seriesOne := queryRes.Series[0]
			So(seriesOne.Name, ShouldEqual, "server1 Max")
			So(seriesOne.Points, ShouldHaveLength, 1)
			So(seriesOne.Points[0][0].Float64, ShouldEqual, 10.2)
			So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000)

			seriesTwo := queryRes.Series[1]
			So(seriesTwo.Name, ShouldEqual, "server1 Std Dev Lower")
			So(seriesTwo.Points, ShouldHaveLength, 1)
			So(seriesTwo.Points[0][0].Float64, ShouldEqual, -2)
			So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000)

			seriesThree := queryRes.Series[2]
			So(seriesThree.Name, ShouldEqual, "server1 Std Dev Upper")
			So(seriesThree.Points, ShouldHaveLength, 1)
			So(seriesThree.Points[0][0].Float64, ShouldEqual, 3)
			So(seriesThree.Points[0][1].Float64, ShouldEqual, 1000)

			seriesFour := queryRes.Series[3]
			So(seriesFour.Name, ShouldEqual, "server2 Max")
			So(seriesFour.Points, ShouldHaveLength, 1)
			So(seriesFour.Points[0][0].Float64, ShouldEqual, 15.5)
			So(seriesFour.Points[0][1].Float64, ShouldEqual, 1000)

			seriesFive := queryRes.Series[4]
			So(seriesFive.Name, ShouldEqual, "server2 Std Dev Lower")
			So(seriesFive.Points, ShouldHaveLength, 1)
			So(seriesFive.Points[0][0].Float64, ShouldEqual, -1)
			So(seriesFive.Points[0][1].Float64, ShouldEqual, 1000)

			seriesSix := queryRes.Series[5]
			So(seriesSix.Name, ShouldEqual, "server2 Std Dev Upper")
			So(seriesSix.Points, ShouldHaveLength, 1)
			So(seriesSix.Points[0][0].Float64, ShouldEqual, 4)
			So(seriesSix.Points[0][1].Float64, ShouldEqual, 1000)
		})
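
		// Alias patterns: {{term <field>}} and {{metric}} are interpolated per
		// series, while variables the renderer does not know (here {{not_exist}})
		// are expected to stay in the series name verbatim.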
		Convey("Single group by with alias pattern", func() {
			targets := map[string]string{
				"A": `{
					"timeField": "@timestamp",
					"alias": "{{term @host}} {{metric}} and {{not_exist}} {{@host}}",
					"metrics": [{ "type": "count", "id": "1" }],
					"bucketAggs": [
						{ "type": "terms", "field": "@host", "id": "2" },
						{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
					]
				}`,
			}
			response := `{
				"responses": [
					{
						"aggregations": {
							"2": {
								"buckets": [
									{
										"3": {
											"buckets": [{ "doc_count": 1, "key": 1000 }, { "doc_count": 3, "key": 2000 }]
										},
										"doc_count": 4,
										"key": "server1"
									},
									{
										"3": {
											"buckets": [{ "doc_count": 2, "key": 1000 }, { "doc_count": 8, "key": 2000 }]
										},
										"doc_count": 10,
										"key": "server2"
									},
									{
										"3": {
											"buckets": [{ "doc_count": 2, "key": 1000 }, { "doc_count": 8, "key": 2000 }]
										},
										"doc_count": 10,
										"key": 0
									}
								]
							}
						}
					}
				]
			}`
			rp, err := newResponseParserForTest(targets, response)
			So(err, ShouldBeNil)
			result, err := rp.getTimeSeries()
			So(err, ShouldBeNil)
			So(result.Results, ShouldHaveLength, 1)

			queryRes := result.Results["A"]
			So(queryRes, ShouldNotBeNil)
			So(queryRes.Series, ShouldHaveLength, 3)

			seriesOne := queryRes.Series[0]
			So(seriesOne.Name, ShouldEqual, "server1 Count and {{not_exist}} server1")
			So(seriesOne.Points, ShouldHaveLength, 2)
			So(seriesOne.Points[0][0].Float64, ShouldEqual, 1)
			So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000)
			So(seriesOne.Points[1][0].Float64, ShouldEqual, 3)
			So(seriesOne.Points[1][1].Float64, ShouldEqual, 2000)

			seriesTwo := queryRes.Series[1]
			So(seriesTwo.Name, ShouldEqual, "server2 Count and {{not_exist}} server2")
			So(seriesTwo.Points, ShouldHaveLength, 2)
			So(seriesTwo.Points[0][0].Float64, ShouldEqual, 2)
			So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000)
			So(seriesTwo.Points[1][0].Float64, ShouldEqual, 8)
			So(seriesTwo.Points[1][1].Float64, ShouldEqual, 2000)

			seriesThree := queryRes.Series[2]
			So(seriesThree.Name, ShouldEqual, "0 Count and {{not_exist}} 0")
			So(seriesThree.Points, ShouldHaveLength, 2)
			So(seriesThree.Points[0][0].Float64, ShouldEqual, 2)
			So(seriesThree.Points[0][1].Float64, ShouldEqual, 1000)
			So(seriesThree.Points[1][0].Float64, ShouldEqual, 8)
			So(seriesThree.Points[1][1].Float64, ShouldEqual, 2000)
		})
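
		// A plain (non-date) histogram has no time dimension, so the parser is
		// expected to return a table of bucket keys and metric values rather
		// than time series.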
		Convey("Histogram response", func() {
			targets := map[string]string{
				"A": `{
					"timeField": "@timestamp",
					"metrics": [{ "type": "count", "id": "1" }],
					"bucketAggs": [{ "type": "histogram", "field": "bytes", "id": "3" }]
				}`,
			}
			response := `{
				"responses": [
					{
						"aggregations": {
							"3": {
								"buckets": [{ "doc_count": 1, "key": 1000 }, { "doc_count": 3, "key": 2000 }, { "doc_count": 2, "key": 3000 }]
							}
						}
					}
				]
			}`
			rp, err := newResponseParserForTest(targets, response)
			So(err, ShouldBeNil)
			result, err := rp.getTimeSeries()
			So(err, ShouldBeNil)
			So(result.Results, ShouldHaveLength, 1)

			queryRes := result.Results["A"]
			So(queryRes, ShouldNotBeNil)
			So(queryRes.Tables, ShouldHaveLength, 1)

			rows := queryRes.Tables[0].Rows
			So(rows, ShouldHaveLength, 3)
			cols := queryRes.Tables[0].Columns
			So(cols, ShouldHaveLength, 2)
			So(cols[0].Text, ShouldEqual, "bytes")
			So(cols[1].Text, ShouldEqual, "Count")

			So(rows[0][0].(null.Float).Float64, ShouldEqual, 1000)
			So(rows[0][1].(null.Float).Float64, ShouldEqual, 1)
			So(rows[1][0].(null.Float).Float64, ShouldEqual, 2000)
			So(rows[1][1].(null.Float).Float64, ShouldEqual, 3)
			So(rows[2][0].(null.Float).Float64, ShouldEqual, 3000)
			So(rows[2][1].(null.Float).Float64, ShouldEqual, 2)
		})

		Convey("With two filters agg", func() {
			targets := map[string]string{
				"A": `{
					"timeField": "@timestamp",
					"metrics": [{ "type": "count", "id": "1" }],
					"bucketAggs": [
						{
							"type": "filters",
							"id": "2",
							"settings": {
								"filters": [{ "query": "@metric:cpu" }, { "query": "@metric:logins.count" }]
							}
						},
						{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
					]
				}`,
			}
			response := `{
				"responses": [
					{
						"aggregations": {
							"2": {
								"buckets": {
									"@metric:cpu": {
										"3": {
											"buckets": [{ "doc_count": 1, "key": 1000 }, { "doc_count": 3, "key": 2000 }]
										}
									},
									"@metric:logins.count": {
										"3": {
											"buckets": [{ "doc_count": 2, "key": 1000 }, { "doc_count": 8, "key": 2000 }]
										}
									}
								}
							}
						}
					}
				]
			}`
			rp, err := newResponseParserForTest(targets, response)
			So(err, ShouldBeNil)
			result, err := rp.getTimeSeries()
			So(err, ShouldBeNil)
			So(result.Results, ShouldHaveLength, 1)

			queryRes := result.Results["A"]
			So(queryRes, ShouldNotBeNil)
			So(queryRes.Series, ShouldHaveLength, 2)

			seriesOne := queryRes.Series[0]
			So(seriesOne.Name, ShouldEqual, "@metric:cpu")
			So(seriesOne.Points, ShouldHaveLength, 2)
			So(seriesOne.Points[0][0].Float64, ShouldEqual, 1)
			So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000)
			So(seriesOne.Points[1][0].Float64, ShouldEqual, 3)
			So(seriesOne.Points[1][1].Float64, ShouldEqual, 2000)

			seriesTwo := queryRes.Series[1]
			So(seriesTwo.Name, ShouldEqual, "@metric:logins.count")
			So(seriesTwo.Points, ShouldHaveLength, 2)
			So(seriesTwo.Points[0][0].Float64, ShouldEqual, 2)
			So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000)
			So(seriesTwo.Points[1][0].Float64, ShouldEqual, 8)
			So(seriesTwo.Points[1][1].Float64, ShouldEqual, 2000)
		})
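
		// With "trimEdges": 1 the parser should drop one bucket from each end
		// of the date_histogram, leaving only the middle bucket (key 2).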
		Convey("With drop first and last aggregation", func() {
			targets := map[string]string{
				"A": `{
					"timeField": "@timestamp",
					"metrics": [{ "type": "avg", "id": "1" }, { "type": "count" }],
					"bucketAggs": [
						{
							"type": "date_histogram",
							"field": "@timestamp",
							"id": "2",
							"settings": { "trimEdges": 1 }
						}
					]
				}`,
			}
			response := `{
				"responses": [
					{
						"aggregations": {
							"2": {
								"buckets": [
									{
										"1": { "value": 1000 },
										"key": 1,
										"doc_count": 369
									},
									{
										"1": { "value": 2000 },
										"key": 2,
										"doc_count": 200
									},
									{
										"1": { "value": 2000 },
										"key": 3,
										"doc_count": 200
									}
								]
							}
						}
					}
				]
			}`
			rp, err := newResponseParserForTest(targets, response)
			So(err, ShouldBeNil)
			result, err := rp.getTimeSeries()
			So(err, ShouldBeNil)
			So(result.Results, ShouldHaveLength, 1)

			queryRes := result.Results["A"]
			So(queryRes, ShouldNotBeNil)
			So(queryRes.Series, ShouldHaveLength, 2)

			seriesOne := queryRes.Series[0]
			So(seriesOne.Name, ShouldEqual, "Average")
			So(seriesOne.Points, ShouldHaveLength, 1)
			So(seriesOne.Points[0][0].Float64, ShouldEqual, 2000)
			So(seriesOne.Points[0][1].Float64, ShouldEqual, 2)

			seriesTwo := queryRes.Series[1]
			So(seriesTwo.Name, ShouldEqual, "Count")
			So(seriesTwo.Points, ShouldHaveLength, 1)
			So(seriesTwo.Points[0][0].Float64, ShouldEqual, 200)
			So(seriesTwo.Points[0][1].Float64, ShouldEqual, 2)
		})
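
		// Without a date_histogram bucket aggregation there is nothing to plot
		// over time, so the result is a table keyed by the terms field.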
		Convey("No group by time", func() {
			targets := map[string]string{
				"A": `{
					"timeField": "@timestamp",
					"metrics": [{ "type": "avg", "id": "1" }, { "type": "count" }],
					"bucketAggs": [{ "type": "terms", "field": "host", "id": "2" }]
				}`,
			}
			response := `{
				"responses": [
					{
						"aggregations": {
							"2": {
								"buckets": [
									{
										"1": { "value": 1000 },
										"key": "server-1",
										"doc_count": 369
									},
									{
										"1": { "value": 2000 },
										"key": "server-2",
										"doc_count": 200
									}
								]
							}
						}
					}
				]
			}`
			rp, err := newResponseParserForTest(targets, response)
			So(err, ShouldBeNil)
			result, err := rp.getTimeSeries()
			So(err, ShouldBeNil)
			So(result.Results, ShouldHaveLength, 1)

			queryRes := result.Results["A"]
			So(queryRes, ShouldNotBeNil)
			So(queryRes.Tables, ShouldHaveLength, 1)

			rows := queryRes.Tables[0].Rows
			So(rows, ShouldHaveLength, 2)
			cols := queryRes.Tables[0].Columns
			So(cols, ShouldHaveLength, 3)
			So(cols[0].Text, ShouldEqual, "host")
			So(cols[1].Text, ShouldEqual, "Average")
			So(cols[2].Text, ShouldEqual, "Count")

			So(rows[0][0].(string), ShouldEqual, "server-1")
			So(rows[0][1].(null.Float).Float64, ShouldEqual, 1000)
			So(rows[0][2].(null.Float).Float64, ShouldEqual, 369)
			So(rows[1][0].(string), ShouldEqual, "server-2")
			So(rows[1][1].(null.Float).Float64, ShouldEqual, 2000)
			So(rows[1][2].(null.Float).Float64, ShouldEqual, 200)
		})

		Convey("Multiple metrics of same type", func() {
			targets := map[string]string{
				"A": `{
					"timeField": "@timestamp",
					"metrics": [{ "type": "avg", "field": "test", "id": "1" }, { "type": "avg", "field": "test2", "id": "2" }],
					"bucketAggs": [{ "type": "terms", "field": "host", "id": "2" }]
				}`,
			}
			response := `{
				"responses": [
					{
						"aggregations": {
							"2": {
								"buckets": [
									{
										"1": { "value": 1000 },
										"2": { "value": 3000 },
										"key": "server-1",
										"doc_count": 369
									}
								]
							}
						}
					}
				]
			}`
			rp, err := newResponseParserForTest(targets, response)
			So(err, ShouldBeNil)
			result, err := rp.getTimeSeries()
			So(err, ShouldBeNil)
			So(result.Results, ShouldHaveLength, 1)

			queryRes := result.Results["A"]
			So(queryRes, ShouldNotBeNil)
			So(queryRes.Tables, ShouldHaveLength, 1)

			rows := queryRes.Tables[0].Rows
			So(rows, ShouldHaveLength, 1)
			cols := queryRes.Tables[0].Columns
			So(cols, ShouldHaveLength, 3)
			So(cols[0].Text, ShouldEqual, "host")
			So(cols[1].Text, ShouldEqual, "Average test")
			So(cols[2].Text, ShouldEqual, "Average test2")

			So(rows[0][0].(string), ShouldEqual, "server-1")
			So(rows[0][1].(null.Float).Float64, ShouldEqual, 1000)
			So(rows[0][2].(null.Float).Float64, ShouldEqual, 3000)
		})

		Convey("With bucket_script", func() {
			targets := map[string]string{
				"A": `{
					"timeField": "@timestamp",
					"metrics": [
						{ "id": "1", "type": "sum", "field": "@value" },
						{ "id": "3", "type": "max", "field": "@value" },
						{
							"id": "4",
							"field": "select field",
							"pipelineVariables": [{ "name": "var1", "pipelineAgg": "1" }, { "name": "var2", "pipelineAgg": "3" }],
							"settings": { "script": "params.var1 * params.var2" },
							"type": "bucket_script"
						}
					],
					"bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "2" }]
				}`,
			}
			response := `{
				"responses": [
					{
						"aggregations": {
							"2": {
								"buckets": [
									{
										"1": { "value": 2 },
										"3": { "value": 3 },
										"4": { "value": 6 },
										"doc_count": 60,
										"key": 1000
									},
									{
										"1": { "value": 3 },
										"3": { "value": 4 },
										"4": { "value": 12 },
										"doc_count": 60,
										"key": 2000
									}
								]
							}
						}
					}
				]
			}`
			rp, err := newResponseParserForTest(targets, response)
			So(err, ShouldBeNil)
			result, err := rp.getTimeSeries()
			So(err, ShouldBeNil)
			So(result.Results, ShouldHaveLength, 1)

			queryRes := result.Results["A"]
			So(queryRes, ShouldNotBeNil)
			So(queryRes.Series, ShouldHaveLength, 3)

			seriesOne := queryRes.Series[0]
			So(seriesOne.Name, ShouldEqual, "Sum @value")
			So(seriesOne.Points, ShouldHaveLength, 2)
			So(seriesOne.Points[0][0].Float64, ShouldEqual, 2)
			So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000)
			So(seriesOne.Points[1][0].Float64, ShouldEqual, 3)
			So(seriesOne.Points[1][1].Float64, ShouldEqual, 2000)

			seriesTwo := queryRes.Series[1]
			So(seriesTwo.Name, ShouldEqual, "Max @value")
			So(seriesTwo.Points, ShouldHaveLength, 2)
			So(seriesTwo.Points[0][0].Float64, ShouldEqual, 3)
			So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000)
			So(seriesTwo.Points[1][0].Float64, ShouldEqual, 4)
			So(seriesTwo.Points[1][1].Float64, ShouldEqual, 2000)

			seriesThree := queryRes.Series[2]
			So(seriesThree.Name, ShouldEqual, "Sum @value * Max @value")
			So(seriesThree.Points, ShouldHaveLength, 2)
			So(seriesThree.Points[0][0].Float64, ShouldEqual, 6)
			So(seriesThree.Points[0][1].Float64, ShouldEqual, 1000)
			So(seriesThree.Points[1][0].Float64, ShouldEqual, 12)
			So(seriesThree.Points[1][1].Float64, ShouldEqual, 2000)
		})
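
		// The raw-documents test below is kept for reference but disabled. Its
		// assertions appear to be stale copies from the table tests above and
		// do not match the hits fixture it sets up.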
		// Convey("Raw documents query", func() {
		// 	targets := map[string]string{
		// 		"A": `{
		// 			"timeField": "@timestamp",
		// 			"metrics": [{ "type": "raw_document", "id": "1" }]
		// 		}`,
		// 	}
		// 	response := `{
		// 		"responses": [
		// 			{
		// 				"hits": {
		// 					"total": 100,
		// 					"hits": [
		// 						{
		// 							"_id": "1",
		// 							"_type": "type",
		// 							"_index": "index",
		// 							"_source": { "sourceProp": "asd" },
		// 							"fields": { "fieldProp": "field" }
		// 						},
		// 						{
		// 							"_source": { "sourceProp": "asd2" },
		// 							"fields": { "fieldProp": "field2" }
		// 						}
		// 					]
		// 				}
		// 			}
		// 		]
		// 	}`
		// 	rp, err := newResponseParserForTest(targets, response)
		// 	So(err, ShouldBeNil)
		// 	result, err := rp.getTimeSeries()
		// 	So(err, ShouldBeNil)
		// 	So(result.Results, ShouldHaveLength, 1)
		// 	queryRes := result.Results["A"]
		// 	So(queryRes, ShouldNotBeNil)
		// 	So(queryRes.Tables, ShouldHaveLength, 1)
		// 	rows := queryRes.Tables[0].Rows
		// 	So(rows, ShouldHaveLength, 1)
		// 	cols := queryRes.Tables[0].Columns
		// 	So(cols, ShouldHaveLength, 3)
		// 	So(cols[0].Text, ShouldEqual, "host")
		// 	So(cols[1].Text, ShouldEqual, "Average test")
		// 	So(cols[2].Text, ShouldEqual, "Average test2")
		// 	So(rows[0][0].(string), ShouldEqual, "server-1")
		// 	So(rows[0][1].(null.Float).Float64, ShouldEqual, 1000)
		// 	So(rows[0][2].(null.Float).Float64, ShouldEqual, 3000)
		// })
	})
}
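
// newResponseParserForTest builds a responseParser from JSON fixtures: each
// query model is parsed under its refID, wrapped in a tsdb.TsdbQuery with a
// fixed five-minute time range (2018-05-15 17:50 to 17:55 UTC, passed as
// epoch-millisecond strings), and responseBody is unmarshaled into an
// es.MultiSearchResponse.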
func newResponseParserForTest(tsdbQueries map[string]string, responseBody string) (*responseParser, error) {
	from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC)
	to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC)
	fromStr := fmt.Sprintf("%d", from.UnixNano()/int64(time.Millisecond))
	toStr := fmt.Sprintf("%d", to.UnixNano()/int64(time.Millisecond))
	tsdbQuery := &tsdb.TsdbQuery{
		Queries:   []*tsdb.Query{},
		TimeRange: tsdb.NewTimeRange(fromStr, toStr),
	}

	for refID, tsdbQueryBody := range tsdbQueries {
		tsdbQueryJSON, err := simplejson.NewJson([]byte(tsdbQueryBody))
		if err != nil {
			return nil, err
		}

		tsdbQuery.Queries = append(tsdbQuery.Queries, &tsdb.Query{
			Model: tsdbQueryJSON,
			RefId: refID,
		})
	}

	var response es.MultiSearchResponse
	err := json.Unmarshal([]byte(responseBody), &response)
	if err != nil {
		return nil, err
	}

	tsQueryParser := newTimeSeriesQueryParser()
	queries, err := tsQueryParser.parse(tsdbQuery)
	if err != nil {
		return nil, err
	}

	return newResponseParser(response.Responses, queries), nil
}