client_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559
  1. // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Source code and contact info at http://github.com/streadway/amqp
  5. package amqp
  6. import (
  7. "bytes"
  8. "io"
  9. "reflect"
  10. "testing"
  11. "time"
  12. )
  13. type server struct {
  14. *testing.T
  15. r reader // framer <- client
  16. w writer // framer -> client
  17. S io.ReadWriteCloser // Server IO
  18. C io.ReadWriteCloser // Client IO
  19. // captured client frames
  20. start connectionStartOk
  21. tune connectionTuneOk
  22. }
  23. func defaultConfig() Config {
  24. return Config{SASL: []Authentication{&PlainAuth{"guest", "guest"}}, Vhost: "/"}
  25. }
  26. func newSession(t *testing.T) (io.ReadWriteCloser, *server) {
  27. rs, wc := io.Pipe()
  28. rc, ws := io.Pipe()
  29. rws := &logIO{t, "server", pipe{rs, ws}}
  30. rwc := &logIO{t, "client", pipe{rc, wc}}
  31. server := server{
  32. T: t,
  33. r: reader{rws},
  34. w: writer{rws},
  35. S: rws,
  36. C: rwc,
  37. }
  38. return rwc, &server
  39. }
  40. func (t *server) expectBytes(b []byte) {
  41. in := make([]byte, len(b))
  42. if _, err := io.ReadFull(t.S, in); err != nil {
  43. t.Fatalf("io error expecting bytes: %v", err)
  44. }
  45. if bytes.Compare(b, in) != 0 {
  46. t.Fatalf("failed bytes: expected: %s got: %s", string(b), string(in))
  47. }
  48. }
  49. func (t *server) send(channel int, m message) {
  50. defer time.AfterFunc(time.Second, func() { panic("send deadlock") }).Stop()
  51. if err := t.w.WriteFrame(&methodFrame{
  52. ChannelId: uint16(channel),
  53. Method: m,
  54. }); err != nil {
  55. t.Fatalf("frame err, write: %s", err)
  56. }
  57. }
  58. // drops all but method frames expected on the given channel
  59. func (t *server) recv(channel int, m message) message {
  60. defer time.AfterFunc(time.Second, func() { panic("recv deadlock") }).Stop()
  61. var remaining int
  62. var header *headerFrame
  63. var body []byte
  64. for {
  65. frame, err := t.r.ReadFrame()
  66. if err != nil {
  67. t.Fatalf("frame err, read: %s", err)
  68. }
  69. if frame.channel() != uint16(channel) {
  70. t.Fatalf("expected frame on channel %d, got channel %d", channel, frame.channel())
  71. }
  72. switch f := frame.(type) {
  73. case *heartbeatFrame:
  74. // drop
  75. case *headerFrame:
  76. // start content state
  77. header = f
  78. remaining = int(header.Size)
  79. if remaining == 0 {
  80. m.(messageWithContent).setContent(header.Properties, nil)
  81. return m
  82. }
  83. case *bodyFrame:
  84. // continue until terminated
  85. body = append(body, f.Body...)
  86. remaining -= len(f.Body)
  87. if remaining <= 0 {
  88. m.(messageWithContent).setContent(header.Properties, body)
  89. return m
  90. }
  91. case *methodFrame:
  92. if reflect.TypeOf(m) == reflect.TypeOf(f.Method) {
  93. wantv := reflect.ValueOf(m).Elem()
  94. havev := reflect.ValueOf(f.Method).Elem()
  95. wantv.Set(havev)
  96. if _, ok := m.(messageWithContent); !ok {
  97. return m
  98. }
  99. } else {
  100. t.Fatalf("expected method type: %T, got: %T", m, f.Method)
  101. }
  102. default:
  103. t.Fatalf("unexpected frame: %+v", f)
  104. }
  105. }
  106. panic("unreachable")
  107. }
  108. func (t *server) expectAMQP() {
  109. t.expectBytes([]byte{'A', 'M', 'Q', 'P', 0, 0, 9, 1})
  110. }
  111. func (t *server) connectionStart() {
  112. t.send(0, &connectionStart{
  113. VersionMajor: 0,
  114. VersionMinor: 9,
  115. Mechanisms: "PLAIN",
  116. Locales: "en-us",
  117. })
  118. t.recv(0, &t.start)
  119. }
  120. func (t *server) connectionTune() {
  121. t.send(0, &connectionTune{
  122. ChannelMax: 11,
  123. FrameMax: 20000,
  124. Heartbeat: 10,
  125. })
  126. t.recv(0, &t.tune)
  127. }
  128. func (t *server) connectionOpen() {
  129. t.expectAMQP()
  130. t.connectionStart()
  131. t.connectionTune()
  132. t.recv(0, &connectionOpen{})
  133. t.send(0, &connectionOpenOk{})
  134. }
  135. func (t *server) connectionClose() {
  136. t.recv(0, &connectionClose{})
  137. t.send(0, &connectionCloseOk{})
  138. }
  139. func (t *server) channelOpen(id int) {
  140. t.recv(id, &channelOpen{})
  141. t.send(id, &channelOpenOk{})
  142. }
  143. func TestDefaultClientProperties(t *testing.T) {
  144. rwc, srv := newSession(t)
  145. go func() {
  146. srv.connectionOpen()
  147. rwc.Close()
  148. }()
  149. if c, err := Open(rwc, defaultConfig()); err != nil {
  150. t.Fatalf("could not create connection: %v (%s)", c, err)
  151. }
  152. if want, got := defaultProduct, srv.start.ClientProperties["product"]; want != got {
  153. t.Errorf("expected product %s got: %s", want, got)
  154. }
  155. if want, got := defaultVersion, srv.start.ClientProperties["version"]; want != got {
  156. t.Errorf("expected version %s got: %s", want, got)
  157. }
  158. }
  159. func TestCustomClientProperties(t *testing.T) {
  160. rwc, srv := newSession(t)
  161. config := defaultConfig()
  162. config.Properties = Table{
  163. "product": "foo",
  164. "version": "1.0",
  165. }
  166. go func() {
  167. srv.connectionOpen()
  168. rwc.Close()
  169. }()
  170. if c, err := Open(rwc, config); err != nil {
  171. t.Fatalf("could not create connection: %v (%s)", c, err)
  172. }
  173. if want, got := config.Properties["product"], srv.start.ClientProperties["product"]; want != got {
  174. t.Errorf("expected product %s got: %s", want, got)
  175. }
  176. if want, got := config.Properties["version"], srv.start.ClientProperties["version"]; want != got {
  177. t.Errorf("expected version %s got: %s", want, got)
  178. }
  179. }
  180. func TestOpen(t *testing.T) {
  181. rwc, srv := newSession(t)
  182. go func() {
  183. srv.connectionOpen()
  184. rwc.Close()
  185. }()
  186. if c, err := Open(rwc, defaultConfig()); err != nil {
  187. t.Fatalf("could not create connection: %v (%s)", c, err)
  188. }
  189. }
  190. func TestChannelOpen(t *testing.T) {
  191. rwc, srv := newSession(t)
  192. go func() {
  193. srv.connectionOpen()
  194. srv.channelOpen(1)
  195. rwc.Close()
  196. }()
  197. c, err := Open(rwc, defaultConfig())
  198. if err != nil {
  199. t.Fatalf("could not create connection: %v (%s)", c, err)
  200. }
  201. ch, err := c.Channel()
  202. if err != nil {
  203. t.Fatalf("could not open channel: %v (%s)", ch, err)
  204. }
  205. }
  206. func TestOpenFailedSASLUnsupportedMechanisms(t *testing.T) {
  207. rwc, srv := newSession(t)
  208. go func() {
  209. srv.expectAMQP()
  210. srv.send(0, &connectionStart{
  211. VersionMajor: 0,
  212. VersionMinor: 9,
  213. Mechanisms: "KERBEROS NTLM",
  214. Locales: "en-us",
  215. })
  216. }()
  217. c, err := Open(rwc, defaultConfig())
  218. if err != ErrSASL {
  219. t.Fatalf("expected ErrSASL got: %+v on %+v", err, c)
  220. }
  221. }
  222. func TestOpenFailedCredentials(t *testing.T) {
  223. rwc, srv := newSession(t)
  224. go func() {
  225. srv.expectAMQP()
  226. srv.connectionStart()
  227. // Now kill/timeout the connection indicating bad auth
  228. rwc.Close()
  229. }()
  230. c, err := Open(rwc, defaultConfig())
  231. if err != ErrCredentials {
  232. t.Fatalf("expected ErrCredentials got: %+v on %+v", err, c)
  233. }
  234. }
  235. func TestOpenFailedVhost(t *testing.T) {
  236. rwc, srv := newSession(t)
  237. go func() {
  238. srv.expectAMQP()
  239. srv.connectionStart()
  240. srv.connectionTune()
  241. srv.recv(0, &connectionOpen{})
  242. // Now kill/timeout the connection on bad Vhost
  243. rwc.Close()
  244. }()
  245. c, err := Open(rwc, defaultConfig())
  246. if err != ErrVhost {
  247. t.Fatalf("expected ErrVhost got: %+v on %+v", err, c)
  248. }
  249. }
  250. func TestConfirmMultipleOrdersDeliveryTags(t *testing.T) {
  251. rwc, srv := newSession(t)
  252. defer rwc.Close()
  253. go func() {
  254. srv.connectionOpen()
  255. srv.channelOpen(1)
  256. srv.recv(1, &confirmSelect{})
  257. srv.send(1, &confirmSelectOk{})
  258. srv.recv(1, &basicPublish{})
  259. srv.recv(1, &basicPublish{})
  260. srv.recv(1, &basicPublish{})
  261. srv.recv(1, &basicPublish{})
  262. // Single tag, plus multiple, should produce
  263. // 2, 1, 3, 4
  264. srv.send(1, &basicAck{DeliveryTag: 2})
  265. srv.send(1, &basicAck{DeliveryTag: 4, Multiple: true})
  266. srv.recv(1, &basicPublish{})
  267. srv.recv(1, &basicPublish{})
  268. srv.recv(1, &basicPublish{})
  269. srv.recv(1, &basicPublish{})
  270. // And some more, but in reverse order, multiple then one
  271. // 5, 6, 7, 8
  272. srv.send(1, &basicAck{DeliveryTag: 6, Multiple: true})
  273. srv.send(1, &basicAck{DeliveryTag: 8})
  274. srv.send(1, &basicAck{DeliveryTag: 7})
  275. }()
  276. c, err := Open(rwc, defaultConfig())
  277. if err != nil {
  278. t.Fatalf("could not create connection: %v (%s)", c, err)
  279. }
  280. ch, err := c.Channel()
  281. if err != nil {
  282. t.Fatalf("could not open channel: %v (%s)", ch, err)
  283. }
  284. acks, _ := ch.NotifyConfirm(make(chan uint64), make(chan uint64))
  285. ch.Confirm(false)
  286. ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 1")})
  287. ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 2")})
  288. ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 3")})
  289. ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 4")})
  290. for i, tag := range []uint64{2, 1, 3, 4} {
  291. if ack := <-acks; tag != ack {
  292. t.Fatalf("failed ack, expected ack#%d to be %d, got %d", i, tag, ack)
  293. }
  294. }
  295. ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 5")})
  296. ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 6")})
  297. ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 7")})
  298. ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 8")})
  299. for i, tag := range []uint64{5, 6, 8, 7} {
  300. if ack := <-acks; tag != ack {
  301. t.Fatalf("failed ack, expected ack#%d to be %d, got %d", i, tag, ack)
  302. }
  303. }
  304. }
  305. func TestNotifyClosesReusedPublisherConfirmChan(t *testing.T) {
  306. rwc, srv := newSession(t)
  307. go func() {
  308. srv.connectionOpen()
  309. srv.channelOpen(1)
  310. srv.recv(1, &confirmSelect{})
  311. srv.send(1, &confirmSelectOk{})
  312. srv.recv(0, &connectionClose{})
  313. srv.send(0, &connectionCloseOk{})
  314. }()
  315. c, err := Open(rwc, defaultConfig())
  316. if err != nil {
  317. t.Fatalf("could not create connection: %v (%s)", c, err)
  318. }
  319. ch, err := c.Channel()
  320. if err != nil {
  321. t.Fatalf("could not open channel: %v (%s)", ch, err)
  322. }
  323. ackAndNack := make(chan uint64)
  324. ch.NotifyConfirm(ackAndNack, ackAndNack)
  325. if err := ch.Confirm(false); err != nil {
  326. t.Fatalf("expected to enter confirm mode: %v", err)
  327. }
  328. if err := c.Close(); err != nil {
  329. t.Fatalf("could not close connection: %v (%s)", c, err)
  330. }
  331. }
  332. func TestNotifyClosesAllChansAfterConnectionClose(t *testing.T) {
  333. rwc, srv := newSession(t)
  334. go func() {
  335. srv.connectionOpen()
  336. srv.channelOpen(1)
  337. srv.recv(0, &connectionClose{})
  338. srv.send(0, &connectionCloseOk{})
  339. }()
  340. c, err := Open(rwc, defaultConfig())
  341. if err != nil {
  342. t.Fatalf("could not create connection: %v (%s)", c, err)
  343. }
  344. ch, err := c.Channel()
  345. if err != nil {
  346. t.Fatalf("could not open channel: %v (%s)", ch, err)
  347. }
  348. if err := c.Close(); err != nil {
  349. t.Fatalf("could not close connection: %v (%s)", c, err)
  350. }
  351. select {
  352. case <-c.NotifyClose(make(chan *Error)):
  353. case <-time.After(time.Millisecond):
  354. t.Errorf("expected to close NotifyClose chan after Connection.Close")
  355. }
  356. select {
  357. case <-ch.NotifyClose(make(chan *Error)):
  358. case <-time.After(time.Millisecond):
  359. t.Errorf("expected to close Connection.NotifyClose chan after Connection.Close")
  360. }
  361. select {
  362. case <-ch.NotifyFlow(make(chan bool)):
  363. case <-time.After(time.Millisecond):
  364. t.Errorf("expected to close Channel.NotifyFlow chan after Connection.Close")
  365. }
  366. select {
  367. case <-ch.NotifyCancel(make(chan string)):
  368. case <-time.After(time.Millisecond):
  369. t.Errorf("expected to close Channel.NofityCancel chan after Connection.Close")
  370. }
  371. select {
  372. case <-ch.NotifyReturn(make(chan Return)):
  373. case <-time.After(time.Millisecond):
  374. t.Errorf("expected to close Channel.NotifyReturn chan after Connection.Close")
  375. }
  376. ack, nack := ch.NotifyConfirm(make(chan uint64), make(chan uint64))
  377. select {
  378. case <-ack:
  379. case <-time.After(time.Millisecond):
  380. t.Errorf("expected to close acks on Channel.NotifyConfirm chan after Connection.Close")
  381. }
  382. select {
  383. case <-nack:
  384. case <-time.After(time.Millisecond):
  385. t.Errorf("expected to close nacks Channel.NotifyConfirm chan after Connection.Close")
  386. }
  387. }
  388. // Should not panic when sending bodies split at differnet boundaries
  389. func TestPublishBodySliceIssue74(t *testing.T) {
  390. rwc, srv := newSession(t)
  391. defer rwc.Close()
  392. const frameSize = 100
  393. const publishings = frameSize * 3
  394. done := make(chan bool)
  395. base := make([]byte, publishings)
  396. go func() {
  397. srv.connectionOpen()
  398. srv.channelOpen(1)
  399. for i := 0; i < publishings; i++ {
  400. srv.recv(1, &basicPublish{})
  401. }
  402. done <- true
  403. }()
  404. cfg := defaultConfig()
  405. cfg.FrameSize = frameSize
  406. c, err := Open(rwc, cfg)
  407. if err != nil {
  408. t.Fatalf("could not create connection: %v (%s)", c, err)
  409. }
  410. ch, err := c.Channel()
  411. if err != nil {
  412. t.Fatalf("could not open channel: %v (%s)", ch, err)
  413. }
  414. for i := 0; i < publishings; i++ {
  415. go ch.Publish("", "q", false, false, Publishing{Body: base[0:i]})
  416. }
  417. <-done
  418. }
  419. func TestPublishAndShutdownDeadlockIssue84(t *testing.T) {
  420. rwc, srv := newSession(t)
  421. defer rwc.Close()
  422. go func() {
  423. srv.connectionOpen()
  424. srv.channelOpen(1)
  425. srv.recv(1, &basicPublish{})
  426. // Mimic a broken io pipe so that Publish catches the error and goes into shutdown
  427. srv.S.Close()
  428. }()
  429. c, err := Open(rwc, defaultConfig())
  430. if err != nil {
  431. t.Fatalf("couldn't create connection: %v (%s)", c, err)
  432. }
  433. ch, err := c.Channel()
  434. if err != nil {
  435. t.Fatalf("couldn't open channel: %v (%s)", ch, err)
  436. }
  437. defer time.AfterFunc(500*time.Millisecond, func() { panic("Publish deadlock") }).Stop()
  438. for {
  439. if err := ch.Publish("exchange", "q", false, false, Publishing{Body: []byte("test")}); err != nil {
  440. t.Log("successfully caught disconnect error", err)
  441. return
  442. }
  443. }
  444. }