reconnect_test.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package amqp_test
  2. import (
  3. "fmt"
  4. "github.com/streadway/amqp"
  5. "os"
  6. )
  7. // Every connection should declare the topology they expect
  8. func setup(url, queue string) (*amqp.Connection, *amqp.Channel, error) {
  9. conn, err := amqp.Dial(url)
  10. if err != nil {
  11. return nil, nil, err
  12. }
  13. ch, err := conn.Channel()
  14. if err != nil {
  15. return nil, nil, err
  16. }
  17. if _, err := ch.QueueDeclare(queue, false, true, false, false, nil); err != nil {
  18. return nil, nil, err
  19. }
  20. return conn, ch, nil
  21. }
  22. func consume(url, queue string) (*amqp.Connection, <-chan amqp.Delivery, error) {
  23. conn, ch, err := setup(url, queue)
  24. if err != nil {
  25. return nil, nil, err
  26. }
  27. // Indicate we only want 1 message to acknowledge at a time.
  28. if err := ch.Qos(1, 0, false); err != nil {
  29. return nil, nil, err
  30. }
  31. // Exclusive consumer
  32. deliveries, err := ch.Consume(queue, "", false, true, false, false, nil)
  33. return conn, deliveries, err
  34. }
  35. func ExampleConnection_reconnect() {
  36. if url := os.Getenv("AMQP_URL"); url != "" {
  37. queue := "example.reconnect"
  38. // The connection/channel for publishing to interleave the ingress messages
  39. // between reconnects, shares the same topology as the consumer. If we rather
  40. // sent all messages up front, the first consumer would receive every message.
  41. // We would rather show how the messages are not lost between reconnects.
  42. _, pub, err := setup(url, queue)
  43. if err != nil {
  44. fmt.Println("err publisher setup:", err)
  45. return
  46. }
  47. // Purge the queue from the publisher side to establish initial state
  48. if _, err := pub.QueuePurge(queue, false); err != nil {
  49. fmt.Println("err purge:", err)
  50. return
  51. }
  52. // Reconnect simulation, should be for { ... } in production
  53. for i := 1; i <= 3; i++ {
  54. fmt.Println("connect")
  55. conn, deliveries, err := consume(url, queue)
  56. if err != nil {
  57. fmt.Println("err consume:", err)
  58. return
  59. }
  60. // Simulate a producer on a different connection showing that consumers
  61. // continue where they were left off after each reconnect.
  62. if err := pub.Publish("", queue, false, false, amqp.Publishing{
  63. Body: []byte(fmt.Sprintf("%d", i)),
  64. }); err != nil {
  65. fmt.Println("err publish:", err)
  66. return
  67. }
  68. // Simulates a consumer that when the range finishes, will setup a new
  69. // session and begin ranging over the deliveries again.
  70. for msg := range deliveries {
  71. fmt.Println(string(msg.Body))
  72. msg.Ack(false)
  73. // Simulate an error like a server restart, loss of route or operator
  74. // intervention that results in the connection terminating
  75. go conn.Close()
  76. }
  77. }
  78. } else {
  79. // pass with expected output when not running in an integration
  80. // environment.
  81. fmt.Println("connect")
  82. fmt.Println("1")
  83. fmt.Println("connect")
  84. fmt.Println("2")
  85. fmt.Println("connect")
  86. fmt.Println("3")
  87. }
  88. // Output:
  89. // connect
  90. // 1
  91. // connect
  92. // 2
  93. // connect
  94. // 3
  95. }