| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772 |
- // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
- // Use of this source code is governed by a BSD-style
- // license that can be found in the LICENSE file.
- // Source code and contact info at http://github.com/streadway/amqp
- // +build integration
- package amqp
- import (
- "bytes"
- devrand "crypto/rand"
- "encoding/binary"
- "fmt"
- "hash/crc32"
- "io"
- "math/rand"
- "net"
- "os"
- "reflect"
- "strconv"
- "testing"
- "testing/quick"
- "time"
- )
- func TestIntegrationOpenClose(t *testing.T) {
- if c := integrationConnection(t, "open-close"); c != nil {
- t.Logf("have connection, calling connection close")
- if err := c.Close(); err != nil {
- t.Fatalf("connection close: %s", err)
- }
- t.Logf("connection close OK")
- }
- }
- func TestIntegrationOpenCloseChannel(t *testing.T) {
- if c := integrationConnection(t, "channel"); c != nil {
- defer c.Close()
- if _, err := c.Channel(); err != nil {
- t.Errorf("Channel could not be opened: %s", err)
- }
- }
- }
- func TestIntegrationOpenConfig(t *testing.T) {
- config := Config{}
- c, err := DialConfig(integrationURLFromEnv(), config)
- if err != nil {
- t.Errorf("expected to dial with config %+v integration server: %s", config, err)
- }
- if _, err := c.Channel(); err != nil {
- t.Fatalf("expected to open channel: %s", err)
- }
- if err := c.Close(); err != nil {
- t.Fatalf("connection close: %s", err)
- }
- }
- func TestIntegrationOpenConfigWithNetDial(t *testing.T) {
- config := Config{Dial: net.Dial}
- c, err := DialConfig(integrationURLFromEnv(), config)
- if err != nil {
- t.Errorf("expected to dial with config %+v integration server: %s", config, err)
- }
- if _, err := c.Channel(); err != nil {
- t.Fatalf("expected to open channel: %s", err)
- }
- if err := c.Close(); err != nil {
- t.Fatalf("connection close: %s", err)
- }
- }
- func TestIntegrationLocalAddr(t *testing.T) {
- config := Config{}
- c, err := DialConfig(integrationURLFromEnv(), config)
- defer c.Close()
- if err != nil {
- t.Errorf("expected to dial with config %+v integration server: %s", config, err)
- }
- a := c.LocalAddr()
- _, portString, err := net.SplitHostPort(a.String())
- if err != nil {
- t.Errorf("expected to get a local network address with config %+v integration server: %s", config, a.String())
- }
- port, err := strconv.Atoi(portString)
- if err != nil {
- t.Errorf("expected to get a TCP port number with config %+v integration server: %s", config, err)
- }
- t.Logf("Connected to port %d\n", port)
- }
- // https://github.com/streadway/amqp/issues/94
- func TestExchangePassiveOnMissingExchangeShouldError(t *testing.T) {
- c := integrationConnection(t, "exch")
- if c != nil {
- defer c.Close()
- ch, err := c.Channel()
- if err != nil {
- t.Fatalf("create channel 1: %s", err)
- }
- defer ch.Close()
- if err := ch.ExchangeDeclarePassive(
- "test-integration-missing-passive-exchange",
- "direct", // type
- false, // duration (note: is durable)
- true, // auto-delete
- false, // internal
- false, // nowait
- nil, // args
- ); err == nil {
- t.Fatal("ExchangeDeclarePassive of a missing exchange should return error")
- }
- }
- }
- // https://github.com/streadway/amqp/issues/94
- func TestIntegrationExchangeDeclarePassiveOnDeclaredShouldNotError(t *testing.T) {
- c := integrationConnection(t, "exch")
- if c != nil {
- defer c.Close()
- exchange := "test-integration-decalred-passive-exchange"
- ch, err := c.Channel()
- if err != nil {
- t.Fatalf("create channel: %s", err)
- }
- defer ch.Close()
- if err := ch.ExchangeDeclare(
- exchange, // name
- "direct", // type
- false, // durable
- true, // auto-delete
- false, // internal
- false, // nowait
- nil, // args
- ); err != nil {
- t.Fatalf("declare exchange: %s", err)
- }
- if err := ch.ExchangeDeclarePassive(
- exchange, // name
- "direct", // type
- false, // durable
- true, // auto-delete
- false, // internal
- false, // nowait
- nil, // args
- ); err != nil {
- t.Fatalf("ExchangeDeclarePassive on a declared exchange should not error, got: %q", err)
- }
- }
- }
- func TestIntegrationExchange(t *testing.T) {
- c := integrationConnection(t, "exch")
- if c != nil {
- defer c.Close()
- channel, err := c.Channel()
- if err != nil {
- t.Fatalf("create channel: %s", err)
- }
- t.Logf("create channel OK")
- exchange := "test-integration-exchange"
- if err := channel.ExchangeDeclare(
- exchange, // name
- "direct", // type
- false, // duration
- true, // auto-delete
- false, // internal
- false, // nowait
- nil, // args
- ); err != nil {
- t.Fatalf("declare exchange: %s", err)
- }
- t.Logf("declare exchange OK")
- if err := channel.ExchangeDelete(exchange, false, false); err != nil {
- t.Fatalf("delete exchange: %s", err)
- }
- t.Logf("delete exchange OK")
- if err := channel.Close(); err != nil {
- t.Fatalf("close channel: %s", err)
- }
- t.Logf("close channel OK")
- }
- }
- // https://github.com/streadway/amqp/issues/94
- func TestIntegrationQueueDeclarePassiveOnMissingExchangeShouldError(t *testing.T) {
- c := integrationConnection(t, "queue")
- if c != nil {
- defer c.Close()
- ch, err := c.Channel()
- if err != nil {
- t.Fatalf("create channel1: %s", err)
- }
- defer ch.Close()
- if _, err := ch.QueueDeclarePassive(
- "test-integration-missing-passive-queue", // name
- false, // duration (note: not durable)
- true, // auto-delete
- false, // exclusive
- false, // noWait
- nil, // arguments
- ); err == nil {
- t.Fatal("QueueDeclarePassive of a missing queue should error")
- }
- }
- }
- // https://github.com/streadway/amqp/issues/94
- func TestIntegrationPassiveQueue(t *testing.T) {
- c := integrationConnection(t, "queue")
- if c != nil {
- defer c.Close()
- name := "test-integration-declared-passive-queue"
- ch, err := c.Channel()
- if err != nil {
- t.Fatalf("create channel1: %s", err)
- }
- defer ch.Close()
- if _, err := ch.QueueDeclare(
- name, // name
- false, // durable
- true, // auto-delete
- false, // exclusive
- false, // noWait
- nil, // arguments
- ); err != nil {
- t.Fatalf("queue declare: %s", err)
- }
- if _, err := ch.QueueDeclarePassive(
- name, // name
- false, // durable
- true, // auto-delete
- false, // exclusive
- false, // noWait
- nil, // arguments
- ); err != nil {
- t.Fatalf("QueueDeclarePassive on declared queue should not error, got: %q", err)
- }
- if _, err := ch.QueueDeclarePassive(
- name, // name
- true, // durable (note: differs)
- true, // auto-delete
- false, // exclusive
- false, // noWait
- nil, // arguments
- ); err != nil {
- t.Fatalf("QueueDeclarePassive on declared queue with different flags should error")
- }
- }
- }
- func TestIntegrationBasicQueueOperations(t *testing.T) {
- c := integrationConnection(t, "queue")
- if c != nil {
- defer c.Close()
- channel, err := c.Channel()
- if err != nil {
- t.Fatalf("create channel: %s", err)
- }
- t.Logf("create channel OK")
- exchangeName := "test-basic-ops-exchange"
- queueName := "test-basic-ops-queue"
- deleteQueueFirstOptions := []bool{true, false}
- for _, deleteQueueFirst := range deleteQueueFirstOptions {
- if err := channel.ExchangeDeclare(
- exchangeName, // name
- "direct", // type
- true, // duration (note: is durable)
- false, // auto-delete
- false, // internal
- false, // nowait
- nil, // args
- ); err != nil {
- t.Fatalf("declare exchange: %s", err)
- }
- t.Logf("declare exchange OK")
- if _, err := channel.QueueDeclare(
- queueName, // name
- true, // duration (note: durable)
- false, // auto-delete
- false, // exclusive
- false, // noWait
- nil, // arguments
- ); err != nil {
- t.Fatalf("queue declare: %s", err)
- }
- t.Logf("declare queue OK")
- if err := channel.QueueBind(
- queueName, // name
- "", // routingKey
- exchangeName, // sourceExchange
- false, // noWait
- nil, // arguments
- ); err != nil {
- t.Fatalf("queue bind: %s", err)
- }
- t.Logf("queue bind OK")
- if deleteQueueFirst {
- if _, err := channel.QueueDelete(
- queueName, // name
- false, // ifUnused (false=be aggressive)
- false, // ifEmpty (false=be aggressive)
- false, // noWait
- ); err != nil {
- t.Fatalf("delete queue (first): %s", err)
- }
- t.Logf("delete queue (first) OK")
- if err := channel.ExchangeDelete(exchangeName, false, false); err != nil {
- t.Fatalf("delete exchange (after delete queue): %s", err)
- }
- t.Logf("delete exchange (after delete queue) OK")
- } else { // deleteExchangeFirst
- if err := channel.ExchangeDelete(exchangeName, false, false); err != nil {
- t.Fatalf("delete exchange (first): %s", err)
- }
- t.Logf("delete exchange (first) OK")
- if _, err := channel.QueueInspect(queueName); err != nil {
- t.Fatalf("inspect queue state after deleting exchange: %s", err)
- }
- t.Logf("queue properly remains after exchange is deleted")
- if _, err := channel.QueueDelete(
- queueName,
- false, // ifUnused
- false, // ifEmpty
- false, // noWait
- ); err != nil {
- t.Fatalf("delete queue (after delete exchange): %s", err)
- }
- t.Logf("delete queue (after delete exchange) OK")
- }
- }
- if err := channel.Close(); err != nil {
- t.Fatalf("close channel: %s", err)
- }
- t.Logf("close channel OK")
- }
- }
- func TestIntegrationConnectionNegotiatesMaxChannels(t *testing.T) {
- config := Config{ChannelMax: 0}
- c, err := DialConfig(integrationURLFromEnv(), config)
- if err != nil {
- t.Errorf("expected to dial with config %+v integration server: %s", config, err)
- }
- defer c.Close()
- if want, got := defaultChannelMax, c.Config.ChannelMax; want != got {
- t.Fatalf("expected connection to negotiate uint16 (%d) channels, got: %d", want, got)
- }
- }
- func TestIntegrationConnectionNegotiatesClientMaxChannels(t *testing.T) {
- config := Config{ChannelMax: 16}
- c, err := DialConfig(integrationURLFromEnv(), config)
- if err != nil {
- t.Errorf("expected to dial with config %+v integration server: %s", config, err)
- }
- defer c.Close()
- if want, got := config.ChannelMax, c.Config.ChannelMax; want != got {
- t.Fatalf("expected client specified channel limit after handshake %d, got: %d", want, got)
- }
- }
- func TestIntegrationChannelIDsExhausted(t *testing.T) {
- config := Config{ChannelMax: 16}
- c, err := DialConfig(integrationURLFromEnv(), config)
- if err != nil {
- t.Errorf("expected to dial with config %+v integration server: %s", config, err)
- }
- defer c.Close()
- for i := 1; i <= c.Config.ChannelMax; i++ {
- if _, err := c.Channel(); err != nil {
- t.Fatalf("expected allocating all channel ids to succed, failed on %d with %v", i, err)
- }
- }
- if _, err := c.Channel(); err != ErrChannelMax {
- t.Fatalf("expected allocating all channels to produce the client side error %#v, got: %#v", ErrChannelMax, err)
- }
- }
- func TestIntegrationChannelClosing(t *testing.T) {
- c := integrationConnection(t, "closings")
- if c != nil {
- defer c.Close()
- // This function is run on every channel after it is successfully
- // opened. It can do something to verify something. It should be
- // quick; many channels may be opened!
- f := func(t *testing.T, c *Channel) {
- return
- }
- // open and close
- channel, err := c.Channel()
- if err != nil {
- t.Fatalf("basic create channel: %s", err)
- }
- t.Logf("basic create channel OK")
- if err := channel.Close(); err != nil {
- t.Fatalf("basic close channel: %s", err)
- }
- t.Logf("basic close channel OK")
- // deferred close
- signal := make(chan bool)
- go func() {
- channel, err := c.Channel()
- if err != nil {
- t.Fatalf("second create channel: %s", err)
- }
- t.Logf("second create channel OK")
- <-signal // a bit of synchronization
- f(t, channel)
- defer func() {
- if err := channel.Close(); err != nil {
- t.Fatalf("deferred close channel: %s", err)
- }
- t.Logf("deferred close channel OK")
- signal <- true
- }()
- }()
- signal <- true
- select {
- case <-signal:
- t.Logf("(got close signal OK)")
- break
- case <-time.After(250 * time.Millisecond):
- t.Fatalf("deferred close: timeout")
- }
- // multiple channels
- for _, n := range []int{2, 4, 8, 16, 32, 64, 128, 256} {
- channels := make([]*Channel, n)
- for i := 0; i < n; i++ {
- var err error
- if channels[i], err = c.Channel(); err != nil {
- t.Fatalf("create channel %d/%d: %s", i+1, n, err)
- }
- }
- f(t, channel)
- for i, channel := range channels {
- if err := channel.Close(); err != nil {
- t.Fatalf("close channel %d/%d: %s", i+1, n, err)
- }
- }
- t.Logf("created/closed %d channels OK", n)
- }
- }
- }
- func TestIntegrationMeaningfulChannelErrors(t *testing.T) {
- c := integrationConnection(t, "pub")
- if c != nil {
- defer c.Close()
- ch, err := c.Channel()
- if err != nil {
- t.Fatalf("Could not create channel")
- }
- queue := "test.integration.channel.error"
- _, err = ch.QueueDeclare(queue, false, true, false, false, nil)
- if err != nil {
- t.Fatalf("Could not declare")
- }
- _, err = ch.QueueDeclare(queue, true, false, false, false, nil)
- if err == nil {
- t.Fatalf("Expected error, got nil")
- }
- e, ok := err.(*Error)
- if !ok {
- t.Fatalf("Expected type Error response, got %T", err)
- }
- if e.Code != PreconditionFailed {
- t.Fatalf("Expected PreconditionFailed, got: %+v", e)
- }
- _, err = ch.QueueDeclare(queue, false, true, false, false, nil)
- if err != ErrClosed {
- t.Fatalf("Expected channel to be closed, got: %T", err)
- }
- }
- }
- // https://github.com/streadway/amqp/issues/6
- func TestIntegrationNonBlockingClose(t *testing.T) {
- c := integrationConnection(t, "#6")
- if c != nil {
- defer c.Close()
- ch, err := c.Channel()
- if err != nil {
- t.Fatalf("Could not create channel")
- }
- queue := "test.integration.blocking.close"
- _, err = ch.QueueDeclare(queue, false, true, false, false, nil)
- if err != nil {
- t.Fatalf("Could not declare")
- }
- msgs, err := ch.Consume(queue, "", false, false, false, false, nil)
- if err != nil {
- t.Fatalf("Could not consume")
- }
- // Simulate a consumer
- go func() {
- for _ = range msgs {
- t.Logf("Oh my, received message on an empty queue")
- }
- }()
- succeed := make(chan bool)
- go func() {
- if err = ch.Close(); err != nil {
- t.Fatalf("Close produced an error when it shouldn't")
- }
- succeed <- true
- }()
- select {
- case <-succeed:
- break
- case <-time.After(1 * time.Second):
- t.Fatalf("Close timed out after 1s")
- }
- }
- }
- func TestIntegrationPublishConsume(t *testing.T) {
- queue := "test.integration.publish.consume"
- c1 := integrationConnection(t, "pub")
- c2 := integrationConnection(t, "sub")
- if c1 != nil && c2 != nil {
- defer c1.Close()
- defer c2.Close()
- pub, _ := c1.Channel()
- sub, _ := c2.Channel()
- pub.QueueDeclare(queue, false, true, false, false, nil)
- sub.QueueDeclare(queue, false, true, false, false, nil)
- defer pub.QueueDelete(queue, false, false, false)
- messages, _ := sub.Consume(queue, "", false, false, false, false, nil)
- pub.Publish("", queue, false, false, Publishing{Body: []byte("pub 1")})
- pub.Publish("", queue, false, false, Publishing{Body: []byte("pub 2")})
- pub.Publish("", queue, false, false, Publishing{Body: []byte("pub 3")})
- assertConsumeBody(t, messages, []byte("pub 1"))
- assertConsumeBody(t, messages, []byte("pub 2"))
- assertConsumeBody(t, messages, []byte("pub 3"))
- }
- }
- func TestIntegrationConsumeFlow(t *testing.T) {
- queue := "test.integration.consumer-flow"
- c1 := integrationConnection(t, "pub-flow")
- c2 := integrationConnection(t, "sub-flow")
- if c1 != nil && c2 != nil {
- defer c1.Close()
- defer c2.Close()
- pub, _ := c1.Channel()
- sub, _ := c2.Channel()
- pub.QueueDeclare(queue, false, true, false, false, nil)
- sub.QueueDeclare(queue, false, true, false, false, nil)
- defer pub.QueueDelete(queue, false, false, false)
- sub.Qos(1, 0, false)
- messages, _ := sub.Consume(queue, "", false, false, false, false, nil)
- pub.Publish("", queue, false, false, Publishing{Body: []byte("pub 1")})
- pub.Publish("", queue, false, false, Publishing{Body: []byte("pub 2")})
- msg := assertConsumeBody(t, messages, []byte("pub 1"))
- if err := sub.Flow(false); err.(*Error).Code == NotImplemented {
- t.Log("flow control is not supported on this version of rabbitmq")
- return
- }
- msg.Ack(false)
- select {
- case <-messages:
- t.Fatalf("message was delivered when flow was not active")
- default:
- }
- sub.Flow(true)
- msg = assertConsumeBody(t, messages, []byte("pub 2"))
- msg.Ack(false)
- }
- }
- func TestIntegrationRecoverNotImplemented(t *testing.T) {
- queue := "test.recover"
- if c, ch := integrationQueue(t, queue); c != nil {
- if product, ok := c.Properties["product"]; ok && product.(string) == "RabbitMQ" {
- defer c.Close()
- err := ch.Recover(false)
- if ex, ok := err.(*Error); !ok || ex.Code != 540 {
- t.Fatalf("Expected NOT IMPLEMENTED got: %v", ex)
- }
- }
- }
- }
- // This test is driven by a private API to simulate the server sending a channelFlow message
- func TestIntegrationPublishFlow(t *testing.T) {
- // TODO - no idea how to test without affecting the server or mucking internal APIs
- // i'd like to make sure the RW lock can be held by multiple publisher threads
- // and that multiple channelFlow messages do not block the dispatch thread
- }
- func TestIntegrationConsumeCancel(t *testing.T) {
- queue := "test.integration.consume-cancel"
- c := integrationConnection(t, "pub")
- if c != nil {
- defer c.Close()
- ch, _ := c.Channel()
- ch.QueueDeclare(queue, false, true, false, false, nil)
- defer ch.QueueDelete(queue, false, false, false)
- messages, _ := ch.Consume(queue, "integration-tag", false, false, false, false, nil)
- ch.Publish("", queue, false, false, Publishing{Body: []byte("1")})
- assertConsumeBody(t, messages, []byte("1"))
- err := ch.Cancel("integration-tag", false)
- if err != nil {
- t.Fatalf("error cancelling the consumer: %v", err)
- }
- ch.Publish("", queue, false, false, Publishing{Body: []byte("2")})
- select {
- case <-time.After(100 * time.Millisecond):
- t.Fatalf("Timeout on Close")
- case _, ok := <-messages:
- if ok {
- t.Fatalf("Extra message on consumer when consumer should have been closed")
- }
- }
- }
- }
- func (c *Connection) Generate(r *rand.Rand, _ int) reflect.Value {
- urlStr := os.Getenv("AMQP_URL")
- if urlStr == "" {
- return reflect.ValueOf(nil)
- }
- conn, err := Dial(urlStr)
- if err != nil {
- return reflect.ValueOf(nil)
- }
- return reflect.ValueOf(conn)
- }
- func (c Publishing) Generate(r *rand.Rand, _ int) reflect.Value {
- var ok bool
- var t reflect.Value
- p := Publishing{}
- //p.DeliveryMode = uint8(r.Intn(3))
- //p.Priority = uint8(r.Intn(8))
- if r.Intn(2) > 0 {
- p.ContentType = "application/octet-stream"
- }
- if r.Intn(2) > 0 {
- p.ContentEncoding = "gzip"
- }
- if r.Intn(2) > 0 {
- p.CorrelationId = fmt.Sprintf("%d", r.Int())
- }
- if r.Intn(2) > 0 {
- p.ReplyTo = fmt.Sprintf("%d", r.Int())
- }
- if r.Intn(2) > 0 {
- p.MessageId = fmt.Sprintf("%d", r.Int())
- }
- if r.Intn(2) > 0 {
- p.Type = fmt.Sprintf("%d", r.Int())
- }
- if r.Intn(2) > 0 {
- p.AppId = fmt.Sprintf("%d", r.Int())
- }
- if r.Intn(2) > 0 {
- p.Timestamp = time.Unix(r.Int63(), r.Int63())
- }
- if t, ok = quick.Value(reflect.TypeOf(p.Body), r); ok {
- p.Body = t.Bytes()
- }
- return reflect.ValueOf(p)
- }
- func TestQuickPublishOnly(t *testing.T) {
- if c := integrationConnection(t, "quick"); c != nil {
- defer c.Close()
- pub, err := c.Channel()
- queue := "test-publish"
- if _, err = pub.QueueDeclare(queue, false, true, false, false, nil); err != nil {
- t.Errorf("Failed to declare: %s", err)
- return
- }
- defer pub.QueueDelete(queue, false, false, false)
- quick.Check(func(msg Publishing) bool {
- return pub.Publish("", queue, false, false, msg) == nil
- }, nil)
- }
- }
- func TestPublishEmptyBody(t *testing.T) {
- c := integrationConnection(t, "empty")
- if c != nil {
- defer c.Close()
- ch, err := c.Channel()
- if err != nil {
- t.Errorf("Failed to create channel")
- return
- }
- queue := "test-TestPublishEmptyBody"
- if _, err := ch.QueueDeclare(queue, false, true, false, false, nil); err != nil {
- t.Fatalf("Could not declare")
- }
- messages, err := ch.Consume(queue, "", false, false, false, false, nil)
- if err != nil {
- t.Fatalf("Could not consume")
- }
- err = ch.Publish("", queue, false, false, Publishing{})
- if err != nil {
- t.Fatalf("Could not publish")
- }
- select {
- case msg := <-messages:
- if len(msg.Body) != 0 {
- t.Errorf("Received non empty body")
- }
- case <-time.After(200 * time.Millisecond):
- t.Errorf("Timeout on receive")
- }
- }
- }
- func TestPublishEmptyBodyWithHeadersIssue67(t *testing.T) {
- c := integrationConnection(t, "issue67")
- if c != nil {
- defer c.Close()
- ch, err := c.Channel()
- if err != nil {
- t.Errorf("Failed to create channel")
- return
- }
- queue := "test-TestPublishEmptyBodyWithHeaders"
- if _, err := ch.QueueDeclare(queue, false, true, false, false, nil); err != nil {
- t.Fatalf("Could not declare")
- }
- messages, err := ch.Consume(queue, "", false, false, false, false, nil)
- if err != nil {
- t.Fatalf("Could not consume")
- }
- headers := Table{
- "ham": "spam",
- }
- err = ch.Publish("", queue, false, false, Publishing{Headers: headers})
- if err != nil {
- t.Fatalf("Could not publish")
- }
- select {
- case msg := <-messages:
- if msg.Headers["ham"] == nil {
- t.Fatalf("Headers aren't sent")
- }
- if msg.Headers["ham"] != "spam" {
- t.Fatalf("Headers are wrong")
- }
- case <-time.After(200 * time.Millisecond):
- t.Errorf("Timeout on receive")
- }
- }
- }
- func TestQuickPublishConsumeOnly(t *testing.T) {
- c1 := integrationConnection(t, "quick-pub")
- c2 := integrationConnection(t, "quick-sub")
- if c1 != nil && c2 != nil {
- defer c1.Close()
- defer c2.Close()
- pub, err := c1.Channel()
- sub, err := c2.Channel()
- queue := "TestPublishConsumeOnly"
- if _, err = pub.QueueDeclare(queue, false, true, false, false, nil); err != nil {
- t.Errorf("Failed to declare: %s", err)
- return
- }
- if _, err = sub.QueueDeclare(queue, false, true, false, false, nil); err != nil {
- t.Errorf("Failed to declare: %s", err)
- return
- }
- defer sub.QueueDelete(queue, false, false, false)
- ch, err := sub.Consume(queue, "", false, false, false, false, nil)
- if err != nil {
- t.Errorf("Could not sub: %s", err)
- }
- quick.CheckEqual(
- func(msg Publishing) []byte {
- empty := Publishing{Body: msg.Body}
- if pub.Publish("", queue, false, false, empty) != nil {
- return []byte{'X'}
- }
- return msg.Body
- },
- func(msg Publishing) []byte {
- out := <-ch
- out.Ack(false)
- return out.Body
- },
- nil)
- }
- }
- func TestQuickPublishConsumeBigBody(t *testing.T) {
- c1 := integrationConnection(t, "big-pub")
- c2 := integrationConnection(t, "big-sub")
- if c1 != nil && c2 != nil {
- defer c1.Close()
- defer c2.Close()
- pub, err := c1.Channel()
- sub, err := c2.Channel()
- queue := "test-pubsub"
- if _, err = sub.QueueDeclare(queue, false, true, false, false, nil); err != nil {
- t.Errorf("Failed to declare: %s", err)
- return
- }
- ch, err := sub.Consume(queue, "", false, false, false, false, nil)
- if err != nil {
- t.Errorf("Could not sub: %s", err)
- }
- fixture := Publishing{
- Body: make([]byte, 1e4+1000),
- }
- if _, err = pub.QueueDeclare(queue, false, true, false, false, nil); err != nil {
- t.Errorf("Failed to declare: %s", err)
- return
- }
- err = pub.Publish("", queue, false, false, fixture)
- if err != nil {
- t.Errorf("Could not publish big body")
- }
- select {
- case msg := <-ch:
- if bytes.Compare(msg.Body, fixture.Body) != 0 {
- t.Errorf("Consumed big body didn't match")
- }
- case <-time.After(200 * time.Millisecond):
- t.Errorf("Timeout on receive")
- }
- }
- }
- func TestIntegrationGetOk(t *testing.T) {
- if c := integrationConnection(t, "getok"); c != nil {
- defer c.Close()
- queue := "test.get-ok"
- ch, _ := c.Channel()
- ch.QueueDeclare(queue, false, true, false, false, nil)
- ch.Publish("", queue, false, false, Publishing{Body: []byte("ok")})
- msg, ok, err := ch.Get(queue, false)
- if err != nil {
- t.Fatalf("Failed get: %v", err)
- }
- if !ok {
- t.Fatalf("Get on a queued message did not find the message")
- }
- if string(msg.Body) != "ok" {
- t.Fatalf("Get did not get the correct message")
- }
- }
- }
- func TestIntegrationGetEmpty(t *testing.T) {
- if c := integrationConnection(t, "getok"); c != nil {
- defer c.Close()
- queue := "test.get-ok"
- ch, _ := c.Channel()
- ch.QueueDeclare(queue, false, true, false, false, nil)
- _, ok, err := ch.Get(queue, false)
- if err != nil {
- t.Fatalf("Failed get: %v", err)
- }
- if !ok {
- t.Fatalf("Get on a queued message retrieved a message when it shouldn't have")
- }
- }
- }
- func TestIntegrationTxCommit(t *testing.T) {
- if c := integrationConnection(t, "txcommit"); c != nil {
- defer c.Close()
- queue := "test.tx.commit"
- ch, _ := c.Channel()
- ch.QueueDeclare(queue, false, true, false, false, nil)
- if err := ch.Tx(); err != nil {
- t.Fatalf("tx.select failed")
- }
- ch.Publish("", queue, false, false, Publishing{Body: []byte("ok")})
- if err := ch.TxCommit(); err != nil {
- t.Fatalf("tx.commit failed")
- }
- msg, ok, err := ch.Get(queue, false)
- if err != nil || !ok {
- t.Fatalf("Failed get: %v", err)
- }
- if string(msg.Body) != "ok" {
- t.Fatalf("Get did not get the correct message from the transaction")
- }
- }
- }
- func TestIntegrationTxRollback(t *testing.T) {
- if c := integrationConnection(t, "txrollback"); c != nil {
- defer c.Close()
- queue := "test.tx.rollback"
- ch, _ := c.Channel()
- ch.QueueDeclare(queue, false, true, false, false, nil)
- if err := ch.Tx(); err != nil {
- t.Fatalf("tx.select failed")
- }
- ch.Publish("", queue, false, false, Publishing{Body: []byte("ok")})
- if err := ch.TxRollback(); err != nil {
- t.Fatalf("tx.rollback failed")
- }
- _, ok, err := ch.Get(queue, false)
- if err != nil {
- t.Fatalf("Failed get: %v", err)
- }
- if ok {
- t.Fatalf("message was published when it should have been rolled back")
- }
- }
- }
- func TestIntegrationReturn(t *testing.T) {
- if c, ch := integrationQueue(t, "return"); c != nil {
- defer c.Close()
- ret := make(chan Return, 1)
- ch.NotifyReturn(ret)
- // mandatory publish to an exchange without a binding should be returned
- ch.Publish("", "return-without-binding", true, false, Publishing{Body: []byte("mandatory")})
- select {
- case res := <-ret:
- if string(res.Body) != "mandatory" {
- t.Fatalf("expected return of the same message")
- }
- if res.ReplyCode != NoRoute {
- t.Fatalf("expected no consumers reply code on the Return result, got: %v", res.ReplyCode)
- }
- case <-time.After(200 * time.Millisecond):
- t.Fatalf("no return was received within 200ms")
- }
- }
- }
- func TestIntegrationCancel(t *testing.T) {
- queue := "cancel"
- consumerTag := "test.cancel"
- if c, ch := integrationQueue(t, queue); c != nil {
- defer c.Close()
- cancels := ch.NotifyCancel(make(chan string, 1))
- go func() {
- if _, err := ch.Consume(queue, consumerTag, false, false, false, false, nil); err != nil {
- t.Fatalf("cannot consume from %q to test NotifyCancel: %v", queue, err)
- }
- if _, err := ch.QueueDelete(queue, false, false, false); err != nil {
- t.Fatalf("cannot delete integration queue: %v", err)
- }
- }()
- select {
- case tag := <-cancels:
- if want, got := consumerTag, tag; want != got {
- t.Fatalf("expected to be notified of deleted queue with consumer tag, got: %q", got)
- }
- case <-time.After(200 * time.Millisecond):
- t.Fatalf("expected to be notified of deleted queue with 200ms")
- }
- }
- }
- func TestIntegrationConfirm(t *testing.T) {
- if c, ch := integrationQueue(t, "confirm"); c != nil {
- defer c.Close()
- ack, nack := make(chan uint64, 1), make(chan uint64, 1)
- ch.NotifyConfirm(ack, nack)
- if err := ch.Confirm(false); err != nil {
- t.Fatalf("could not confirm")
- }
- ch.Publish("", "confirm", false, false, Publishing{Body: []byte("confirm")})
- select {
- case tag := <-ack:
- if tag != 1 {
- t.Fatalf("expected ack starting with delivery tag of 1")
- }
- case <-time.After(200 * time.Millisecond):
- t.Fatalf("no ack was received within 200ms")
- }
- }
- }
- // https://github.com/streadway/amqp/issues/61
- func TestRoundTripAllFieldValueTypes61(t *testing.T) {
- if conn := integrationConnection(t, "issue61"); conn != nil {
- defer conn.Close()
- timestamp := time.Unix(100000000, 0)
- headers := Table{
- "A": []interface{}{
- []interface{}{"nested array", int32(3)},
- Decimal{2, 1},
- Table{"S": "nested table in array"},
- int32(2 << 20),
- string("array string"),
- timestamp,
- nil,
- byte(2),
- float64(2.64),
- float32(2.32),
- int64(2 << 60),
- int16(2 << 10),
- bool(true),
- []byte{'b', '2'},
- },
- "D": Decimal{1, 1},
- "F": Table{"S": "nested table in table"},
- "I": int32(1 << 20),
- "S": string("string"),
- "T": timestamp,
- "V": nil,
- "b": byte(1),
- "d": float64(1.64),
- "f": float32(1.32),
- "l": int64(1 << 60),
- "s": int16(1 << 10),
- "t": bool(true),
- "x": []byte{'b', '1'},
- }
- queue := "test.issue61-roundtrip"
- ch, _ := conn.Channel()
- if _, err := ch.QueueDeclare(queue, false, true, false, false, nil); err != nil {
- t.Fatalf("Could not declare")
- }
- msgs, err := ch.Consume(queue, "", false, false, false, false, nil)
- if err != nil {
- t.Fatalf("Could not consume")
- }
- err = ch.Publish("", queue, false, false, Publishing{Body: []byte("ignored"), Headers: headers})
- if err != nil {
- t.Fatalf("Could not publish: %v", err)
- }
- msg, ok := <-msgs
- if !ok {
- t.Fatalf("Channel closed prematurely likely due to publish exception")
- }
- for k, v := range headers {
- if !reflect.DeepEqual(v, msg.Headers[k]) {
- t.Errorf("Round trip header not the same for key %q: expected: %#v, got %#v", k, v, msg.Headers[k])
- }
- }
- }
- }
- // Declares a queue with the x-message-ttl extension to exercise integer
- // serialization.
- //
- // Relates to https://github.com/streadway/amqp/issues/60
- //
- func TestDeclareArgsXMessageTTL(t *testing.T) {
- if conn := integrationConnection(t, "declareTTL"); conn != nil {
- defer conn.Close()
- ch, _ := conn.Channel()
- args := Table{"x-message-ttl": int32(9000000)}
- // should not drop the connection
- if _, err := ch.QueueDeclare("declareWithTTL", false, true, false, false, args); err != nil {
- t.Fatalf("cannot declare with TTL: got: %v", err)
- }
- }
- }
- // Sets up the topology where rejected messages will be forwarded
- // to a fanout exchange, with a single queue bound.
- //
- // Relates to https://github.com/streadway/amqp/issues/56
- //
- func TestDeclareArgsRejectToDeadLetterQueue(t *testing.T) {
- if conn := integrationConnection(t, "declareArgs"); conn != nil {
- defer conn.Close()
- ex, q := "declareArgs", "declareArgs-deliveries"
- dlex, dlq := ex+"-dead-letter", q+"-dead-letter"
- ch, _ := conn.Channel()
- if err := ch.ExchangeDeclare(ex, "fanout", false, true, false, false, nil); err != nil {
- t.Fatalf("cannot declare %v: got: %v", ex, err)
- }
- if err := ch.ExchangeDeclare(dlex, "fanout", false, true, false, false, nil); err != nil {
- t.Fatalf("cannot declare %v: got: %v", dlex, err)
- }
- if _, err := ch.QueueDeclare(dlq, false, true, false, false, nil); err != nil {
- t.Fatalf("cannot declare %v: got: %v", dlq, err)
- }
- if err := ch.QueueBind(dlq, "#", dlex, false, nil); err != nil {
- t.Fatalf("cannot bind %v to %v: got: %v", dlq, dlex, err)
- }
- if _, err := ch.QueueDeclare(q, false, true, false, false, Table{
- "x-dead-letter-exchange": dlex,
- }); err != nil {
- t.Fatalf("cannot declare %v with dlq %v: got: %v", q, dlex, err)
- }
- if err := ch.QueueBind(q, "#", ex, false, nil); err != nil {
- t.Fatalf("cannot bind %v: got: %v", ex, err)
- }
- fails, err := ch.Consume(q, "", false, false, false, false, nil)
- if err != nil {
- t.Fatalf("cannot consume %v: got: %v", q, err)
- }
- // Reject everything consumed
- go func() {
- for d := range fails {
- d.Reject(false)
- }
- }()
- // Publish the 'poison'
- if err := ch.Publish(ex, q, true, false, Publishing{Body: []byte("ignored")}); err != nil {
- t.Fatalf("publishing failed")
- }
- // spin-get until message arrives on the dead-letter queue with a
- // synchronous parse to exercise the array field (x-death) set by the
- // server relating to issue-56
- for i := 0; i < 10; i++ {
- d, got, err := ch.Get(dlq, false)
- if !got && err == nil {
- continue
- } else if err != nil {
- t.Fatalf("expected success in parsing reject, got: %v", err)
- } else {
- // pass if we've parsed an array
- if v, ok := d.Headers["x-death"]; ok {
- if _, ok := v.([]interface{}); ok {
- return
- }
- }
- t.Fatalf("array field x-death expected in the headers, got: %v (%T)", d.Headers, d.Headers["x-death"])
- }
- }
- t.Fatalf("expectd dead-letter after 10 get attempts")
- }
- }
- // https://github.com/streadway/amqp/issues/48
- func TestDeadlockConsumerIssue48(t *testing.T) {
- if conn := integrationConnection(t, "issue48"); conn != nil {
- defer conn.Close()
- deadline := make(chan bool)
- go func() {
- select {
- case <-time.After(5 * time.Second):
- panic("expected to receive 2 deliveries while in an RPC, got a deadlock")
- case <-deadline:
- // pass
- }
- }()
- ch, err := conn.Channel()
- if err != nil {
- t.Fatalf("got error on channel.open: %v", err)
- }
- queue := "test-issue48"
- if _, err := ch.QueueDeclare(queue, false, true, false, false, nil); err != nil {
- t.Fatalf("expected to declare a queue: %v", err)
- }
- if err := ch.Confirm(false); err != nil {
- t.Fatalf("got error on confirm: %v", err)
- }
- ack, nack := make(chan uint64, 2), make(chan uint64, 2)
- ch.NotifyConfirm(ack, nack)
- for i := 0; i < cap(ack); i++ {
- // Fill the queue with some new or remaining publishings
- ch.Publish("", queue, false, false, Publishing{Body: []byte("")})
- }
- for i := 0; i < cap(ack); i++ {
- // Wait for them to land on the queue so they'll be delivered on consume
- <-ack
- }
- // Consuming should send them all on the wire
- msgs, err := ch.Consume(queue, "", false, false, false, false, nil)
- if err != nil {
- t.Fatalf("got error on consume: %v", err)
- }
- // We pop one off the chan, the other is on the wire
- <-msgs
- // Opening a new channel (any RPC) while another delivery is on the wire
- if _, err := conn.Channel(); err != nil {
- t.Fatalf("got error on consume: %v", err)
- }
- // We pop the next off the chan
- <-msgs
- deadline <- true
- }
- }
- // https://github.com/streadway/amqp/issues/46
- func TestRepeatedChannelExceptionWithPublishAndMaxProcsIssue46(t *testing.T) {
- conn := integrationConnection(t, "issue46")
- if conn != nil {
- for i := 0; i < 100; i++ {
- ch, err := conn.Channel()
- if err != nil {
- t.Fatalf("expected error only on publish, got error on channel.open: %v", err)
- }
- for j := 0; j < 10; j++ {
- err = ch.Publish("not-existing-exchange", "some-key", false, false, Publishing{Body: []byte("some-data")})
- if err, ok := err.(Error); ok {
- if err.Code != 504 {
- t.Fatalf("expected channel only exception, got: %v", err)
- }
- }
- }
- }
- }
- }
- // https://github.com/streadway/amqp/issues/43
- func TestChannelExceptionWithCloseIssue43(t *testing.T) {
- conn := integrationConnection(t, "issue43")
- if conn != nil {
- go func() {
- for err := range conn.NotifyClose(make(chan *Error)) {
- t.Log(err.Error())
- }
- }()
- c1, err := conn.Channel()
- if err != nil {
- panic(err)
- }
- go func() {
- for err := range c1.NotifyClose(make(chan *Error)) {
- t.Log("Channel1 Close: " + err.Error())
- }
- }()
- c2, err := conn.Channel()
- if err != nil {
- panic(err)
- }
- go func() {
- for err := range c2.NotifyClose(make(chan *Error)) {
- t.Log("Channel2 Close: " + err.Error())
- }
- }()
- // Cause an asynchronous channel exception causing the server
- // to send a "channel.close" method either before or after the next
- // asynchronous method.
- err = c1.Publish("nonexisting-exchange", "", false, false, Publishing{})
- if err != nil {
- panic(err)
- }
- // Receive or send the channel close method, the channel shuts down
- // but this expects a channel.close-ok to be received.
- c1.Close()
- // This ensures that the 2nd channel is unaffected by the channel exception
- // on channel 1.
- err = c2.ExchangeDeclare("test-channel-still-exists", "direct", false, true, false, false, nil)
- if err != nil {
- panic(err)
- }
- }
- }
- // https://github.com/streadway/amqp/issues/7
- func TestCorruptedMessageIssue7(t *testing.T) {
- messageCount := 1024
- c1 := integrationConnection(t, "")
- c2 := integrationConnection(t, "")
- if c1 != nil && c2 != nil {
- defer c1.Close()
- defer c2.Close()
- pub, err := c1.Channel()
- if err != nil {
- t.Fatalf("Cannot create Channel")
- }
- sub, err := c2.Channel()
- if err != nil {
- t.Fatalf("Cannot create Channel")
- }
- queue := "test-corrupted-message-regression"
- if _, err := pub.QueueDeclare(queue, false, true, false, false, nil); err != nil {
- t.Fatalf("Cannot declare")
- }
- if _, err := sub.QueueDeclare(queue, false, true, false, false, nil); err != nil {
- t.Fatalf("Cannot declare")
- }
- msgs, err := sub.Consume(queue, "", false, false, false, false, nil)
- if err != nil {
- t.Fatalf("Cannot consume")
- }
- for i := 0; i < messageCount; i++ {
- err := pub.Publish("", queue, false, false, Publishing{
- Body: generateCrc32Random(7 * i),
- })
- if err != nil {
- t.Fatalf("Failed to publish")
- }
- }
- for i := 0; i < messageCount; i++ {
- select {
- case msg := <-msgs:
- assertMessageCrc32(t, msg.Body, fmt.Sprintf("missed match at %d", i))
- case <-time.After(200 * time.Millisecond):
- t.Fatalf("Timeout on recv")
- }
- }
- }
- }
- func TestExchangeDeclarePrecondition(t *testing.T) {
- c1 := integrationConnection(t, "exchange-double-declare")
- c2 := integrationConnection(t, "exchange-double-declare-cleanup")
- if c1 != nil && c2 != nil {
- defer c1.Close()
- defer c2.Close()
- ch, err := c1.Channel()
- if err != nil {
- t.Fatalf("Create channel")
- }
- exchange := "test-mismatched-redeclare"
- err = ch.ExchangeDeclare(
- exchange,
- "direct", // exchangeType
- false, // durable
- true, // auto-delete
- false, // internal
- false, // noWait
- nil, // arguments
- )
- if err != nil {
- t.Fatalf("Could not initially declare exchange")
- }
- err = ch.ExchangeDeclare(
- exchange,
- "direct",
- true, // different durability
- true,
- false,
- false,
- nil,
- )
- if err == nil {
- t.Fatalf("Expected to fail a redeclare with different durability, didn't receive an error")
- }
- if err, ok := err.(Error); ok {
- if err.Code != PreconditionFailed {
- t.Fatalf("Expected precondition error")
- }
- if !err.Recover {
- t.Fatalf("Expected to be able to recover")
- }
- }
- ch2, _ := c2.Channel()
- if err = ch2.ExchangeDelete(exchange, false, false); err != nil {
- t.Fatalf("Could not delete exchange: %v", err)
- }
- }
- }
- func TestRabbitMQQueueTTLGet(t *testing.T) {
- if c := integrationRabbitMQ(t, "ttl"); c != nil {
- defer c.Close()
- queue := "test.rabbitmq-message-ttl"
- channel, err := c.Channel()
- if err != nil {
- t.Fatalf("channel: %v", err)
- }
- if _, err = channel.QueueDeclare(
- queue,
- false,
- true,
- false,
- false,
- Table{"x-message-ttl": int32(100)}, // in ms
- ); err != nil {
- t.Fatalf("queue declare: %s", err)
- }
- channel.Publish("", queue, false, false, Publishing{Body: []byte("ttl")})
- time.Sleep(200 * time.Millisecond)
- _, ok, err := channel.Get(queue, false)
- if ok {
- t.Fatalf("Expected the message to expire in 100ms, it didn't expire after 200ms")
- }
- if err != nil {
- t.Fatalf("Failed to get on ttl queue")
- }
- }
- }
- func TestRabbitMQQueueNackMultipleRequeue(t *testing.T) {
- if c := integrationRabbitMQ(t, "nack"); c != nil {
- defer c.Close()
- if c.isCapable("basic.nack") {
- queue := "test.rabbitmq-basic-nack"
- channel, err := c.Channel()
- if err != nil {
- t.Fatalf("channel: %v", err)
- }
- if _, err = channel.QueueDeclare(queue, false, true, false, false, nil); err != nil {
- t.Fatalf("queue declare: %s", err)
- }
- channel.Publish("", queue, false, false, Publishing{Body: []byte("1")})
- channel.Publish("", queue, false, false, Publishing{Body: []byte("2")})
- m1, ok, err := channel.Get(queue, false)
- if !ok || err != nil || m1.Body[0] != '1' {
- t.Fatalf("could not get message %v", m1)
- }
- m2, ok, err := channel.Get(queue, false)
- if !ok || err != nil || m2.Body[0] != '2' {
- t.Fatalf("could not get message %v", m2)
- }
- m2.Nack(true, true)
- m1, ok, err = channel.Get(queue, false)
- if !ok || err != nil || m1.Body[0] != '1' {
- t.Fatalf("could not get message %v", m1)
- }
- m2, ok, err = channel.Get(queue, false)
- if !ok || err != nil || m2.Body[0] != '2' {
- t.Fatalf("could not get message %v", m2)
- }
- }
- }
- }
- /*
- * Support for integration tests
- */
- func integrationURLFromEnv() string {
- url := os.Getenv("AMQP_URL")
- if url == "" {
- url = "amqp://"
- }
- return url
- }
- func loggedConnection(t *testing.T, conn *Connection, name string) *Connection {
- if name != "" {
- conn.conn = &logIO{t, name, conn.conn}
- }
- return conn
- }
- // Returns a conneciton to the AMQP if the AMQP_URL environment
- // variable is set and a connnection can be established.
- func integrationConnection(t *testing.T, name string) *Connection {
- conn, err := Dial(integrationURLFromEnv())
- if err != nil {
- t.Errorf("dial integration server: %s", err)
- return nil
- }
- return loggedConnection(t, conn, name)
- }
- // Returns a connection, channel and delcares a queue when the AMQP_URL is in the environment
- func integrationQueue(t *testing.T, name string) (*Connection, *Channel) {
- if conn := integrationConnection(t, name); conn != nil {
- if channel, err := conn.Channel(); err == nil {
- if _, err = channel.QueueDeclare(name, false, true, false, false, nil); err == nil {
- return conn, channel
- }
- }
- }
- return nil, nil
- }
- // Delegates to integrationConnection and only returns a connection if the
- // product is RabbitMQ
- func integrationRabbitMQ(t *testing.T, name string) *Connection {
- if conn := integrationConnection(t, "connect"); conn != nil {
- if server, ok := conn.Properties["product"]; ok && server == "RabbitMQ" {
- return conn
- }
- }
- return nil
- }
- func assertConsumeBody(t *testing.T, messages <-chan Delivery, want []byte) (msg *Delivery) {
- select {
- case got := <-messages:
- if bytes.Compare(want, got.Body) != 0 {
- t.Fatalf("Message body does not match want: %v, got: %v, for: %+v", want, got.Body, got)
- }
- msg = &got
- case <-time.After(200 * time.Millisecond):
- t.Fatalf("Timeout waiting for %v", want)
- }
- return msg
- }
- // Pulls out the CRC and verifies the remaining content against the CRC
- func assertMessageCrc32(t *testing.T, msg []byte, assert string) {
- size := binary.BigEndian.Uint32(msg[:4])
- crc := crc32.NewIEEE()
- crc.Write(msg[8:])
- if binary.BigEndian.Uint32(msg[4:8]) != crc.Sum32() {
- t.Fatalf("Message does not match CRC: %s", assert)
- }
- if int(size) != len(msg)-8 {
- t.Fatalf("Message does not match size, should=%d, is=%d: %s", size, len(msg)-8, assert)
- }
- }
- // Creates a random body size with a leading 32-bit CRC in network byte order
- // that verifies the remaining slice
- func generateCrc32Random(size int) []byte {
- msg := make([]byte, size+8)
- if _, err := io.ReadFull(devrand.Reader, msg); err != nil {
- panic(err)
- }
- crc := crc32.NewIEEE()
- crc.Write(msg[8:])
- binary.BigEndian.PutUint32(msg[0:4], uint32(size))
- binary.BigEndian.PutUint32(msg[4:8], crc.Sum32())
- return msg
- }
|