// eventpublisher.go (3.0 KB)

  1. package eventpublisher
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. "time"
  7. "github.com/grafana/grafana/pkg/bus"
  8. "github.com/grafana/grafana/pkg/events"
  9. "github.com/grafana/grafana/pkg/setting"
  10. "github.com/streadway/amqp"
  11. )
  12. var (
  13. url string
  14. exchange string
  15. conn *amqp.Connection
  16. channel *amqp.Channel
  17. )
  18. func getConnection() (*amqp.Connection, error) {
  19. c, err := amqp.Dial(url)
  20. if err != nil {
  21. return nil, err
  22. }
  23. return c, err
  24. }
  25. func getChannel() (*amqp.Channel, error) {
  26. ch, err := conn.Channel()
  27. if err != nil {
  28. return nil, err
  29. }
  30. err = ch.ExchangeDeclare(
  31. exchange, // name
  32. "topic", // type
  33. true, // durable
  34. false, // auto-deleted
  35. false, // internal
  36. false, // no-wait
  37. nil, // arguments
  38. )
  39. if err != nil {
  40. return nil, err
  41. }
  42. return ch, err
  43. }
  44. func Init() {
  45. sec := setting.Cfg.Section("event_publisher")
  46. if !sec.Key("enabled").MustBool(false) {
  47. return
  48. }
  49. url = sec.Key("rabbitmq_url").String()
  50. exchange = sec.Key("exchange").String()
  51. bus.AddWildcardListener(eventListener)
  52. if err := Setup(); err != nil {
  53. log.Fatal(4, "Failed to connect to notification queue: %v", err)
  54. return
  55. }
  56. }
  57. // Every connection should declare the topology they expect
  58. func Setup() error {
  59. c, err := getConnection()
  60. if err != nil {
  61. return err
  62. }
  63. conn = c
  64. ch, err := getChannel()
  65. if err != nil {
  66. return err
  67. }
  68. channel = ch
  69. // listen for close events so we can reconnect.
  70. errChan := channel.NotifyClose(make(chan *amqp.Error))
  71. go func() {
  72. for e := range errChan {
  73. fmt.Println("connection to rabbitmq lost.")
  74. fmt.Println(e)
  75. fmt.Println("attempting to create new rabbitmq channel.")
  76. ch, err := getChannel()
  77. if err == nil {
  78. channel = ch
  79. break
  80. }
  81. //could not create channel, so lets close the connection
  82. // and re-create.
  83. _ = conn.Close()
  84. for err != nil {
  85. time.Sleep(2 * time.Second)
  86. fmt.Println("attempting to reconnect to rabbitmq.")
  87. err = Setup()
  88. }
  89. fmt.Println("Connected to rabbitmq again.")
  90. }
  91. }()
  92. return nil
  93. }
  94. func publish(routingKey string, msgString []byte) {
  95. for {
  96. err := channel.Publish(
  97. exchange, //exchange
  98. routingKey, // routing key
  99. false, // mandatory
  100. false, // immediate
  101. amqp.Publishing{
  102. ContentType: "application/json",
  103. Body: msgString,
  104. },
  105. )
  106. if err == nil {
  107. return
  108. }
  109. // failures are most likely because the connection was lost.
  110. // the connection will be re-established, so just keep
  111. // retrying every 2seconds until we successfully publish.
  112. time.Sleep(2 * time.Second)
  113. fmt.Println("publish failed, retrying.")
  114. }
  115. }
  116. func eventListener(event interface{}) error {
  117. wireEvent, err := events.ToOnWriteEvent(event)
  118. if err != nil {
  119. return err
  120. }
  121. msgString, err := json.Marshal(wireEvent)
  122. if err != nil {
  123. return err
  124. }
  125. routingKey := fmt.Sprintf("%s.%s", wireEvent.Priority, wireEvent.EventType)
  126. // this is run in a greenthread and we expect that publish will keep
  127. // retrying until the message gets sent.
  128. go publish(routingKey, msgString)
  129. return nil
  130. }