| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559 |
- // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
- // Use of this source code is governed by a BSD-style
- // license that can be found in the LICENSE file.
- // Source code and contact info at http://github.com/streadway/amqp
- package amqp
- import (
- "bytes"
- "io"
- "reflect"
- "testing"
- "time"
- )
- type server struct {
- *testing.T
- r reader // framer <- client
- w writer // framer -> client
- S io.ReadWriteCloser // Server IO
- C io.ReadWriteCloser // Client IO
- // captured client frames
- start connectionStartOk
- tune connectionTuneOk
- }
- func defaultConfig() Config {
- return Config{SASL: []Authentication{&PlainAuth{"guest", "guest"}}, Vhost: "/"}
- }
- func newSession(t *testing.T) (io.ReadWriteCloser, *server) {
- rs, wc := io.Pipe()
- rc, ws := io.Pipe()
- rws := &logIO{t, "server", pipe{rs, ws}}
- rwc := &logIO{t, "client", pipe{rc, wc}}
- server := server{
- T: t,
- r: reader{rws},
- w: writer{rws},
- S: rws,
- C: rwc,
- }
- return rwc, &server
- }
- func (t *server) expectBytes(b []byte) {
- in := make([]byte, len(b))
- if _, err := io.ReadFull(t.S, in); err != nil {
- t.Fatalf("io error expecting bytes: %v", err)
- }
- if bytes.Compare(b, in) != 0 {
- t.Fatalf("failed bytes: expected: %s got: %s", string(b), string(in))
- }
- }
- func (t *server) send(channel int, m message) {
- defer time.AfterFunc(time.Second, func() { panic("send deadlock") }).Stop()
- if err := t.w.WriteFrame(&methodFrame{
- ChannelId: uint16(channel),
- Method: m,
- }); err != nil {
- t.Fatalf("frame err, write: %s", err)
- }
- }
- // drops all but method frames expected on the given channel
- func (t *server) recv(channel int, m message) message {
- defer time.AfterFunc(time.Second, func() { panic("recv deadlock") }).Stop()
- var remaining int
- var header *headerFrame
- var body []byte
- for {
- frame, err := t.r.ReadFrame()
- if err != nil {
- t.Fatalf("frame err, read: %s", err)
- }
- if frame.channel() != uint16(channel) {
- t.Fatalf("expected frame on channel %d, got channel %d", channel, frame.channel())
- }
- switch f := frame.(type) {
- case *heartbeatFrame:
- // drop
- case *headerFrame:
- // start content state
- header = f
- remaining = int(header.Size)
- if remaining == 0 {
- m.(messageWithContent).setContent(header.Properties, nil)
- return m
- }
- case *bodyFrame:
- // continue until terminated
- body = append(body, f.Body...)
- remaining -= len(f.Body)
- if remaining <= 0 {
- m.(messageWithContent).setContent(header.Properties, body)
- return m
- }
- case *methodFrame:
- if reflect.TypeOf(m) == reflect.TypeOf(f.Method) {
- wantv := reflect.ValueOf(m).Elem()
- havev := reflect.ValueOf(f.Method).Elem()
- wantv.Set(havev)
- if _, ok := m.(messageWithContent); !ok {
- return m
- }
- } else {
- t.Fatalf("expected method type: %T, got: %T", m, f.Method)
- }
- default:
- t.Fatalf("unexpected frame: %+v", f)
- }
- }
- panic("unreachable")
- }
- func (t *server) expectAMQP() {
- t.expectBytes([]byte{'A', 'M', 'Q', 'P', 0, 0, 9, 1})
- }
- func (t *server) connectionStart() {
- t.send(0, &connectionStart{
- VersionMajor: 0,
- VersionMinor: 9,
- Mechanisms: "PLAIN",
- Locales: "en-us",
- })
- t.recv(0, &t.start)
- }
- func (t *server) connectionTune() {
- t.send(0, &connectionTune{
- ChannelMax: 11,
- FrameMax: 20000,
- Heartbeat: 10,
- })
- t.recv(0, &t.tune)
- }
- func (t *server) connectionOpen() {
- t.expectAMQP()
- t.connectionStart()
- t.connectionTune()
- t.recv(0, &connectionOpen{})
- t.send(0, &connectionOpenOk{})
- }
- func (t *server) connectionClose() {
- t.recv(0, &connectionClose{})
- t.send(0, &connectionCloseOk{})
- }
- func (t *server) channelOpen(id int) {
- t.recv(id, &channelOpen{})
- t.send(id, &channelOpenOk{})
- }
- func TestDefaultClientProperties(t *testing.T) {
- rwc, srv := newSession(t)
- go func() {
- srv.connectionOpen()
- rwc.Close()
- }()
- if c, err := Open(rwc, defaultConfig()); err != nil {
- t.Fatalf("could not create connection: %v (%s)", c, err)
- }
- if want, got := defaultProduct, srv.start.ClientProperties["product"]; want != got {
- t.Errorf("expected product %s got: %s", want, got)
- }
- if want, got := defaultVersion, srv.start.ClientProperties["version"]; want != got {
- t.Errorf("expected version %s got: %s", want, got)
- }
- }
- func TestCustomClientProperties(t *testing.T) {
- rwc, srv := newSession(t)
- config := defaultConfig()
- config.Properties = Table{
- "product": "foo",
- "version": "1.0",
- }
- go func() {
- srv.connectionOpen()
- rwc.Close()
- }()
- if c, err := Open(rwc, config); err != nil {
- t.Fatalf("could not create connection: %v (%s)", c, err)
- }
- if want, got := config.Properties["product"], srv.start.ClientProperties["product"]; want != got {
- t.Errorf("expected product %s got: %s", want, got)
- }
- if want, got := config.Properties["version"], srv.start.ClientProperties["version"]; want != got {
- t.Errorf("expected version %s got: %s", want, got)
- }
- }
- func TestOpen(t *testing.T) {
- rwc, srv := newSession(t)
- go func() {
- srv.connectionOpen()
- rwc.Close()
- }()
- if c, err := Open(rwc, defaultConfig()); err != nil {
- t.Fatalf("could not create connection: %v (%s)", c, err)
- }
- }
- func TestChannelOpen(t *testing.T) {
- rwc, srv := newSession(t)
- go func() {
- srv.connectionOpen()
- srv.channelOpen(1)
- rwc.Close()
- }()
- c, err := Open(rwc, defaultConfig())
- if err != nil {
- t.Fatalf("could not create connection: %v (%s)", c, err)
- }
- ch, err := c.Channel()
- if err != nil {
- t.Fatalf("could not open channel: %v (%s)", ch, err)
- }
- }
- func TestOpenFailedSASLUnsupportedMechanisms(t *testing.T) {
- rwc, srv := newSession(t)
- go func() {
- srv.expectAMQP()
- srv.send(0, &connectionStart{
- VersionMajor: 0,
- VersionMinor: 9,
- Mechanisms: "KERBEROS NTLM",
- Locales: "en-us",
- })
- }()
- c, err := Open(rwc, defaultConfig())
- if err != ErrSASL {
- t.Fatalf("expected ErrSASL got: %+v on %+v", err, c)
- }
- }
- func TestOpenFailedCredentials(t *testing.T) {
- rwc, srv := newSession(t)
- go func() {
- srv.expectAMQP()
- srv.connectionStart()
- // Now kill/timeout the connection indicating bad auth
- rwc.Close()
- }()
- c, err := Open(rwc, defaultConfig())
- if err != ErrCredentials {
- t.Fatalf("expected ErrCredentials got: %+v on %+v", err, c)
- }
- }
- func TestOpenFailedVhost(t *testing.T) {
- rwc, srv := newSession(t)
- go func() {
- srv.expectAMQP()
- srv.connectionStart()
- srv.connectionTune()
- srv.recv(0, &connectionOpen{})
- // Now kill/timeout the connection on bad Vhost
- rwc.Close()
- }()
- c, err := Open(rwc, defaultConfig())
- if err != ErrVhost {
- t.Fatalf("expected ErrVhost got: %+v on %+v", err, c)
- }
- }
- func TestConfirmMultipleOrdersDeliveryTags(t *testing.T) {
- rwc, srv := newSession(t)
- defer rwc.Close()
- go func() {
- srv.connectionOpen()
- srv.channelOpen(1)
- srv.recv(1, &confirmSelect{})
- srv.send(1, &confirmSelectOk{})
- srv.recv(1, &basicPublish{})
- srv.recv(1, &basicPublish{})
- srv.recv(1, &basicPublish{})
- srv.recv(1, &basicPublish{})
- // Single tag, plus multiple, should produce
- // 2, 1, 3, 4
- srv.send(1, &basicAck{DeliveryTag: 2})
- srv.send(1, &basicAck{DeliveryTag: 4, Multiple: true})
- srv.recv(1, &basicPublish{})
- srv.recv(1, &basicPublish{})
- srv.recv(1, &basicPublish{})
- srv.recv(1, &basicPublish{})
- // And some more, but in reverse order, multiple then one
- // 5, 6, 7, 8
- srv.send(1, &basicAck{DeliveryTag: 6, Multiple: true})
- srv.send(1, &basicAck{DeliveryTag: 8})
- srv.send(1, &basicAck{DeliveryTag: 7})
- }()
- c, err := Open(rwc, defaultConfig())
- if err != nil {
- t.Fatalf("could not create connection: %v (%s)", c, err)
- }
- ch, err := c.Channel()
- if err != nil {
- t.Fatalf("could not open channel: %v (%s)", ch, err)
- }
- acks, _ := ch.NotifyConfirm(make(chan uint64), make(chan uint64))
- ch.Confirm(false)
- ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 1")})
- ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 2")})
- ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 3")})
- ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 4")})
- for i, tag := range []uint64{2, 1, 3, 4} {
- if ack := <-acks; tag != ack {
- t.Fatalf("failed ack, expected ack#%d to be %d, got %d", i, tag, ack)
- }
- }
- ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 5")})
- ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 6")})
- ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 7")})
- ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 8")})
- for i, tag := range []uint64{5, 6, 8, 7} {
- if ack := <-acks; tag != ack {
- t.Fatalf("failed ack, expected ack#%d to be %d, got %d", i, tag, ack)
- }
- }
- }
- func TestNotifyClosesReusedPublisherConfirmChan(t *testing.T) {
- rwc, srv := newSession(t)
- go func() {
- srv.connectionOpen()
- srv.channelOpen(1)
- srv.recv(1, &confirmSelect{})
- srv.send(1, &confirmSelectOk{})
- srv.recv(0, &connectionClose{})
- srv.send(0, &connectionCloseOk{})
- }()
- c, err := Open(rwc, defaultConfig())
- if err != nil {
- t.Fatalf("could not create connection: %v (%s)", c, err)
- }
- ch, err := c.Channel()
- if err != nil {
- t.Fatalf("could not open channel: %v (%s)", ch, err)
- }
- ackAndNack := make(chan uint64)
- ch.NotifyConfirm(ackAndNack, ackAndNack)
- if err := ch.Confirm(false); err != nil {
- t.Fatalf("expected to enter confirm mode: %v", err)
- }
- if err := c.Close(); err != nil {
- t.Fatalf("could not close connection: %v (%s)", c, err)
- }
- }
- func TestNotifyClosesAllChansAfterConnectionClose(t *testing.T) {
- rwc, srv := newSession(t)
- go func() {
- srv.connectionOpen()
- srv.channelOpen(1)
- srv.recv(0, &connectionClose{})
- srv.send(0, &connectionCloseOk{})
- }()
- c, err := Open(rwc, defaultConfig())
- if err != nil {
- t.Fatalf("could not create connection: %v (%s)", c, err)
- }
- ch, err := c.Channel()
- if err != nil {
- t.Fatalf("could not open channel: %v (%s)", ch, err)
- }
- if err := c.Close(); err != nil {
- t.Fatalf("could not close connection: %v (%s)", c, err)
- }
- select {
- case <-c.NotifyClose(make(chan *Error)):
- case <-time.After(time.Millisecond):
- t.Errorf("expected to close NotifyClose chan after Connection.Close")
- }
- select {
- case <-ch.NotifyClose(make(chan *Error)):
- case <-time.After(time.Millisecond):
- t.Errorf("expected to close Connection.NotifyClose chan after Connection.Close")
- }
- select {
- case <-ch.NotifyFlow(make(chan bool)):
- case <-time.After(time.Millisecond):
- t.Errorf("expected to close Channel.NotifyFlow chan after Connection.Close")
- }
- select {
- case <-ch.NotifyCancel(make(chan string)):
- case <-time.After(time.Millisecond):
- t.Errorf("expected to close Channel.NofityCancel chan after Connection.Close")
- }
- select {
- case <-ch.NotifyReturn(make(chan Return)):
- case <-time.After(time.Millisecond):
- t.Errorf("expected to close Channel.NotifyReturn chan after Connection.Close")
- }
- ack, nack := ch.NotifyConfirm(make(chan uint64), make(chan uint64))
- select {
- case <-ack:
- case <-time.After(time.Millisecond):
- t.Errorf("expected to close acks on Channel.NotifyConfirm chan after Connection.Close")
- }
- select {
- case <-nack:
- case <-time.After(time.Millisecond):
- t.Errorf("expected to close nacks Channel.NotifyConfirm chan after Connection.Close")
- }
- }
- // Should not panic when sending bodies split at differnet boundaries
- func TestPublishBodySliceIssue74(t *testing.T) {
- rwc, srv := newSession(t)
- defer rwc.Close()
- const frameSize = 100
- const publishings = frameSize * 3
- done := make(chan bool)
- base := make([]byte, publishings)
- go func() {
- srv.connectionOpen()
- srv.channelOpen(1)
- for i := 0; i < publishings; i++ {
- srv.recv(1, &basicPublish{})
- }
- done <- true
- }()
- cfg := defaultConfig()
- cfg.FrameSize = frameSize
- c, err := Open(rwc, cfg)
- if err != nil {
- t.Fatalf("could not create connection: %v (%s)", c, err)
- }
- ch, err := c.Channel()
- if err != nil {
- t.Fatalf("could not open channel: %v (%s)", ch, err)
- }
- for i := 0; i < publishings; i++ {
- go ch.Publish("", "q", false, false, Publishing{Body: base[0:i]})
- }
- <-done
- }
- func TestPublishAndShutdownDeadlockIssue84(t *testing.T) {
- rwc, srv := newSession(t)
- defer rwc.Close()
- go func() {
- srv.connectionOpen()
- srv.channelOpen(1)
- srv.recv(1, &basicPublish{})
- // Mimic a broken io pipe so that Publish catches the error and goes into shutdown
- srv.S.Close()
- }()
- c, err := Open(rwc, defaultConfig())
- if err != nil {
- t.Fatalf("couldn't create connection: %v (%s)", c, err)
- }
- ch, err := c.Channel()
- if err != nil {
- t.Fatalf("couldn't open channel: %v (%s)", ch, err)
- }
- defer time.AfterFunc(500*time.Millisecond, func() { panic("Publish deadlock") }).Stop()
- for {
- if err := ch.Publish("exchange", "q", false, false, Publishing{Body: []byte("test")}); err != nil {
- t.Log("successfully caught disconnect error", err)
- return
- }
- }
- }
|