examples_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. package amqp_test
  2. import (
  3. "crypto/tls"
  4. "crypto/x509"
  5. "github.com/streadway/amqp"
  6. "io/ioutil"
  7. "log"
  8. "net"
  9. "runtime"
  10. "time"
  11. )
  12. func ExampleConfig_timeout() {
  13. // Provide your own anonymous Dial function that delgates to net.DialTimout
  14. // for custom timeouts
  15. conn, err := amqp.DialConfig("amqp:///", amqp.Config{
  16. Dial: func(network, addr string) (net.Conn, error) {
  17. return net.DialTimeout(network, addr, 2*time.Second)
  18. },
  19. })
  20. log.Printf("conn: %v, err: %v", conn, err)
  21. }
  22. func ExampleDialTLS() {
  23. // To get started with SSL/TLS follow the instructions for adding SSL/TLS
  24. // support in RabbitMQ with a private certificate authority here:
  25. //
  26. // http://www.rabbitmq.com/ssl.html
  27. //
  28. // Then in your rabbitmq.config, disable the plain AMQP port, verify clients
  29. // and fail if no certificate is presented with the following:
  30. //
  31. // [
  32. // {rabbit, [
  33. // {tcp_listeners, []}, % listens on 127.0.0.1:5672
  34. // {ssl_listeners, [5671]}, % listens on 0.0.0.0:5671
  35. // {ssl_options, [{cacertfile,"/path/to/your/testca/cacert.pem"},
  36. // {certfile,"/path/to/your/server/cert.pem"},
  37. // {keyfile,"/path/to/your/server/key.pem"},
  38. // {verify,verify_peer},
  39. // {fail_if_no_peer_cert,true}]}
  40. // ]}
  41. // ].
  42. cfg := new(tls.Config)
  43. // The self-signing certificate authority's certificate must be included in
  44. // the RootCAs to be trusted so that the server certificate can be verified.
  45. //
  46. // Alternatively to adding it to the tls.Config you can add the CA's cert to
  47. // your system's root CAs. The tls package will use the system roots
  48. // specific to each support OS. Under OS X, add (drag/drop) your cacert.pem
  49. // file to the 'Certificates' section of KeyChain.app to add and always
  50. // trust.
  51. //
  52. // Or with the command line add and trust the DER encoded certificate:
  53. //
  54. // security add-certificate testca/cacert.cer
  55. // security add-trusted-cert testca/cacert.cer
  56. //
  57. // If you depend on the system root CAs, then use nil for the RootCAs field
  58. // so the system roots will be loaded.
  59. cfg.RootCAs = x509.NewCertPool()
  60. if ca, err := ioutil.ReadFile("testca/cacert.pem"); err == nil {
  61. cfg.RootCAs.AppendCertsFromPEM(ca)
  62. }
  63. // Move the client cert and key to a location specific to your application
  64. // and load them here.
  65. if cert, err := tls.LoadX509KeyPair("client/cert.pem", "client/key.pem"); err == nil {
  66. cfg.Certificates = append(cfg.Certificates, cert)
  67. }
  68. // Server names are validated by the crypto/tls package, so the server
  69. // certificate must be made for the hostname in the URL. Find the commonName
  70. // (CN) and make sure the hostname in the URL matches this common name. Per
  71. // the RabbitMQ instructions for a self-signed cert, this defautls to the
  72. // current hostname.
  73. //
  74. // openssl x509 -noout -in server/cert.pem -subject
  75. //
  76. // If your server name in your certificate is different than the host you are
  77. // connecting to, set the hostname used for verification in
  78. // ServerName field of the tls.Config struct.
  79. conn, err := amqp.DialTLS("amqps://server-name-from-certificate/", cfg)
  80. log.Printf("conn: %v, err: %v", conn, err)
  81. }
  82. func ExampleChannel_Confirm_bridge() {
  83. // This example acts as a bridge, shoveling all messages sent from the source
  84. // exchange "log" to destination exchange "log".
  85. // Confirming publishes can help from overproduction and ensure every message
  86. // is delivered.
  87. // Setup the source of the store and forward
  88. source, err := amqp.Dial("amqp://source/")
  89. if err != nil {
  90. log.Fatalf("connection.open source: %s", err)
  91. }
  92. defer source.Close()
  93. chs, err := source.Channel()
  94. if err != nil {
  95. log.Fatalf("channel.open source: %s", err)
  96. }
  97. if err := chs.ExchangeDeclare("log", "topic", true, false, false, false, nil); err != nil {
  98. log.Fatalf("exchange.declare destination: %s", err)
  99. }
  100. if _, err := chs.QueueDeclare("remote-tee", true, true, false, false, nil); err != nil {
  101. log.Fatalf("queue.declare source: %s", err)
  102. }
  103. if err := chs.QueueBind("remote-tee", "#", "logs", false, nil); err != nil {
  104. log.Fatalf("queue.bind source: %s", err)
  105. }
  106. shovel, err := chs.Consume("remote-tee", "shovel", false, false, false, false, nil)
  107. if err != nil {
  108. log.Fatalf("basic.consume source: %s", err)
  109. }
  110. // Setup the destination of the store and forward
  111. destination, err := amqp.Dial("amqp://destination/")
  112. if err != nil {
  113. log.Fatalf("connection.open destination: %s", err)
  114. }
  115. defer destination.Close()
  116. chd, err := destination.Channel()
  117. if err != nil {
  118. log.Fatalf("channel.open destination: %s", err)
  119. }
  120. if err := chd.ExchangeDeclare("log", "topic", true, false, false, false, nil); err != nil {
  121. log.Fatalf("exchange.declare destination: %s", err)
  122. }
  123. // Buffer of 1 for our single outstanding publishing
  124. pubAcks, pubNacks := chd.NotifyConfirm(make(chan uint64, 1), make(chan uint64, 1))
  125. if err := chd.Confirm(false); err != nil {
  126. log.Fatalf("confirm.select destination: %s", err)
  127. }
  128. // Now pump the messages, one by one, a smarter implementation
  129. // would batch the deliveries and use multiple ack/nacks
  130. for {
  131. msg, ok := <-shovel
  132. if !ok {
  133. log.Fatalf("source channel closed, see the reconnect example for handling this")
  134. }
  135. err = chd.Publish("logs", msg.RoutingKey, false, false, amqp.Publishing{
  136. // Copy all the properties
  137. ContentType: msg.ContentType,
  138. ContentEncoding: msg.ContentEncoding,
  139. DeliveryMode: msg.DeliveryMode,
  140. Priority: msg.Priority,
  141. CorrelationId: msg.CorrelationId,
  142. ReplyTo: msg.ReplyTo,
  143. Expiration: msg.Expiration,
  144. MessageId: msg.MessageId,
  145. Timestamp: msg.Timestamp,
  146. Type: msg.Type,
  147. UserId: msg.UserId,
  148. AppId: msg.AppId,
  149. // Custom headers
  150. Headers: msg.Headers,
  151. // And the body
  152. Body: msg.Body,
  153. })
  154. if err != nil {
  155. msg.Nack(false, false)
  156. log.Fatalf("basic.publish destination: %s", msg)
  157. }
  158. // only ack the source delivery when the destination acks the publishing
  159. // here you could check for delivery order by keeping a local state of
  160. // expected delivery tags
  161. select {
  162. case <-pubAcks:
  163. msg.Ack(false)
  164. case <-pubNacks:
  165. msg.Nack(false, false)
  166. }
  167. }
  168. }
  169. func ExampleChannel_Consume() {
  170. // Connects opens an AMQP connection from the credentials in the URL.
  171. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  172. if err != nil {
  173. log.Fatalf("connection.open: %s", err)
  174. }
  175. defer conn.Close()
  176. c, err := conn.Channel()
  177. if err != nil {
  178. log.Fatalf("channel.open: %s", err)
  179. }
  180. // We declare our topology on both the publisher and consumer to ensure they
  181. // are the same. This is part of AMQP being a programmable messaging model.
  182. //
  183. // See the Channel.Publish example for the complimentary declare.
  184. err = c.ExchangeDeclare("logs", "topic", true, false, false, false, nil)
  185. if err != nil {
  186. log.Fatalf("exchange.declare: %s", err)
  187. }
  188. // Establish our queue topologies that we are responsible for
  189. type bind struct {
  190. queue string
  191. key string
  192. }
  193. bindings := []bind{
  194. bind{"page", "alert"},
  195. bind{"email", "info"},
  196. bind{"firehose", "#"},
  197. }
  198. for _, b := range bindings {
  199. _, err = c.QueueDeclare(b.queue, true, false, false, false, nil)
  200. if err != nil {
  201. log.Fatalf("queue.declare: %v", err)
  202. }
  203. err = c.QueueBind(b.queue, b.key, "logs", false, nil)
  204. if err != nil {
  205. log.Fatalf("queue.bind: %v", err)
  206. }
  207. }
  208. // Set our quality of service. Since we're sharing 3 consumers on the same
  209. // channel, we want at least 3 messages in flight.
  210. err = c.Qos(3, 0, false)
  211. if err != nil {
  212. log.Fatalf("basic.qos: %v", err)
  213. }
  214. // Establish our consumers that have different responsibilities. Our first
  215. // two queues do not ack the messages on the server, so require to be acked
  216. // on the client.
  217. pages, err := c.Consume("page", "pager", false, false, false, false, nil)
  218. if err != nil {
  219. log.Fatalf("basic.consume: %v", err)
  220. }
  221. go func() {
  222. for log := range pages {
  223. // ... this consumer is responsible for sending pages per log
  224. log.Ack(false)
  225. }
  226. }()
  227. // Notice how the concern for which messages arrive here are in the AMQP
  228. // topology and not in the queue. We let the server pick a consumer tag this
  229. // time.
  230. emails, err := c.Consume("email", "", false, false, false, false, nil)
  231. if err != nil {
  232. log.Fatalf("basic.consume: %v", err)
  233. }
  234. go func() {
  235. for log := range emails {
  236. // ... this consumer is responsible for sending emails per log
  237. log.Ack(false)
  238. }
  239. }()
  240. // This consumer requests that every message is acknowledged as soon as it's
  241. // delivered.
  242. firehose, err := c.Consume("firehose", "", true, false, false, false, nil)
  243. if err != nil {
  244. log.Fatalf("basic.consume: %v", err)
  245. }
  246. // To show how to process the items in parallel, we'll use a work pool.
  247. for i := 0; i < runtime.NumCPU(); i++ {
  248. go func(work <-chan amqp.Delivery) {
  249. for _ = range work {
  250. // ... this consumer pulls from the firehose and doesn't need to acknowledge
  251. }
  252. }(firehose)
  253. }
  254. // Wait until you're ready to finish, could be a signal handler here.
  255. time.Sleep(10 * time.Second)
  256. // Cancelling a consumer by name will finish the range and gracefully end the
  257. // goroutine
  258. err = c.Cancel("pager", false)
  259. if err != nil {
  260. log.Fatalf("basic.cancel: %v", err)
  261. }
  262. // deferred closing the Connection will also finish the consumer's ranges of
  263. // their delivery chans. If you need every delivery to be processed, make
  264. // sure to wait for all consumers goroutines to finish before exiting your
  265. // process.
  266. }
  267. func ExampleChannel_Publish() {
  268. // Connects opens an AMQP connection from the credentials in the URL.
  269. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  270. if err != nil {
  271. log.Fatalf("connection.open: %s", err)
  272. }
  273. // This waits for a server acknowledgment which means the sockets will have
  274. // flushed all outbound publishings prior to returning. It's important to
  275. // block on Close to not lose any publishings.
  276. defer conn.Close()
  277. c, err := conn.Channel()
  278. if err != nil {
  279. log.Fatalf("channel.open: %s", err)
  280. }
  281. // We declare our topology on both the publisher and consumer to ensure they
  282. // are the same. This is part of AMQP being a programmable messaging model.
  283. //
  284. // See the Channel.Consume example for the complimentary declare.
  285. err = c.ExchangeDeclare("logs", "topic", true, false, false, false, nil)
  286. if err != nil {
  287. log.Fatalf("exchange.declare: %v", err)
  288. }
  289. // Prepare this message to be persistent. Your publishing requirements may
  290. // be different.
  291. msg := amqp.Publishing{
  292. DeliveryMode: amqp.Persistent,
  293. Timestamp: time.Now(),
  294. ContentType: "text/plain",
  295. Body: []byte("Go Go AMQP!"),
  296. }
  297. // This is not a mandatory delivery, so it will be dropped if there are no
  298. // queues bound to the logs exchange.
  299. err = c.Publish("logs", "info", false, false, msg)
  300. if err != nil {
  301. // Since publish is asynchronous this can happen if the network connection
  302. // is reset or if the server has run out of resources.
  303. log.Fatalf("basic.publish: %v", err)
  304. }
  305. }
  306. func publishAllTheThings(conn *amqp.Connection) {
  307. // ... snarf snarf, barf barf
  308. }
  309. func ExampleConnection_NotifyBlocked() {
  310. // Simply logs when the server throttles the TCP connection for publishers
  311. // Test this by tuning your server to have a low memory watermark:
  312. // rabbitmqctl set_vm_memory_high_watermark 0.00000001
  313. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  314. if err != nil {
  315. log.Fatalf("connection.open: %s", err)
  316. }
  317. defer conn.Close()
  318. blockings := conn.NotifyBlocked(make(chan amqp.Blocking))
  319. go func() {
  320. for b := range blockings {
  321. if b.Active {
  322. log.Printf("TCP blocked: %q", b.Reason)
  323. } else {
  324. log.Printf("TCP unblocked")
  325. }
  326. }
  327. }()
  328. // Your application domain channel setup publishings
  329. publishAllTheThings(conn)
  330. }