integration_test.go 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772
  1. // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Source code and contact info at http://github.com/streadway/amqp
  5. // +build integration
  6. package amqp
  7. import (
  8. "bytes"
  9. devrand "crypto/rand"
  10. "encoding/binary"
  11. "fmt"
  12. "hash/crc32"
  13. "io"
  14. "math/rand"
  15. "net"
  16. "os"
  17. "reflect"
  18. "strconv"
  19. "testing"
  20. "testing/quick"
  21. "time"
  22. )
  23. func TestIntegrationOpenClose(t *testing.T) {
  24. if c := integrationConnection(t, "open-close"); c != nil {
  25. t.Logf("have connection, calling connection close")
  26. if err := c.Close(); err != nil {
  27. t.Fatalf("connection close: %s", err)
  28. }
  29. t.Logf("connection close OK")
  30. }
  31. }
  32. func TestIntegrationOpenCloseChannel(t *testing.T) {
  33. if c := integrationConnection(t, "channel"); c != nil {
  34. defer c.Close()
  35. if _, err := c.Channel(); err != nil {
  36. t.Errorf("Channel could not be opened: %s", err)
  37. }
  38. }
  39. }
  40. func TestIntegrationOpenConfig(t *testing.T) {
  41. config := Config{}
  42. c, err := DialConfig(integrationURLFromEnv(), config)
  43. if err != nil {
  44. t.Errorf("expected to dial with config %+v integration server: %s", config, err)
  45. }
  46. if _, err := c.Channel(); err != nil {
  47. t.Fatalf("expected to open channel: %s", err)
  48. }
  49. if err := c.Close(); err != nil {
  50. t.Fatalf("connection close: %s", err)
  51. }
  52. }
  53. func TestIntegrationOpenConfigWithNetDial(t *testing.T) {
  54. config := Config{Dial: net.Dial}
  55. c, err := DialConfig(integrationURLFromEnv(), config)
  56. if err != nil {
  57. t.Errorf("expected to dial with config %+v integration server: %s", config, err)
  58. }
  59. if _, err := c.Channel(); err != nil {
  60. t.Fatalf("expected to open channel: %s", err)
  61. }
  62. if err := c.Close(); err != nil {
  63. t.Fatalf("connection close: %s", err)
  64. }
  65. }
  66. func TestIntegrationLocalAddr(t *testing.T) {
  67. config := Config{}
  68. c, err := DialConfig(integrationURLFromEnv(), config)
  69. defer c.Close()
  70. if err != nil {
  71. t.Errorf("expected to dial with config %+v integration server: %s", config, err)
  72. }
  73. a := c.LocalAddr()
  74. _, portString, err := net.SplitHostPort(a.String())
  75. if err != nil {
  76. t.Errorf("expected to get a local network address with config %+v integration server: %s", config, a.String())
  77. }
  78. port, err := strconv.Atoi(portString)
  79. if err != nil {
  80. t.Errorf("expected to get a TCP port number with config %+v integration server: %s", config, err)
  81. }
  82. t.Logf("Connected to port %d\n", port)
  83. }
  84. // https://github.com/streadway/amqp/issues/94
  85. func TestExchangePassiveOnMissingExchangeShouldError(t *testing.T) {
  86. c := integrationConnection(t, "exch")
  87. if c != nil {
  88. defer c.Close()
  89. ch, err := c.Channel()
  90. if err != nil {
  91. t.Fatalf("create channel 1: %s", err)
  92. }
  93. defer ch.Close()
  94. if err := ch.ExchangeDeclarePassive(
  95. "test-integration-missing-passive-exchange",
  96. "direct", // type
  97. false, // duration (note: is durable)
  98. true, // auto-delete
  99. false, // internal
  100. false, // nowait
  101. nil, // args
  102. ); err == nil {
  103. t.Fatal("ExchangeDeclarePassive of a missing exchange should return error")
  104. }
  105. }
  106. }
  107. // https://github.com/streadway/amqp/issues/94
  108. func TestIntegrationExchangeDeclarePassiveOnDeclaredShouldNotError(t *testing.T) {
  109. c := integrationConnection(t, "exch")
  110. if c != nil {
  111. defer c.Close()
  112. exchange := "test-integration-decalred-passive-exchange"
  113. ch, err := c.Channel()
  114. if err != nil {
  115. t.Fatalf("create channel: %s", err)
  116. }
  117. defer ch.Close()
  118. if err := ch.ExchangeDeclare(
  119. exchange, // name
  120. "direct", // type
  121. false, // durable
  122. true, // auto-delete
  123. false, // internal
  124. false, // nowait
  125. nil, // args
  126. ); err != nil {
  127. t.Fatalf("declare exchange: %s", err)
  128. }
  129. if err := ch.ExchangeDeclarePassive(
  130. exchange, // name
  131. "direct", // type
  132. false, // durable
  133. true, // auto-delete
  134. false, // internal
  135. false, // nowait
  136. nil, // args
  137. ); err != nil {
  138. t.Fatalf("ExchangeDeclarePassive on a declared exchange should not error, got: %q", err)
  139. }
  140. }
  141. }
  142. func TestIntegrationExchange(t *testing.T) {
  143. c := integrationConnection(t, "exch")
  144. if c != nil {
  145. defer c.Close()
  146. channel, err := c.Channel()
  147. if err != nil {
  148. t.Fatalf("create channel: %s", err)
  149. }
  150. t.Logf("create channel OK")
  151. exchange := "test-integration-exchange"
  152. if err := channel.ExchangeDeclare(
  153. exchange, // name
  154. "direct", // type
  155. false, // duration
  156. true, // auto-delete
  157. false, // internal
  158. false, // nowait
  159. nil, // args
  160. ); err != nil {
  161. t.Fatalf("declare exchange: %s", err)
  162. }
  163. t.Logf("declare exchange OK")
  164. if err := channel.ExchangeDelete(exchange, false, false); err != nil {
  165. t.Fatalf("delete exchange: %s", err)
  166. }
  167. t.Logf("delete exchange OK")
  168. if err := channel.Close(); err != nil {
  169. t.Fatalf("close channel: %s", err)
  170. }
  171. t.Logf("close channel OK")
  172. }
  173. }
  174. // https://github.com/streadway/amqp/issues/94
  175. func TestIntegrationQueueDeclarePassiveOnMissingExchangeShouldError(t *testing.T) {
  176. c := integrationConnection(t, "queue")
  177. if c != nil {
  178. defer c.Close()
  179. ch, err := c.Channel()
  180. if err != nil {
  181. t.Fatalf("create channel1: %s", err)
  182. }
  183. defer ch.Close()
  184. if _, err := ch.QueueDeclarePassive(
  185. "test-integration-missing-passive-queue", // name
  186. false, // duration (note: not durable)
  187. true, // auto-delete
  188. false, // exclusive
  189. false, // noWait
  190. nil, // arguments
  191. ); err == nil {
  192. t.Fatal("QueueDeclarePassive of a missing queue should error")
  193. }
  194. }
  195. }
  196. // https://github.com/streadway/amqp/issues/94
  197. func TestIntegrationPassiveQueue(t *testing.T) {
  198. c := integrationConnection(t, "queue")
  199. if c != nil {
  200. defer c.Close()
  201. name := "test-integration-declared-passive-queue"
  202. ch, err := c.Channel()
  203. if err != nil {
  204. t.Fatalf("create channel1: %s", err)
  205. }
  206. defer ch.Close()
  207. if _, err := ch.QueueDeclare(
  208. name, // name
  209. false, // durable
  210. true, // auto-delete
  211. false, // exclusive
  212. false, // noWait
  213. nil, // arguments
  214. ); err != nil {
  215. t.Fatalf("queue declare: %s", err)
  216. }
  217. if _, err := ch.QueueDeclarePassive(
  218. name, // name
  219. false, // durable
  220. true, // auto-delete
  221. false, // exclusive
  222. false, // noWait
  223. nil, // arguments
  224. ); err != nil {
  225. t.Fatalf("QueueDeclarePassive on declared queue should not error, got: %q", err)
  226. }
  227. if _, err := ch.QueueDeclarePassive(
  228. name, // name
  229. true, // durable (note: differs)
  230. true, // auto-delete
  231. false, // exclusive
  232. false, // noWait
  233. nil, // arguments
  234. ); err != nil {
  235. t.Fatalf("QueueDeclarePassive on declared queue with different flags should error")
  236. }
  237. }
  238. }
  239. func TestIntegrationBasicQueueOperations(t *testing.T) {
  240. c := integrationConnection(t, "queue")
  241. if c != nil {
  242. defer c.Close()
  243. channel, err := c.Channel()
  244. if err != nil {
  245. t.Fatalf("create channel: %s", err)
  246. }
  247. t.Logf("create channel OK")
  248. exchangeName := "test-basic-ops-exchange"
  249. queueName := "test-basic-ops-queue"
  250. deleteQueueFirstOptions := []bool{true, false}
  251. for _, deleteQueueFirst := range deleteQueueFirstOptions {
  252. if err := channel.ExchangeDeclare(
  253. exchangeName, // name
  254. "direct", // type
  255. true, // duration (note: is durable)
  256. false, // auto-delete
  257. false, // internal
  258. false, // nowait
  259. nil, // args
  260. ); err != nil {
  261. t.Fatalf("declare exchange: %s", err)
  262. }
  263. t.Logf("declare exchange OK")
  264. if _, err := channel.QueueDeclare(
  265. queueName, // name
  266. true, // duration (note: durable)
  267. false, // auto-delete
  268. false, // exclusive
  269. false, // noWait
  270. nil, // arguments
  271. ); err != nil {
  272. t.Fatalf("queue declare: %s", err)
  273. }
  274. t.Logf("declare queue OK")
  275. if err := channel.QueueBind(
  276. queueName, // name
  277. "", // routingKey
  278. exchangeName, // sourceExchange
  279. false, // noWait
  280. nil, // arguments
  281. ); err != nil {
  282. t.Fatalf("queue bind: %s", err)
  283. }
  284. t.Logf("queue bind OK")
  285. if deleteQueueFirst {
  286. if _, err := channel.QueueDelete(
  287. queueName, // name
  288. false, // ifUnused (false=be aggressive)
  289. false, // ifEmpty (false=be aggressive)
  290. false, // noWait
  291. ); err != nil {
  292. t.Fatalf("delete queue (first): %s", err)
  293. }
  294. t.Logf("delete queue (first) OK")
  295. if err := channel.ExchangeDelete(exchangeName, false, false); err != nil {
  296. t.Fatalf("delete exchange (after delete queue): %s", err)
  297. }
  298. t.Logf("delete exchange (after delete queue) OK")
  299. } else { // deleteExchangeFirst
  300. if err := channel.ExchangeDelete(exchangeName, false, false); err != nil {
  301. t.Fatalf("delete exchange (first): %s", err)
  302. }
  303. t.Logf("delete exchange (first) OK")
  304. if _, err := channel.QueueInspect(queueName); err != nil {
  305. t.Fatalf("inspect queue state after deleting exchange: %s", err)
  306. }
  307. t.Logf("queue properly remains after exchange is deleted")
  308. if _, err := channel.QueueDelete(
  309. queueName,
  310. false, // ifUnused
  311. false, // ifEmpty
  312. false, // noWait
  313. ); err != nil {
  314. t.Fatalf("delete queue (after delete exchange): %s", err)
  315. }
  316. t.Logf("delete queue (after delete exchange) OK")
  317. }
  318. }
  319. if err := channel.Close(); err != nil {
  320. t.Fatalf("close channel: %s", err)
  321. }
  322. t.Logf("close channel OK")
  323. }
  324. }
  325. func TestIntegrationConnectionNegotiatesMaxChannels(t *testing.T) {
  326. config := Config{ChannelMax: 0}
  327. c, err := DialConfig(integrationURLFromEnv(), config)
  328. if err != nil {
  329. t.Errorf("expected to dial with config %+v integration server: %s", config, err)
  330. }
  331. defer c.Close()
  332. if want, got := defaultChannelMax, c.Config.ChannelMax; want != got {
  333. t.Fatalf("expected connection to negotiate uint16 (%d) channels, got: %d", want, got)
  334. }
  335. }
  336. func TestIntegrationConnectionNegotiatesClientMaxChannels(t *testing.T) {
  337. config := Config{ChannelMax: 16}
  338. c, err := DialConfig(integrationURLFromEnv(), config)
  339. if err != nil {
  340. t.Errorf("expected to dial with config %+v integration server: %s", config, err)
  341. }
  342. defer c.Close()
  343. if want, got := config.ChannelMax, c.Config.ChannelMax; want != got {
  344. t.Fatalf("expected client specified channel limit after handshake %d, got: %d", want, got)
  345. }
  346. }
  347. func TestIntegrationChannelIDsExhausted(t *testing.T) {
  348. config := Config{ChannelMax: 16}
  349. c, err := DialConfig(integrationURLFromEnv(), config)
  350. if err != nil {
  351. t.Errorf("expected to dial with config %+v integration server: %s", config, err)
  352. }
  353. defer c.Close()
  354. for i := 1; i <= c.Config.ChannelMax; i++ {
  355. if _, err := c.Channel(); err != nil {
  356. t.Fatalf("expected allocating all channel ids to succed, failed on %d with %v", i, err)
  357. }
  358. }
  359. if _, err := c.Channel(); err != ErrChannelMax {
  360. t.Fatalf("expected allocating all channels to produce the client side error %#v, got: %#v", ErrChannelMax, err)
  361. }
  362. }
  363. func TestIntegrationChannelClosing(t *testing.T) {
  364. c := integrationConnection(t, "closings")
  365. if c != nil {
  366. defer c.Close()
  367. // This function is run on every channel after it is successfully
  368. // opened. It can do something to verify something. It should be
  369. // quick; many channels may be opened!
  370. f := func(t *testing.T, c *Channel) {
  371. return
  372. }
  373. // open and close
  374. channel, err := c.Channel()
  375. if err != nil {
  376. t.Fatalf("basic create channel: %s", err)
  377. }
  378. t.Logf("basic create channel OK")
  379. if err := channel.Close(); err != nil {
  380. t.Fatalf("basic close channel: %s", err)
  381. }
  382. t.Logf("basic close channel OK")
  383. // deferred close
  384. signal := make(chan bool)
  385. go func() {
  386. channel, err := c.Channel()
  387. if err != nil {
  388. t.Fatalf("second create channel: %s", err)
  389. }
  390. t.Logf("second create channel OK")
  391. <-signal // a bit of synchronization
  392. f(t, channel)
  393. defer func() {
  394. if err := channel.Close(); err != nil {
  395. t.Fatalf("deferred close channel: %s", err)
  396. }
  397. t.Logf("deferred close channel OK")
  398. signal <- true
  399. }()
  400. }()
  401. signal <- true
  402. select {
  403. case <-signal:
  404. t.Logf("(got close signal OK)")
  405. break
  406. case <-time.After(250 * time.Millisecond):
  407. t.Fatalf("deferred close: timeout")
  408. }
  409. // multiple channels
  410. for _, n := range []int{2, 4, 8, 16, 32, 64, 128, 256} {
  411. channels := make([]*Channel, n)
  412. for i := 0; i < n; i++ {
  413. var err error
  414. if channels[i], err = c.Channel(); err != nil {
  415. t.Fatalf("create channel %d/%d: %s", i+1, n, err)
  416. }
  417. }
  418. f(t, channel)
  419. for i, channel := range channels {
  420. if err := channel.Close(); err != nil {
  421. t.Fatalf("close channel %d/%d: %s", i+1, n, err)
  422. }
  423. }
  424. t.Logf("created/closed %d channels OK", n)
  425. }
  426. }
  427. }
  428. func TestIntegrationMeaningfulChannelErrors(t *testing.T) {
  429. c := integrationConnection(t, "pub")
  430. if c != nil {
  431. defer c.Close()
  432. ch, err := c.Channel()
  433. if err != nil {
  434. t.Fatalf("Could not create channel")
  435. }
  436. queue := "test.integration.channel.error"
  437. _, err = ch.QueueDeclare(queue, false, true, false, false, nil)
  438. if err != nil {
  439. t.Fatalf("Could not declare")
  440. }
  441. _, err = ch.QueueDeclare(queue, true, false, false, false, nil)
  442. if err == nil {
  443. t.Fatalf("Expected error, got nil")
  444. }
  445. e, ok := err.(*Error)
  446. if !ok {
  447. t.Fatalf("Expected type Error response, got %T", err)
  448. }
  449. if e.Code != PreconditionFailed {
  450. t.Fatalf("Expected PreconditionFailed, got: %+v", e)
  451. }
  452. _, err = ch.QueueDeclare(queue, false, true, false, false, nil)
  453. if err != ErrClosed {
  454. t.Fatalf("Expected channel to be closed, got: %T", err)
  455. }
  456. }
  457. }
  458. // https://github.com/streadway/amqp/issues/6
  459. func TestIntegrationNonBlockingClose(t *testing.T) {
  460. c := integrationConnection(t, "#6")
  461. if c != nil {
  462. defer c.Close()
  463. ch, err := c.Channel()
  464. if err != nil {
  465. t.Fatalf("Could not create channel")
  466. }
  467. queue := "test.integration.blocking.close"
  468. _, err = ch.QueueDeclare(queue, false, true, false, false, nil)
  469. if err != nil {
  470. t.Fatalf("Could not declare")
  471. }
  472. msgs, err := ch.Consume(queue, "", false, false, false, false, nil)
  473. if err != nil {
  474. t.Fatalf("Could not consume")
  475. }
  476. // Simulate a consumer
  477. go func() {
  478. for _ = range msgs {
  479. t.Logf("Oh my, received message on an empty queue")
  480. }
  481. }()
  482. succeed := make(chan bool)
  483. go func() {
  484. if err = ch.Close(); err != nil {
  485. t.Fatalf("Close produced an error when it shouldn't")
  486. }
  487. succeed <- true
  488. }()
  489. select {
  490. case <-succeed:
  491. break
  492. case <-time.After(1 * time.Second):
  493. t.Fatalf("Close timed out after 1s")
  494. }
  495. }
  496. }
  497. func TestIntegrationPublishConsume(t *testing.T) {
  498. queue := "test.integration.publish.consume"
  499. c1 := integrationConnection(t, "pub")
  500. c2 := integrationConnection(t, "sub")
  501. if c1 != nil && c2 != nil {
  502. defer c1.Close()
  503. defer c2.Close()
  504. pub, _ := c1.Channel()
  505. sub, _ := c2.Channel()
  506. pub.QueueDeclare(queue, false, true, false, false, nil)
  507. sub.QueueDeclare(queue, false, true, false, false, nil)
  508. defer pub.QueueDelete(queue, false, false, false)
  509. messages, _ := sub.Consume(queue, "", false, false, false, false, nil)
  510. pub.Publish("", queue, false, false, Publishing{Body: []byte("pub 1")})
  511. pub.Publish("", queue, false, false, Publishing{Body: []byte("pub 2")})
  512. pub.Publish("", queue, false, false, Publishing{Body: []byte("pub 3")})
  513. assertConsumeBody(t, messages, []byte("pub 1"))
  514. assertConsumeBody(t, messages, []byte("pub 2"))
  515. assertConsumeBody(t, messages, []byte("pub 3"))
  516. }
  517. }
  518. func TestIntegrationConsumeFlow(t *testing.T) {
  519. queue := "test.integration.consumer-flow"
  520. c1 := integrationConnection(t, "pub-flow")
  521. c2 := integrationConnection(t, "sub-flow")
  522. if c1 != nil && c2 != nil {
  523. defer c1.Close()
  524. defer c2.Close()
  525. pub, _ := c1.Channel()
  526. sub, _ := c2.Channel()
  527. pub.QueueDeclare(queue, false, true, false, false, nil)
  528. sub.QueueDeclare(queue, false, true, false, false, nil)
  529. defer pub.QueueDelete(queue, false, false, false)
  530. sub.Qos(1, 0, false)
  531. messages, _ := sub.Consume(queue, "", false, false, false, false, nil)
  532. pub.Publish("", queue, false, false, Publishing{Body: []byte("pub 1")})
  533. pub.Publish("", queue, false, false, Publishing{Body: []byte("pub 2")})
  534. msg := assertConsumeBody(t, messages, []byte("pub 1"))
  535. if err := sub.Flow(false); err.(*Error).Code == NotImplemented {
  536. t.Log("flow control is not supported on this version of rabbitmq")
  537. return
  538. }
  539. msg.Ack(false)
  540. select {
  541. case <-messages:
  542. t.Fatalf("message was delivered when flow was not active")
  543. default:
  544. }
  545. sub.Flow(true)
  546. msg = assertConsumeBody(t, messages, []byte("pub 2"))
  547. msg.Ack(false)
  548. }
  549. }
  550. func TestIntegrationRecoverNotImplemented(t *testing.T) {
  551. queue := "test.recover"
  552. if c, ch := integrationQueue(t, queue); c != nil {
  553. if product, ok := c.Properties["product"]; ok && product.(string) == "RabbitMQ" {
  554. defer c.Close()
  555. err := ch.Recover(false)
  556. if ex, ok := err.(*Error); !ok || ex.Code != 540 {
  557. t.Fatalf("Expected NOT IMPLEMENTED got: %v", ex)
  558. }
  559. }
  560. }
  561. }
  562. // This test is driven by a private API to simulate the server sending a channelFlow message
  563. func TestIntegrationPublishFlow(t *testing.T) {
  564. // TODO - no idea how to test without affecting the server or mucking internal APIs
  565. // i'd like to make sure the RW lock can be held by multiple publisher threads
  566. // and that multiple channelFlow messages do not block the dispatch thread
  567. }
  568. func TestIntegrationConsumeCancel(t *testing.T) {
  569. queue := "test.integration.consume-cancel"
  570. c := integrationConnection(t, "pub")
  571. if c != nil {
  572. defer c.Close()
  573. ch, _ := c.Channel()
  574. ch.QueueDeclare(queue, false, true, false, false, nil)
  575. defer ch.QueueDelete(queue, false, false, false)
  576. messages, _ := ch.Consume(queue, "integration-tag", false, false, false, false, nil)
  577. ch.Publish("", queue, false, false, Publishing{Body: []byte("1")})
  578. assertConsumeBody(t, messages, []byte("1"))
  579. err := ch.Cancel("integration-tag", false)
  580. if err != nil {
  581. t.Fatalf("error cancelling the consumer: %v", err)
  582. }
  583. ch.Publish("", queue, false, false, Publishing{Body: []byte("2")})
  584. select {
  585. case <-time.After(100 * time.Millisecond):
  586. t.Fatalf("Timeout on Close")
  587. case _, ok := <-messages:
  588. if ok {
  589. t.Fatalf("Extra message on consumer when consumer should have been closed")
  590. }
  591. }
  592. }
  593. }
  594. func (c *Connection) Generate(r *rand.Rand, _ int) reflect.Value {
  595. urlStr := os.Getenv("AMQP_URL")
  596. if urlStr == "" {
  597. return reflect.ValueOf(nil)
  598. }
  599. conn, err := Dial(urlStr)
  600. if err != nil {
  601. return reflect.ValueOf(nil)
  602. }
  603. return reflect.ValueOf(conn)
  604. }
  605. func (c Publishing) Generate(r *rand.Rand, _ int) reflect.Value {
  606. var ok bool
  607. var t reflect.Value
  608. p := Publishing{}
  609. //p.DeliveryMode = uint8(r.Intn(3))
  610. //p.Priority = uint8(r.Intn(8))
  611. if r.Intn(2) > 0 {
  612. p.ContentType = "application/octet-stream"
  613. }
  614. if r.Intn(2) > 0 {
  615. p.ContentEncoding = "gzip"
  616. }
  617. if r.Intn(2) > 0 {
  618. p.CorrelationId = fmt.Sprintf("%d", r.Int())
  619. }
  620. if r.Intn(2) > 0 {
  621. p.ReplyTo = fmt.Sprintf("%d", r.Int())
  622. }
  623. if r.Intn(2) > 0 {
  624. p.MessageId = fmt.Sprintf("%d", r.Int())
  625. }
  626. if r.Intn(2) > 0 {
  627. p.Type = fmt.Sprintf("%d", r.Int())
  628. }
  629. if r.Intn(2) > 0 {
  630. p.AppId = fmt.Sprintf("%d", r.Int())
  631. }
  632. if r.Intn(2) > 0 {
  633. p.Timestamp = time.Unix(r.Int63(), r.Int63())
  634. }
  635. if t, ok = quick.Value(reflect.TypeOf(p.Body), r); ok {
  636. p.Body = t.Bytes()
  637. }
  638. return reflect.ValueOf(p)
  639. }
  640. func TestQuickPublishOnly(t *testing.T) {
  641. if c := integrationConnection(t, "quick"); c != nil {
  642. defer c.Close()
  643. pub, err := c.Channel()
  644. queue := "test-publish"
  645. if _, err = pub.QueueDeclare(queue, false, true, false, false, nil); err != nil {
  646. t.Errorf("Failed to declare: %s", err)
  647. return
  648. }
  649. defer pub.QueueDelete(queue, false, false, false)
  650. quick.Check(func(msg Publishing) bool {
  651. return pub.Publish("", queue, false, false, msg) == nil
  652. }, nil)
  653. }
  654. }
  655. func TestPublishEmptyBody(t *testing.T) {
  656. c := integrationConnection(t, "empty")
  657. if c != nil {
  658. defer c.Close()
  659. ch, err := c.Channel()
  660. if err != nil {
  661. t.Errorf("Failed to create channel")
  662. return
  663. }
  664. queue := "test-TestPublishEmptyBody"
  665. if _, err := ch.QueueDeclare(queue, false, true, false, false, nil); err != nil {
  666. t.Fatalf("Could not declare")
  667. }
  668. messages, err := ch.Consume(queue, "", false, false, false, false, nil)
  669. if err != nil {
  670. t.Fatalf("Could not consume")
  671. }
  672. err = ch.Publish("", queue, false, false, Publishing{})
  673. if err != nil {
  674. t.Fatalf("Could not publish")
  675. }
  676. select {
  677. case msg := <-messages:
  678. if len(msg.Body) != 0 {
  679. t.Errorf("Received non empty body")
  680. }
  681. case <-time.After(200 * time.Millisecond):
  682. t.Errorf("Timeout on receive")
  683. }
  684. }
  685. }
  686. func TestPublishEmptyBodyWithHeadersIssue67(t *testing.T) {
  687. c := integrationConnection(t, "issue67")
  688. if c != nil {
  689. defer c.Close()
  690. ch, err := c.Channel()
  691. if err != nil {
  692. t.Errorf("Failed to create channel")
  693. return
  694. }
  695. queue := "test-TestPublishEmptyBodyWithHeaders"
  696. if _, err := ch.QueueDeclare(queue, false, true, false, false, nil); err != nil {
  697. t.Fatalf("Could not declare")
  698. }
  699. messages, err := ch.Consume(queue, "", false, false, false, false, nil)
  700. if err != nil {
  701. t.Fatalf("Could not consume")
  702. }
  703. headers := Table{
  704. "ham": "spam",
  705. }
  706. err = ch.Publish("", queue, false, false, Publishing{Headers: headers})
  707. if err != nil {
  708. t.Fatalf("Could not publish")
  709. }
  710. select {
  711. case msg := <-messages:
  712. if msg.Headers["ham"] == nil {
  713. t.Fatalf("Headers aren't sent")
  714. }
  715. if msg.Headers["ham"] != "spam" {
  716. t.Fatalf("Headers are wrong")
  717. }
  718. case <-time.After(200 * time.Millisecond):
  719. t.Errorf("Timeout on receive")
  720. }
  721. }
  722. }
  723. func TestQuickPublishConsumeOnly(t *testing.T) {
  724. c1 := integrationConnection(t, "quick-pub")
  725. c2 := integrationConnection(t, "quick-sub")
  726. if c1 != nil && c2 != nil {
  727. defer c1.Close()
  728. defer c2.Close()
  729. pub, err := c1.Channel()
  730. sub, err := c2.Channel()
  731. queue := "TestPublishConsumeOnly"
  732. if _, err = pub.QueueDeclare(queue, false, true, false, false, nil); err != nil {
  733. t.Errorf("Failed to declare: %s", err)
  734. return
  735. }
  736. if _, err = sub.QueueDeclare(queue, false, true, false, false, nil); err != nil {
  737. t.Errorf("Failed to declare: %s", err)
  738. return
  739. }
  740. defer sub.QueueDelete(queue, false, false, false)
  741. ch, err := sub.Consume(queue, "", false, false, false, false, nil)
  742. if err != nil {
  743. t.Errorf("Could not sub: %s", err)
  744. }
  745. quick.CheckEqual(
  746. func(msg Publishing) []byte {
  747. empty := Publishing{Body: msg.Body}
  748. if pub.Publish("", queue, false, false, empty) != nil {
  749. return []byte{'X'}
  750. }
  751. return msg.Body
  752. },
  753. func(msg Publishing) []byte {
  754. out := <-ch
  755. out.Ack(false)
  756. return out.Body
  757. },
  758. nil)
  759. }
  760. }
  761. func TestQuickPublishConsumeBigBody(t *testing.T) {
  762. c1 := integrationConnection(t, "big-pub")
  763. c2 := integrationConnection(t, "big-sub")
  764. if c1 != nil && c2 != nil {
  765. defer c1.Close()
  766. defer c2.Close()
  767. pub, err := c1.Channel()
  768. sub, err := c2.Channel()
  769. queue := "test-pubsub"
  770. if _, err = sub.QueueDeclare(queue, false, true, false, false, nil); err != nil {
  771. t.Errorf("Failed to declare: %s", err)
  772. return
  773. }
  774. ch, err := sub.Consume(queue, "", false, false, false, false, nil)
  775. if err != nil {
  776. t.Errorf("Could not sub: %s", err)
  777. }
  778. fixture := Publishing{
  779. Body: make([]byte, 1e4+1000),
  780. }
  781. if _, err = pub.QueueDeclare(queue, false, true, false, false, nil); err != nil {
  782. t.Errorf("Failed to declare: %s", err)
  783. return
  784. }
  785. err = pub.Publish("", queue, false, false, fixture)
  786. if err != nil {
  787. t.Errorf("Could not publish big body")
  788. }
  789. select {
  790. case msg := <-ch:
  791. if bytes.Compare(msg.Body, fixture.Body) != 0 {
  792. t.Errorf("Consumed big body didn't match")
  793. }
  794. case <-time.After(200 * time.Millisecond):
  795. t.Errorf("Timeout on receive")
  796. }
  797. }
  798. }
  799. func TestIntegrationGetOk(t *testing.T) {
  800. if c := integrationConnection(t, "getok"); c != nil {
  801. defer c.Close()
  802. queue := "test.get-ok"
  803. ch, _ := c.Channel()
  804. ch.QueueDeclare(queue, false, true, false, false, nil)
  805. ch.Publish("", queue, false, false, Publishing{Body: []byte("ok")})
  806. msg, ok, err := ch.Get(queue, false)
  807. if err != nil {
  808. t.Fatalf("Failed get: %v", err)
  809. }
  810. if !ok {
  811. t.Fatalf("Get on a queued message did not find the message")
  812. }
  813. if string(msg.Body) != "ok" {
  814. t.Fatalf("Get did not get the correct message")
  815. }
  816. }
  817. }
  818. func TestIntegrationGetEmpty(t *testing.T) {
  819. if c := integrationConnection(t, "getok"); c != nil {
  820. defer c.Close()
  821. queue := "test.get-ok"
  822. ch, _ := c.Channel()
  823. ch.QueueDeclare(queue, false, true, false, false, nil)
  824. _, ok, err := ch.Get(queue, false)
  825. if err != nil {
  826. t.Fatalf("Failed get: %v", err)
  827. }
  828. if !ok {
  829. t.Fatalf("Get on a queued message retrieved a message when it shouldn't have")
  830. }
  831. }
  832. }
  833. func TestIntegrationTxCommit(t *testing.T) {
  834. if c := integrationConnection(t, "txcommit"); c != nil {
  835. defer c.Close()
  836. queue := "test.tx.commit"
  837. ch, _ := c.Channel()
  838. ch.QueueDeclare(queue, false, true, false, false, nil)
  839. if err := ch.Tx(); err != nil {
  840. t.Fatalf("tx.select failed")
  841. }
  842. ch.Publish("", queue, false, false, Publishing{Body: []byte("ok")})
  843. if err := ch.TxCommit(); err != nil {
  844. t.Fatalf("tx.commit failed")
  845. }
  846. msg, ok, err := ch.Get(queue, false)
  847. if err != nil || !ok {
  848. t.Fatalf("Failed get: %v", err)
  849. }
  850. if string(msg.Body) != "ok" {
  851. t.Fatalf("Get did not get the correct message from the transaction")
  852. }
  853. }
  854. }
  855. func TestIntegrationTxRollback(t *testing.T) {
  856. if c := integrationConnection(t, "txrollback"); c != nil {
  857. defer c.Close()
  858. queue := "test.tx.rollback"
  859. ch, _ := c.Channel()
  860. ch.QueueDeclare(queue, false, true, false, false, nil)
  861. if err := ch.Tx(); err != nil {
  862. t.Fatalf("tx.select failed")
  863. }
  864. ch.Publish("", queue, false, false, Publishing{Body: []byte("ok")})
  865. if err := ch.TxRollback(); err != nil {
  866. t.Fatalf("tx.rollback failed")
  867. }
  868. _, ok, err := ch.Get(queue, false)
  869. if err != nil {
  870. t.Fatalf("Failed get: %v", err)
  871. }
  872. if ok {
  873. t.Fatalf("message was published when it should have been rolled back")
  874. }
  875. }
  876. }
  877. func TestIntegrationReturn(t *testing.T) {
  878. if c, ch := integrationQueue(t, "return"); c != nil {
  879. defer c.Close()
  880. ret := make(chan Return, 1)
  881. ch.NotifyReturn(ret)
  882. // mandatory publish to an exchange without a binding should be returned
  883. ch.Publish("", "return-without-binding", true, false, Publishing{Body: []byte("mandatory")})
  884. select {
  885. case res := <-ret:
  886. if string(res.Body) != "mandatory" {
  887. t.Fatalf("expected return of the same message")
  888. }
  889. if res.ReplyCode != NoRoute {
  890. t.Fatalf("expected no consumers reply code on the Return result, got: %v", res.ReplyCode)
  891. }
  892. case <-time.After(200 * time.Millisecond):
  893. t.Fatalf("no return was received within 200ms")
  894. }
  895. }
  896. }
  897. func TestIntegrationCancel(t *testing.T) {
  898. queue := "cancel"
  899. consumerTag := "test.cancel"
  900. if c, ch := integrationQueue(t, queue); c != nil {
  901. defer c.Close()
  902. cancels := ch.NotifyCancel(make(chan string, 1))
  903. go func() {
  904. if _, err := ch.Consume(queue, consumerTag, false, false, false, false, nil); err != nil {
  905. t.Fatalf("cannot consume from %q to test NotifyCancel: %v", queue, err)
  906. }
  907. if _, err := ch.QueueDelete(queue, false, false, false); err != nil {
  908. t.Fatalf("cannot delete integration queue: %v", err)
  909. }
  910. }()
  911. select {
  912. case tag := <-cancels:
  913. if want, got := consumerTag, tag; want != got {
  914. t.Fatalf("expected to be notified of deleted queue with consumer tag, got: %q", got)
  915. }
  916. case <-time.After(200 * time.Millisecond):
  917. t.Fatalf("expected to be notified of deleted queue with 200ms")
  918. }
  919. }
  920. }
  921. func TestIntegrationConfirm(t *testing.T) {
  922. if c, ch := integrationQueue(t, "confirm"); c != nil {
  923. defer c.Close()
  924. ack, nack := make(chan uint64, 1), make(chan uint64, 1)
  925. ch.NotifyConfirm(ack, nack)
  926. if err := ch.Confirm(false); err != nil {
  927. t.Fatalf("could not confirm")
  928. }
  929. ch.Publish("", "confirm", false, false, Publishing{Body: []byte("confirm")})
  930. select {
  931. case tag := <-ack:
  932. if tag != 1 {
  933. t.Fatalf("expected ack starting with delivery tag of 1")
  934. }
  935. case <-time.After(200 * time.Millisecond):
  936. t.Fatalf("no ack was received within 200ms")
  937. }
  938. }
  939. }
  940. // https://github.com/streadway/amqp/issues/61
  941. func TestRoundTripAllFieldValueTypes61(t *testing.T) {
  942. if conn := integrationConnection(t, "issue61"); conn != nil {
  943. defer conn.Close()
  944. timestamp := time.Unix(100000000, 0)
  945. headers := Table{
  946. "A": []interface{}{
  947. []interface{}{"nested array", int32(3)},
  948. Decimal{2, 1},
  949. Table{"S": "nested table in array"},
  950. int32(2 << 20),
  951. string("array string"),
  952. timestamp,
  953. nil,
  954. byte(2),
  955. float64(2.64),
  956. float32(2.32),
  957. int64(2 << 60),
  958. int16(2 << 10),
  959. bool(true),
  960. []byte{'b', '2'},
  961. },
  962. "D": Decimal{1, 1},
  963. "F": Table{"S": "nested table in table"},
  964. "I": int32(1 << 20),
  965. "S": string("string"),
  966. "T": timestamp,
  967. "V": nil,
  968. "b": byte(1),
  969. "d": float64(1.64),
  970. "f": float32(1.32),
  971. "l": int64(1 << 60),
  972. "s": int16(1 << 10),
  973. "t": bool(true),
  974. "x": []byte{'b', '1'},
  975. }
  976. queue := "test.issue61-roundtrip"
  977. ch, _ := conn.Channel()
  978. if _, err := ch.QueueDeclare(queue, false, true, false, false, nil); err != nil {
  979. t.Fatalf("Could not declare")
  980. }
  981. msgs, err := ch.Consume(queue, "", false, false, false, false, nil)
  982. if err != nil {
  983. t.Fatalf("Could not consume")
  984. }
  985. err = ch.Publish("", queue, false, false, Publishing{Body: []byte("ignored"), Headers: headers})
  986. if err != nil {
  987. t.Fatalf("Could not publish: %v", err)
  988. }
  989. msg, ok := <-msgs
  990. if !ok {
  991. t.Fatalf("Channel closed prematurely likely due to publish exception")
  992. }
  993. for k, v := range headers {
  994. if !reflect.DeepEqual(v, msg.Headers[k]) {
  995. t.Errorf("Round trip header not the same for key %q: expected: %#v, got %#v", k, v, msg.Headers[k])
  996. }
  997. }
  998. }
  999. }
  1000. // Declares a queue with the x-message-ttl extension to exercise integer
  1001. // serialization.
  1002. //
  1003. // Relates to https://github.com/streadway/amqp/issues/60
  1004. //
  1005. func TestDeclareArgsXMessageTTL(t *testing.T) {
  1006. if conn := integrationConnection(t, "declareTTL"); conn != nil {
  1007. defer conn.Close()
  1008. ch, _ := conn.Channel()
  1009. args := Table{"x-message-ttl": int32(9000000)}
  1010. // should not drop the connection
  1011. if _, err := ch.QueueDeclare("declareWithTTL", false, true, false, false, args); err != nil {
  1012. t.Fatalf("cannot declare with TTL: got: %v", err)
  1013. }
  1014. }
  1015. }
  1016. // Sets up the topology where rejected messages will be forwarded
  1017. // to a fanout exchange, with a single queue bound.
  1018. //
  1019. // Relates to https://github.com/streadway/amqp/issues/56
  1020. //
  1021. func TestDeclareArgsRejectToDeadLetterQueue(t *testing.T) {
  1022. if conn := integrationConnection(t, "declareArgs"); conn != nil {
  1023. defer conn.Close()
  1024. ex, q := "declareArgs", "declareArgs-deliveries"
  1025. dlex, dlq := ex+"-dead-letter", q+"-dead-letter"
  1026. ch, _ := conn.Channel()
  1027. if err := ch.ExchangeDeclare(ex, "fanout", false, true, false, false, nil); err != nil {
  1028. t.Fatalf("cannot declare %v: got: %v", ex, err)
  1029. }
  1030. if err := ch.ExchangeDeclare(dlex, "fanout", false, true, false, false, nil); err != nil {
  1031. t.Fatalf("cannot declare %v: got: %v", dlex, err)
  1032. }
  1033. if _, err := ch.QueueDeclare(dlq, false, true, false, false, nil); err != nil {
  1034. t.Fatalf("cannot declare %v: got: %v", dlq, err)
  1035. }
  1036. if err := ch.QueueBind(dlq, "#", dlex, false, nil); err != nil {
  1037. t.Fatalf("cannot bind %v to %v: got: %v", dlq, dlex, err)
  1038. }
  1039. if _, err := ch.QueueDeclare(q, false, true, false, false, Table{
  1040. "x-dead-letter-exchange": dlex,
  1041. }); err != nil {
  1042. t.Fatalf("cannot declare %v with dlq %v: got: %v", q, dlex, err)
  1043. }
  1044. if err := ch.QueueBind(q, "#", ex, false, nil); err != nil {
  1045. t.Fatalf("cannot bind %v: got: %v", ex, err)
  1046. }
  1047. fails, err := ch.Consume(q, "", false, false, false, false, nil)
  1048. if err != nil {
  1049. t.Fatalf("cannot consume %v: got: %v", q, err)
  1050. }
  1051. // Reject everything consumed
  1052. go func() {
  1053. for d := range fails {
  1054. d.Reject(false)
  1055. }
  1056. }()
  1057. // Publish the 'poison'
  1058. if err := ch.Publish(ex, q, true, false, Publishing{Body: []byte("ignored")}); err != nil {
  1059. t.Fatalf("publishing failed")
  1060. }
  1061. // spin-get until message arrives on the dead-letter queue with a
  1062. // synchronous parse to exercise the array field (x-death) set by the
  1063. // server relating to issue-56
  1064. for i := 0; i < 10; i++ {
  1065. d, got, err := ch.Get(dlq, false)
  1066. if !got && err == nil {
  1067. continue
  1068. } else if err != nil {
  1069. t.Fatalf("expected success in parsing reject, got: %v", err)
  1070. } else {
  1071. // pass if we've parsed an array
  1072. if v, ok := d.Headers["x-death"]; ok {
  1073. if _, ok := v.([]interface{}); ok {
  1074. return
  1075. }
  1076. }
  1077. t.Fatalf("array field x-death expected in the headers, got: %v (%T)", d.Headers, d.Headers["x-death"])
  1078. }
  1079. }
  1080. t.Fatalf("expectd dead-letter after 10 get attempts")
  1081. }
  1082. }
  1083. // https://github.com/streadway/amqp/issues/48
  1084. func TestDeadlockConsumerIssue48(t *testing.T) {
  1085. if conn := integrationConnection(t, "issue48"); conn != nil {
  1086. defer conn.Close()
  1087. deadline := make(chan bool)
  1088. go func() {
  1089. select {
  1090. case <-time.After(5 * time.Second):
  1091. panic("expected to receive 2 deliveries while in an RPC, got a deadlock")
  1092. case <-deadline:
  1093. // pass
  1094. }
  1095. }()
  1096. ch, err := conn.Channel()
  1097. if err != nil {
  1098. t.Fatalf("got error on channel.open: %v", err)
  1099. }
  1100. queue := "test-issue48"
  1101. if _, err := ch.QueueDeclare(queue, false, true, false, false, nil); err != nil {
  1102. t.Fatalf("expected to declare a queue: %v", err)
  1103. }
  1104. if err := ch.Confirm(false); err != nil {
  1105. t.Fatalf("got error on confirm: %v", err)
  1106. }
  1107. ack, nack := make(chan uint64, 2), make(chan uint64, 2)
  1108. ch.NotifyConfirm(ack, nack)
  1109. for i := 0; i < cap(ack); i++ {
  1110. // Fill the queue with some new or remaining publishings
  1111. ch.Publish("", queue, false, false, Publishing{Body: []byte("")})
  1112. }
  1113. for i := 0; i < cap(ack); i++ {
  1114. // Wait for them to land on the queue so they'll be delivered on consume
  1115. <-ack
  1116. }
  1117. // Consuming should send them all on the wire
  1118. msgs, err := ch.Consume(queue, "", false, false, false, false, nil)
  1119. if err != nil {
  1120. t.Fatalf("got error on consume: %v", err)
  1121. }
  1122. // We pop one off the chan, the other is on the wire
  1123. <-msgs
  1124. // Opening a new channel (any RPC) while another delivery is on the wire
  1125. if _, err := conn.Channel(); err != nil {
  1126. t.Fatalf("got error on consume: %v", err)
  1127. }
  1128. // We pop the next off the chan
  1129. <-msgs
  1130. deadline <- true
  1131. }
  1132. }
  1133. // https://github.com/streadway/amqp/issues/46
  1134. func TestRepeatedChannelExceptionWithPublishAndMaxProcsIssue46(t *testing.T) {
  1135. conn := integrationConnection(t, "issue46")
  1136. if conn != nil {
  1137. for i := 0; i < 100; i++ {
  1138. ch, err := conn.Channel()
  1139. if err != nil {
  1140. t.Fatalf("expected error only on publish, got error on channel.open: %v", err)
  1141. }
  1142. for j := 0; j < 10; j++ {
  1143. err = ch.Publish("not-existing-exchange", "some-key", false, false, Publishing{Body: []byte("some-data")})
  1144. if err, ok := err.(Error); ok {
  1145. if err.Code != 504 {
  1146. t.Fatalf("expected channel only exception, got: %v", err)
  1147. }
  1148. }
  1149. }
  1150. }
  1151. }
  1152. }
  1153. // https://github.com/streadway/amqp/issues/43
  1154. func TestChannelExceptionWithCloseIssue43(t *testing.T) {
  1155. conn := integrationConnection(t, "issue43")
  1156. if conn != nil {
  1157. go func() {
  1158. for err := range conn.NotifyClose(make(chan *Error)) {
  1159. t.Log(err.Error())
  1160. }
  1161. }()
  1162. c1, err := conn.Channel()
  1163. if err != nil {
  1164. panic(err)
  1165. }
  1166. go func() {
  1167. for err := range c1.NotifyClose(make(chan *Error)) {
  1168. t.Log("Channel1 Close: " + err.Error())
  1169. }
  1170. }()
  1171. c2, err := conn.Channel()
  1172. if err != nil {
  1173. panic(err)
  1174. }
  1175. go func() {
  1176. for err := range c2.NotifyClose(make(chan *Error)) {
  1177. t.Log("Channel2 Close: " + err.Error())
  1178. }
  1179. }()
  1180. // Cause an asynchronous channel exception causing the server
  1181. // to send a "channel.close" method either before or after the next
  1182. // asynchronous method.
  1183. err = c1.Publish("nonexisting-exchange", "", false, false, Publishing{})
  1184. if err != nil {
  1185. panic(err)
  1186. }
  1187. // Receive or send the channel close method, the channel shuts down
  1188. // but this expects a channel.close-ok to be received.
  1189. c1.Close()
  1190. // This ensures that the 2nd channel is unaffected by the channel exception
  1191. // on channel 1.
  1192. err = c2.ExchangeDeclare("test-channel-still-exists", "direct", false, true, false, false, nil)
  1193. if err != nil {
  1194. panic(err)
  1195. }
  1196. }
  1197. }
  1198. // https://github.com/streadway/amqp/issues/7
  1199. func TestCorruptedMessageIssue7(t *testing.T) {
  1200. messageCount := 1024
  1201. c1 := integrationConnection(t, "")
  1202. c2 := integrationConnection(t, "")
  1203. if c1 != nil && c2 != nil {
  1204. defer c1.Close()
  1205. defer c2.Close()
  1206. pub, err := c1.Channel()
  1207. if err != nil {
  1208. t.Fatalf("Cannot create Channel")
  1209. }
  1210. sub, err := c2.Channel()
  1211. if err != nil {
  1212. t.Fatalf("Cannot create Channel")
  1213. }
  1214. queue := "test-corrupted-message-regression"
  1215. if _, err := pub.QueueDeclare(queue, false, true, false, false, nil); err != nil {
  1216. t.Fatalf("Cannot declare")
  1217. }
  1218. if _, err := sub.QueueDeclare(queue, false, true, false, false, nil); err != nil {
  1219. t.Fatalf("Cannot declare")
  1220. }
  1221. msgs, err := sub.Consume(queue, "", false, false, false, false, nil)
  1222. if err != nil {
  1223. t.Fatalf("Cannot consume")
  1224. }
  1225. for i := 0; i < messageCount; i++ {
  1226. err := pub.Publish("", queue, false, false, Publishing{
  1227. Body: generateCrc32Random(7 * i),
  1228. })
  1229. if err != nil {
  1230. t.Fatalf("Failed to publish")
  1231. }
  1232. }
  1233. for i := 0; i < messageCount; i++ {
  1234. select {
  1235. case msg := <-msgs:
  1236. assertMessageCrc32(t, msg.Body, fmt.Sprintf("missed match at %d", i))
  1237. case <-time.After(200 * time.Millisecond):
  1238. t.Fatalf("Timeout on recv")
  1239. }
  1240. }
  1241. }
  1242. }
  1243. func TestExchangeDeclarePrecondition(t *testing.T) {
  1244. c1 := integrationConnection(t, "exchange-double-declare")
  1245. c2 := integrationConnection(t, "exchange-double-declare-cleanup")
  1246. if c1 != nil && c2 != nil {
  1247. defer c1.Close()
  1248. defer c2.Close()
  1249. ch, err := c1.Channel()
  1250. if err != nil {
  1251. t.Fatalf("Create channel")
  1252. }
  1253. exchange := "test-mismatched-redeclare"
  1254. err = ch.ExchangeDeclare(
  1255. exchange,
  1256. "direct", // exchangeType
  1257. false, // durable
  1258. true, // auto-delete
  1259. false, // internal
  1260. false, // noWait
  1261. nil, // arguments
  1262. )
  1263. if err != nil {
  1264. t.Fatalf("Could not initially declare exchange")
  1265. }
  1266. err = ch.ExchangeDeclare(
  1267. exchange,
  1268. "direct",
  1269. true, // different durability
  1270. true,
  1271. false,
  1272. false,
  1273. nil,
  1274. )
  1275. if err == nil {
  1276. t.Fatalf("Expected to fail a redeclare with different durability, didn't receive an error")
  1277. }
  1278. if err, ok := err.(Error); ok {
  1279. if err.Code != PreconditionFailed {
  1280. t.Fatalf("Expected precondition error")
  1281. }
  1282. if !err.Recover {
  1283. t.Fatalf("Expected to be able to recover")
  1284. }
  1285. }
  1286. ch2, _ := c2.Channel()
  1287. if err = ch2.ExchangeDelete(exchange, false, false); err != nil {
  1288. t.Fatalf("Could not delete exchange: %v", err)
  1289. }
  1290. }
  1291. }
  1292. func TestRabbitMQQueueTTLGet(t *testing.T) {
  1293. if c := integrationRabbitMQ(t, "ttl"); c != nil {
  1294. defer c.Close()
  1295. queue := "test.rabbitmq-message-ttl"
  1296. channel, err := c.Channel()
  1297. if err != nil {
  1298. t.Fatalf("channel: %v", err)
  1299. }
  1300. if _, err = channel.QueueDeclare(
  1301. queue,
  1302. false,
  1303. true,
  1304. false,
  1305. false,
  1306. Table{"x-message-ttl": int32(100)}, // in ms
  1307. ); err != nil {
  1308. t.Fatalf("queue declare: %s", err)
  1309. }
  1310. channel.Publish("", queue, false, false, Publishing{Body: []byte("ttl")})
  1311. time.Sleep(200 * time.Millisecond)
  1312. _, ok, err := channel.Get(queue, false)
  1313. if ok {
  1314. t.Fatalf("Expected the message to expire in 100ms, it didn't expire after 200ms")
  1315. }
  1316. if err != nil {
  1317. t.Fatalf("Failed to get on ttl queue")
  1318. }
  1319. }
  1320. }
  1321. func TestRabbitMQQueueNackMultipleRequeue(t *testing.T) {
  1322. if c := integrationRabbitMQ(t, "nack"); c != nil {
  1323. defer c.Close()
  1324. if c.isCapable("basic.nack") {
  1325. queue := "test.rabbitmq-basic-nack"
  1326. channel, err := c.Channel()
  1327. if err != nil {
  1328. t.Fatalf("channel: %v", err)
  1329. }
  1330. if _, err = channel.QueueDeclare(queue, false, true, false, false, nil); err != nil {
  1331. t.Fatalf("queue declare: %s", err)
  1332. }
  1333. channel.Publish("", queue, false, false, Publishing{Body: []byte("1")})
  1334. channel.Publish("", queue, false, false, Publishing{Body: []byte("2")})
  1335. m1, ok, err := channel.Get(queue, false)
  1336. if !ok || err != nil || m1.Body[0] != '1' {
  1337. t.Fatalf("could not get message %v", m1)
  1338. }
  1339. m2, ok, err := channel.Get(queue, false)
  1340. if !ok || err != nil || m2.Body[0] != '2' {
  1341. t.Fatalf("could not get message %v", m2)
  1342. }
  1343. m2.Nack(true, true)
  1344. m1, ok, err = channel.Get(queue, false)
  1345. if !ok || err != nil || m1.Body[0] != '1' {
  1346. t.Fatalf("could not get message %v", m1)
  1347. }
  1348. m2, ok, err = channel.Get(queue, false)
  1349. if !ok || err != nil || m2.Body[0] != '2' {
  1350. t.Fatalf("could not get message %v", m2)
  1351. }
  1352. }
  1353. }
  1354. }
  1355. /*
  1356. * Support for integration tests
  1357. */
  1358. func integrationURLFromEnv() string {
  1359. url := os.Getenv("AMQP_URL")
  1360. if url == "" {
  1361. url = "amqp://"
  1362. }
  1363. return url
  1364. }
  1365. func loggedConnection(t *testing.T, conn *Connection, name string) *Connection {
  1366. if name != "" {
  1367. conn.conn = &logIO{t, name, conn.conn}
  1368. }
  1369. return conn
  1370. }
  1371. // Returns a conneciton to the AMQP if the AMQP_URL environment
  1372. // variable is set and a connnection can be established.
  1373. func integrationConnection(t *testing.T, name string) *Connection {
  1374. conn, err := Dial(integrationURLFromEnv())
  1375. if err != nil {
  1376. t.Errorf("dial integration server: %s", err)
  1377. return nil
  1378. }
  1379. return loggedConnection(t, conn, name)
  1380. }
  1381. // Returns a connection, channel and delcares a queue when the AMQP_URL is in the environment
  1382. func integrationQueue(t *testing.T, name string) (*Connection, *Channel) {
  1383. if conn := integrationConnection(t, name); conn != nil {
  1384. if channel, err := conn.Channel(); err == nil {
  1385. if _, err = channel.QueueDeclare(name, false, true, false, false, nil); err == nil {
  1386. return conn, channel
  1387. }
  1388. }
  1389. }
  1390. return nil, nil
  1391. }
  1392. // Delegates to integrationConnection and only returns a connection if the
  1393. // product is RabbitMQ
  1394. func integrationRabbitMQ(t *testing.T, name string) *Connection {
  1395. if conn := integrationConnection(t, "connect"); conn != nil {
  1396. if server, ok := conn.Properties["product"]; ok && server == "RabbitMQ" {
  1397. return conn
  1398. }
  1399. }
  1400. return nil
  1401. }
  1402. func assertConsumeBody(t *testing.T, messages <-chan Delivery, want []byte) (msg *Delivery) {
  1403. select {
  1404. case got := <-messages:
  1405. if bytes.Compare(want, got.Body) != 0 {
  1406. t.Fatalf("Message body does not match want: %v, got: %v, for: %+v", want, got.Body, got)
  1407. }
  1408. msg = &got
  1409. case <-time.After(200 * time.Millisecond):
  1410. t.Fatalf("Timeout waiting for %v", want)
  1411. }
  1412. return msg
  1413. }
  1414. // Pulls out the CRC and verifies the remaining content against the CRC
  1415. func assertMessageCrc32(t *testing.T, msg []byte, assert string) {
  1416. size := binary.BigEndian.Uint32(msg[:4])
  1417. crc := crc32.NewIEEE()
  1418. crc.Write(msg[8:])
  1419. if binary.BigEndian.Uint32(msg[4:8]) != crc.Sum32() {
  1420. t.Fatalf("Message does not match CRC: %s", assert)
  1421. }
  1422. if int(size) != len(msg)-8 {
  1423. t.Fatalf("Message does not match size, should=%d, is=%d: %s", size, len(msg)-8, assert)
  1424. }
  1425. }
  1426. // Creates a random body size with a leading 32-bit CRC in network byte order
  1427. // that verifies the remaining slice
  1428. func generateCrc32Random(size int) []byte {
  1429. msg := make([]byte, size+8)
  1430. if _, err := io.ReadFull(devrand.Reader, msg); err != nil {
  1431. panic(err)
  1432. }
  1433. crc := crc32.NewIEEE()
  1434. crc.Write(msg[8:])
  1435. binary.BigEndian.PutUint32(msg[0:4], uint32(size))
  1436. binary.BigEndian.PutUint32(msg[4:8], crc.Sum32())
  1437. return msg
  1438. }