| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395 |
- package amqp_test
- import (
- "crypto/tls"
- "crypto/x509"
- "github.com/streadway/amqp"
- "io/ioutil"
- "log"
- "net"
- "runtime"
- "time"
- )
- func ExampleConfig_timeout() {
- // Provide your own anonymous Dial function that delgates to net.DialTimout
- // for custom timeouts
- conn, err := amqp.DialConfig("amqp:///", amqp.Config{
- Dial: func(network, addr string) (net.Conn, error) {
- return net.DialTimeout(network, addr, 2*time.Second)
- },
- })
- log.Printf("conn: %v, err: %v", conn, err)
- }
- func ExampleDialTLS() {
- // To get started with SSL/TLS follow the instructions for adding SSL/TLS
- // support in RabbitMQ with a private certificate authority here:
- //
- // http://www.rabbitmq.com/ssl.html
- //
- // Then in your rabbitmq.config, disable the plain AMQP port, verify clients
- // and fail if no certificate is presented with the following:
- //
- // [
- // {rabbit, [
- // {tcp_listeners, []}, % listens on 127.0.0.1:5672
- // {ssl_listeners, [5671]}, % listens on 0.0.0.0:5671
- // {ssl_options, [{cacertfile,"/path/to/your/testca/cacert.pem"},
- // {certfile,"/path/to/your/server/cert.pem"},
- // {keyfile,"/path/to/your/server/key.pem"},
- // {verify,verify_peer},
- // {fail_if_no_peer_cert,true}]}
- // ]}
- // ].
- cfg := new(tls.Config)
- // The self-signing certificate authority's certificate must be included in
- // the RootCAs to be trusted so that the server certificate can be verified.
- //
- // Alternatively to adding it to the tls.Config you can add the CA's cert to
- // your system's root CAs. The tls package will use the system roots
- // specific to each support OS. Under OS X, add (drag/drop) your cacert.pem
- // file to the 'Certificates' section of KeyChain.app to add and always
- // trust.
- //
- // Or with the command line add and trust the DER encoded certificate:
- //
- // security add-certificate testca/cacert.cer
- // security add-trusted-cert testca/cacert.cer
- //
- // If you depend on the system root CAs, then use nil for the RootCAs field
- // so the system roots will be loaded.
- cfg.RootCAs = x509.NewCertPool()
- if ca, err := ioutil.ReadFile("testca/cacert.pem"); err == nil {
- cfg.RootCAs.AppendCertsFromPEM(ca)
- }
- // Move the client cert and key to a location specific to your application
- // and load them here.
- if cert, err := tls.LoadX509KeyPair("client/cert.pem", "client/key.pem"); err == nil {
- cfg.Certificates = append(cfg.Certificates, cert)
- }
- // Server names are validated by the crypto/tls package, so the server
- // certificate must be made for the hostname in the URL. Find the commonName
- // (CN) and make sure the hostname in the URL matches this common name. Per
- // the RabbitMQ instructions for a self-signed cert, this defautls to the
- // current hostname.
- //
- // openssl x509 -noout -in server/cert.pem -subject
- //
- // If your server name in your certificate is different than the host you are
- // connecting to, set the hostname used for verification in
- // ServerName field of the tls.Config struct.
- conn, err := amqp.DialTLS("amqps://server-name-from-certificate/", cfg)
- log.Printf("conn: %v, err: %v", conn, err)
- }
- func ExampleChannel_Confirm_bridge() {
- // This example acts as a bridge, shoveling all messages sent from the source
- // exchange "log" to destination exchange "log".
- // Confirming publishes can help from overproduction and ensure every message
- // is delivered.
- // Setup the source of the store and forward
- source, err := amqp.Dial("amqp://source/")
- if err != nil {
- log.Fatalf("connection.open source: %s", err)
- }
- defer source.Close()
- chs, err := source.Channel()
- if err != nil {
- log.Fatalf("channel.open source: %s", err)
- }
- if err := chs.ExchangeDeclare("log", "topic", true, false, false, false, nil); err != nil {
- log.Fatalf("exchange.declare destination: %s", err)
- }
- if _, err := chs.QueueDeclare("remote-tee", true, true, false, false, nil); err != nil {
- log.Fatalf("queue.declare source: %s", err)
- }
- if err := chs.QueueBind("remote-tee", "#", "logs", false, nil); err != nil {
- log.Fatalf("queue.bind source: %s", err)
- }
- shovel, err := chs.Consume("remote-tee", "shovel", false, false, false, false, nil)
- if err != nil {
- log.Fatalf("basic.consume source: %s", err)
- }
- // Setup the destination of the store and forward
- destination, err := amqp.Dial("amqp://destination/")
- if err != nil {
- log.Fatalf("connection.open destination: %s", err)
- }
- defer destination.Close()
- chd, err := destination.Channel()
- if err != nil {
- log.Fatalf("channel.open destination: %s", err)
- }
- if err := chd.ExchangeDeclare("log", "topic", true, false, false, false, nil); err != nil {
- log.Fatalf("exchange.declare destination: %s", err)
- }
- // Buffer of 1 for our single outstanding publishing
- pubAcks, pubNacks := chd.NotifyConfirm(make(chan uint64, 1), make(chan uint64, 1))
- if err := chd.Confirm(false); err != nil {
- log.Fatalf("confirm.select destination: %s", err)
- }
- // Now pump the messages, one by one, a smarter implementation
- // would batch the deliveries and use multiple ack/nacks
- for {
- msg, ok := <-shovel
- if !ok {
- log.Fatalf("source channel closed, see the reconnect example for handling this")
- }
- err = chd.Publish("logs", msg.RoutingKey, false, false, amqp.Publishing{
- // Copy all the properties
- ContentType: msg.ContentType,
- ContentEncoding: msg.ContentEncoding,
- DeliveryMode: msg.DeliveryMode,
- Priority: msg.Priority,
- CorrelationId: msg.CorrelationId,
- ReplyTo: msg.ReplyTo,
- Expiration: msg.Expiration,
- MessageId: msg.MessageId,
- Timestamp: msg.Timestamp,
- Type: msg.Type,
- UserId: msg.UserId,
- AppId: msg.AppId,
- // Custom headers
- Headers: msg.Headers,
- // And the body
- Body: msg.Body,
- })
- if err != nil {
- msg.Nack(false, false)
- log.Fatalf("basic.publish destination: %s", msg)
- }
- // only ack the source delivery when the destination acks the publishing
- // here you could check for delivery order by keeping a local state of
- // expected delivery tags
- select {
- case <-pubAcks:
- msg.Ack(false)
- case <-pubNacks:
- msg.Nack(false, false)
- }
- }
- }
- func ExampleChannel_Consume() {
- // Connects opens an AMQP connection from the credentials in the URL.
- conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
- if err != nil {
- log.Fatalf("connection.open: %s", err)
- }
- defer conn.Close()
- c, err := conn.Channel()
- if err != nil {
- log.Fatalf("channel.open: %s", err)
- }
- // We declare our topology on both the publisher and consumer to ensure they
- // are the same. This is part of AMQP being a programmable messaging model.
- //
- // See the Channel.Publish example for the complimentary declare.
- err = c.ExchangeDeclare("logs", "topic", true, false, false, false, nil)
- if err != nil {
- log.Fatalf("exchange.declare: %s", err)
- }
- // Establish our queue topologies that we are responsible for
- type bind struct {
- queue string
- key string
- }
- bindings := []bind{
- bind{"page", "alert"},
- bind{"email", "info"},
- bind{"firehose", "#"},
- }
- for _, b := range bindings {
- _, err = c.QueueDeclare(b.queue, true, false, false, false, nil)
- if err != nil {
- log.Fatalf("queue.declare: %v", err)
- }
- err = c.QueueBind(b.queue, b.key, "logs", false, nil)
- if err != nil {
- log.Fatalf("queue.bind: %v", err)
- }
- }
- // Set our quality of service. Since we're sharing 3 consumers on the same
- // channel, we want at least 3 messages in flight.
- err = c.Qos(3, 0, false)
- if err != nil {
- log.Fatalf("basic.qos: %v", err)
- }
- // Establish our consumers that have different responsibilities. Our first
- // two queues do not ack the messages on the server, so require to be acked
- // on the client.
- pages, err := c.Consume("page", "pager", false, false, false, false, nil)
- if err != nil {
- log.Fatalf("basic.consume: %v", err)
- }
- go func() {
- for log := range pages {
- // ... this consumer is responsible for sending pages per log
- log.Ack(false)
- }
- }()
- // Notice how the concern for which messages arrive here are in the AMQP
- // topology and not in the queue. We let the server pick a consumer tag this
- // time.
- emails, err := c.Consume("email", "", false, false, false, false, nil)
- if err != nil {
- log.Fatalf("basic.consume: %v", err)
- }
- go func() {
- for log := range emails {
- // ... this consumer is responsible for sending emails per log
- log.Ack(false)
- }
- }()
- // This consumer requests that every message is acknowledged as soon as it's
- // delivered.
- firehose, err := c.Consume("firehose", "", true, false, false, false, nil)
- if err != nil {
- log.Fatalf("basic.consume: %v", err)
- }
- // To show how to process the items in parallel, we'll use a work pool.
- for i := 0; i < runtime.NumCPU(); i++ {
- go func(work <-chan amqp.Delivery) {
- for _ = range work {
- // ... this consumer pulls from the firehose and doesn't need to acknowledge
- }
- }(firehose)
- }
- // Wait until you're ready to finish, could be a signal handler here.
- time.Sleep(10 * time.Second)
- // Cancelling a consumer by name will finish the range and gracefully end the
- // goroutine
- err = c.Cancel("pager", false)
- if err != nil {
- log.Fatalf("basic.cancel: %v", err)
- }
- // deferred closing the Connection will also finish the consumer's ranges of
- // their delivery chans. If you need every delivery to be processed, make
- // sure to wait for all consumers goroutines to finish before exiting your
- // process.
- }
- func ExampleChannel_Publish() {
- // Connects opens an AMQP connection from the credentials in the URL.
- conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
- if err != nil {
- log.Fatalf("connection.open: %s", err)
- }
- // This waits for a server acknowledgment which means the sockets will have
- // flushed all outbound publishings prior to returning. It's important to
- // block on Close to not lose any publishings.
- defer conn.Close()
- c, err := conn.Channel()
- if err != nil {
- log.Fatalf("channel.open: %s", err)
- }
- // We declare our topology on both the publisher and consumer to ensure they
- // are the same. This is part of AMQP being a programmable messaging model.
- //
- // See the Channel.Consume example for the complimentary declare.
- err = c.ExchangeDeclare("logs", "topic", true, false, false, false, nil)
- if err != nil {
- log.Fatalf("exchange.declare: %v", err)
- }
- // Prepare this message to be persistent. Your publishing requirements may
- // be different.
- msg := amqp.Publishing{
- DeliveryMode: amqp.Persistent,
- Timestamp: time.Now(),
- ContentType: "text/plain",
- Body: []byte("Go Go AMQP!"),
- }
- // This is not a mandatory delivery, so it will be dropped if there are no
- // queues bound to the logs exchange.
- err = c.Publish("logs", "info", false, false, msg)
- if err != nil {
- // Since publish is asynchronous this can happen if the network connection
- // is reset or if the server has run out of resources.
- log.Fatalf("basic.publish: %v", err)
- }
- }
- func publishAllTheThings(conn *amqp.Connection) {
- // ... snarf snarf, barf barf
- }
- func ExampleConnection_NotifyBlocked() {
- // Simply logs when the server throttles the TCP connection for publishers
- // Test this by tuning your server to have a low memory watermark:
- // rabbitmqctl set_vm_memory_high_watermark 0.00000001
- conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
- if err != nil {
- log.Fatalf("connection.open: %s", err)
- }
- defer conn.Close()
- blockings := conn.NotifyBlocked(make(chan amqp.Blocking))
- go func() {
- for b := range blockings {
- if b.Active {
- log.Printf("TCP blocked: %q", b.Reason)
- } else {
- log.Printf("TCP unblocked")
- }
- }
- }()
- // Your application domain channel setup publishings
- publishAllTheThings(conn)
- }
|