notify.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752
  1. package pq
  2. // Package pq is a pure Go Postgres driver for the database/sql package.
  3. // This module contains support for Postgres LISTEN/NOTIFY.
  4. import (
  5. "errors"
  6. "fmt"
  7. "io"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. )
  12. // Notification represents a single notification from the database.
  13. type Notification struct {
  14. // Process ID (PID) of the notifying postgres backend.
  15. BePid int
  16. // Name of the channel the notification was sent on.
  17. Channel string
  18. // Payload, or the empty string if unspecified.
  19. Extra string
  20. }
  21. func recvNotification(r *readBuf) *Notification {
  22. bePid := r.int32()
  23. channel := r.string()
  24. extra := r.string()
  25. return &Notification{bePid, channel, extra}
  26. }
  27. const (
  28. connStateIdle int32 = iota
  29. connStateExpectResponse
  30. connStateExpectReadyForQuery
  31. )
  32. type message struct {
  33. typ byte
  34. err error
  35. }
  36. var errListenerConnClosed = errors.New("pq: ListenerConn has been closed")
  37. // ListenerConn is a low-level interface for waiting for notifications. You
  38. // should use Listener instead.
  39. type ListenerConn struct {
  40. // guards cn and err
  41. connectionLock sync.Mutex
  42. cn *conn
  43. err error
  44. connState int32
  45. // the sending goroutine will be holding this lock
  46. senderLock sync.Mutex
  47. notificationChan chan<- *Notification
  48. replyChan chan message
  49. }
  50. // Creates a new ListenerConn. Use NewListener instead.
  51. func NewListenerConn(name string, notificationChan chan<- *Notification) (*ListenerConn, error) {
  52. cn, err := Open(name)
  53. if err != nil {
  54. return nil, err
  55. }
  56. l := &ListenerConn{
  57. cn: cn.(*conn),
  58. notificationChan: notificationChan,
  59. connState: connStateIdle,
  60. replyChan: make(chan message, 2),
  61. }
  62. go l.listenerConnMain()
  63. return l, nil
  64. }
  65. // We can only allow one goroutine at a time to be running a query on the
  66. // connection for various reasons, so the goroutine sending on the connection
  67. // must be holding senderLock.
  68. //
  69. // Returns an error if an unrecoverable error has occurred and the ListenerConn
  70. // should be abandoned.
  71. func (l *ListenerConn) acquireSenderLock() error {
  72. l.connectionLock.Lock()
  73. defer l.connectionLock.Unlock()
  74. if l.err != nil {
  75. return l.err
  76. }
  77. l.senderLock.Lock()
  78. return nil
  79. }
  80. func (l *ListenerConn) releaseSenderLock() {
  81. l.senderLock.Unlock()
  82. }
  83. // setState advances the protocol state to newState. Returns false if moving
  84. // to that state from the current state is not allowed.
  85. func (l *ListenerConn) setState(newState int32) bool {
  86. var expectedState int32
  87. switch newState {
  88. case connStateIdle:
  89. expectedState = connStateExpectReadyForQuery
  90. case connStateExpectResponse:
  91. expectedState = connStateIdle
  92. case connStateExpectReadyForQuery:
  93. expectedState = connStateExpectResponse
  94. default:
  95. panic(fmt.Sprintf("unexpected listenerConnState %d", newState))
  96. }
  97. return atomic.CompareAndSwapInt32(&l.connState, expectedState, newState)
  98. }
  99. // Main logic is here: receive messages from the postgres backend, forward
  100. // notifications and query replies and keep the internal state in sync with the
  101. // protocol state. Returns when the connection has been lost, is about to go
  102. // away or should be discarded because we couldn't agree on the state with the
  103. // server backend.
  104. func (l *ListenerConn) listenerConnLoop() (err error) {
  105. defer l.cn.errRecover(&err)
  106. r := &readBuf{}
  107. for {
  108. t, err := l.cn.recvMessage(r)
  109. if err != nil {
  110. return err
  111. }
  112. switch t {
  113. case 'A':
  114. // recvNotification copies all the data so we don't need to worry
  115. // about the scratch buffer being overwritten.
  116. l.notificationChan <- recvNotification(r)
  117. case 'E':
  118. // We might receive an ErrorResponse even when not in a query; it
  119. // is expected that the server will close the connection after
  120. // that, but we should make sure that the error we display is the
  121. // one from the stray ErrorResponse, not io.ErrUnexpectedEOF.
  122. if !l.setState(connStateExpectReadyForQuery) {
  123. return parseError(r)
  124. }
  125. l.replyChan <- message{t, parseError(r)}
  126. case 'C', 'I':
  127. if !l.setState(connStateExpectReadyForQuery) {
  128. // protocol out of sync
  129. return fmt.Errorf("unexpected CommandComplete")
  130. }
  131. // ExecSimpleQuery doesn't need to know about this message
  132. case 'Z':
  133. if !l.setState(connStateIdle) {
  134. // protocol out of sync
  135. return fmt.Errorf("unexpected ReadyForQuery")
  136. }
  137. l.replyChan <- message{t, nil}
  138. case 'N', 'S':
  139. // ignore
  140. default:
  141. return fmt.Errorf("unexpected message %q from server in listenerConnLoop", t)
  142. }
  143. }
  144. }
  145. // This is the main routine for the goroutine receiving on the database
  146. // connection. Most of the main logic is in listenerConnLoop.
  147. func (l *ListenerConn) listenerConnMain() {
  148. err := l.listenerConnLoop()
  149. // listenerConnLoop terminated; we're done, but we still have to clean up.
  150. // Make sure nobody tries to start any new queries by making sure the err
  151. // pointer is set. It is important that we do not overwrite its value; a
  152. // connection could be closed by either this goroutine or one sending on
  153. // the connection -- whoever closes the connection is assumed to have the
  154. // more meaningful error message (as the other one will probably get
  155. // net.errClosed), so that goroutine sets the error we expose while the
  156. // other error is discarded. If the connection is lost while two
  157. // goroutines are operating on the socket, it probably doesn't matter which
  158. // error we expose so we don't try to do anything more complex.
  159. l.connectionLock.Lock()
  160. if l.err == nil {
  161. l.err = err
  162. }
  163. l.cn.Close()
  164. l.connectionLock.Unlock()
  165. // There might be a query in-flight; make sure nobody's waiting for a
  166. // response to it, since there's not going to be one.
  167. close(l.replyChan)
  168. // let the listener know we're done
  169. close(l.notificationChan)
  170. // this ListenerConn is done
  171. }
  172. // Send a LISTEN query to the server. See ExecSimpleQuery.
  173. func (l *ListenerConn) Listen(channel string) (bool, error) {
  174. return l.ExecSimpleQuery("LISTEN " + QuoteIdentifier(channel))
  175. }
  176. // Send an UNLISTEN query to the server. See ExecSimpleQuery.
  177. func (l *ListenerConn) Unlisten(channel string) (bool, error) {
  178. return l.ExecSimpleQuery("UNLISTEN " + QuoteIdentifier(channel))
  179. }
  180. // Send `UNLISTEN *` to the server. See ExecSimpleQuery.
  181. func (l *ListenerConn) UnlistenAll() (bool, error) {
  182. return l.ExecSimpleQuery("UNLISTEN *")
  183. }
  184. // Ping the remote server to make sure it's alive. Non-nil error means the
  185. // connection has failed and should be abandoned.
  186. func (l *ListenerConn) Ping() error {
  187. sent, err := l.ExecSimpleQuery("")
  188. if !sent {
  189. return err
  190. }
  191. if err != nil {
  192. // shouldn't happen
  193. panic(err)
  194. }
  195. return nil
  196. }
  197. // Attempt to send a query on the connection. Returns an error if sending the
  198. // query failed, and the caller should initiate closure of this connection.
  199. // The caller must be holding senderLock (see acquireSenderLock and
  200. // releaseSenderLock).
  201. func (l *ListenerConn) sendSimpleQuery(q string) (err error) {
  202. defer l.cn.errRecover(&err)
  203. // must set connection state before sending the query
  204. if !l.setState(connStateExpectResponse) {
  205. panic("two queries running at the same time")
  206. }
  207. // Can't use l.cn.writeBuf here because it uses the scratch buffer which
  208. // might get overwritten by listenerConnLoop.
  209. data := writeBuf([]byte("Q\x00\x00\x00\x00"))
  210. b := &data
  211. b.string(q)
  212. l.cn.send(b)
  213. return nil
  214. }
  215. // Execute a "simple query" (i.e. one with no bindable parameters) on the
  216. // connection. The possible return values are:
  217. // 1) "executed" is true; the query was executed to completion on the
  218. // database server. If the query failed, err will be set to the error
  219. // returned by the database, otherwise err will be nil.
  220. // 2) If "executed" is false, the query could not be executed on the remote
  221. // server. err will be non-nil.
  222. //
  223. // After a call to ExecSimpleQuery has returned an executed=false value, the
  224. // connection has either been closed or will be closed shortly thereafter, and
  225. // all subsequently executed queries will return an error.
  226. func (l *ListenerConn) ExecSimpleQuery(q string) (executed bool, err error) {
  227. if err = l.acquireSenderLock(); err != nil {
  228. return false, err
  229. }
  230. defer l.releaseSenderLock()
  231. err = l.sendSimpleQuery(q)
  232. if err != nil {
  233. // We can't know what state the protocol is in, so we need to abandon
  234. // this connection.
  235. l.connectionLock.Lock()
  236. defer l.connectionLock.Unlock()
  237. // Set the error pointer if it hasn't been set already; see
  238. // listenerConnMain.
  239. if l.err == nil {
  240. l.err = err
  241. }
  242. l.cn.Close()
  243. return false, err
  244. }
  245. // now we just wait for a reply..
  246. for {
  247. m, ok := <-l.replyChan
  248. if !ok {
  249. // We lost the connection to server, don't bother waiting for a
  250. // a response.
  251. return false, io.EOF
  252. }
  253. switch m.typ {
  254. case 'Z':
  255. // sanity check
  256. if m.err != nil {
  257. panic("m.err != nil")
  258. }
  259. // done; err might or might not be set
  260. return true, err
  261. case 'E':
  262. // sanity check
  263. if m.err == nil {
  264. panic("m.err == nil")
  265. }
  266. // server responded with an error; ReadyForQuery to follow
  267. err = m.err
  268. default:
  269. return false, fmt.Errorf("unknown response for simple query: %q", m.typ)
  270. }
  271. }
  272. }
  273. func (l *ListenerConn) Close() error {
  274. l.connectionLock.Lock()
  275. defer l.connectionLock.Unlock()
  276. if l.err != nil {
  277. return errListenerConnClosed
  278. }
  279. l.err = errListenerConnClosed
  280. return l.cn.Close()
  281. }
  282. // Err() returns the reason the connection was closed. It is not safe to call
  283. // this function until l.Notify has been closed.
  284. func (l *ListenerConn) Err() error {
  285. return l.err
  286. }
  287. var errListenerClosed = errors.New("pq: Listener has been closed")
  288. var ErrChannelAlreadyOpen = errors.New("pq: channel is already open")
  289. var ErrChannelNotOpen = errors.New("pq: channel is not open")
  290. type ListenerEventType int
  291. const (
  292. // Emitted only when the database connection has been initially
  293. // initialized. err will always be nil.
  294. ListenerEventConnected ListenerEventType = iota
  295. // Emitted after a database connection has been lost, either because of an
  296. // error or because Close has been called. err will be set to the reason
  297. // the database connection was lost.
  298. ListenerEventDisconnected
  299. // Emitted after a database connection has been re-established after
  300. // connection loss. err will always be nil. After this event has been
  301. // emitted, a nil pq.Notification is sent on the Listener.Notify channel.
  302. ListenerEventReconnected
  303. // Emitted after a connection to the database was attempted, but failed.
  304. // err will be set to an error describing why the connection attempt did
  305. // not succeed.
  306. ListenerEventConnectionAttemptFailed
  307. )
  308. type EventCallbackType func(event ListenerEventType, err error)
  309. // Listener provides an interface for listening to notifications from a
  310. // PostgreSQL database. For general usage information, see section
  311. // "Notifications".
  312. //
  313. // Listener can safely be used from concurrently running goroutines.
  314. type Listener struct {
  315. // Channel for receiving notifications from the database. In some cases a
  316. // nil value will be sent. See section "Notifications" above.
  317. Notify chan *Notification
  318. name string
  319. minReconnectInterval time.Duration
  320. maxReconnectInterval time.Duration
  321. eventCallback EventCallbackType
  322. lock sync.Mutex
  323. isClosed bool
  324. reconnectCond *sync.Cond
  325. cn *ListenerConn
  326. connNotificationChan <-chan *Notification
  327. channels map[string]struct{}
  328. }
  329. // NewListener creates a new database connection dedicated to LISTEN / NOTIFY.
  330. //
  331. // name should be set to a connection string to be used to establish the
  332. // database connection (see section "Connection String Parameters" above).
  333. //
  334. // minReconnectInterval controls the duration to wait before trying to
  335. // re-establish the database connection after connection loss. After each
  336. // consecutive failure this interval is doubled, until maxReconnectInterval is
  337. // reached. Successfully completing the connection establishment procedure
  338. // resets the interval back to minReconnectInterval.
  339. //
  340. // The last parameter eventCallback can be set to a function which will be
  341. // called by the Listener when the state of the underlying database connection
  342. // changes. This callback will be called by the goroutine which dispatches the
  343. // notifications over the Notify channel, so you should try to avoid doing
  344. // potentially time-consuming operations from the callback.
  345. func NewListener(name string,
  346. minReconnectInterval time.Duration,
  347. maxReconnectInterval time.Duration,
  348. eventCallback EventCallbackType) *Listener {
  349. l := &Listener{
  350. name: name,
  351. minReconnectInterval: minReconnectInterval,
  352. maxReconnectInterval: maxReconnectInterval,
  353. eventCallback: eventCallback,
  354. channels: make(map[string]struct{}),
  355. Notify: make(chan *Notification, 32),
  356. }
  357. l.reconnectCond = sync.NewCond(&l.lock)
  358. go l.listenerMain()
  359. return l
  360. }
  361. // Returns the notification channel for this listener. This is the same
  362. // channel as Notify, and will not be recreated during the life time of the
  363. // Listener.
  364. func (l *Listener) NotificationChannel() <-chan *Notification {
  365. return l.Notify
  366. }
  367. // Listen starts listening for notifications on a channel. Calls to this
  368. // function will block until an acknowledgement has been received from the
  369. // server. Note that Listener automatically re-establishes the connection
  370. // after connection loss, so this function may block indefinitely if the
  371. // connection can not be re-established.
  372. //
  373. // Listen will only fail in three conditions:
  374. // 1) The channel is already open. The returned error will be
  375. // ErrChannelAlreadyOpen.
  376. // 2) The query was executed on the remote server, but PostgreSQL returned an
  377. // error message in response to the query. The returned error will be a
  378. // pq.Error containing the information the server supplied.
  379. // 3) Close is called on the Listener before the request could be completed.
  380. //
  381. // The channel name is case-sensitive.
  382. func (l *Listener) Listen(channel string) error {
  383. l.lock.Lock()
  384. defer l.lock.Unlock()
  385. if l.isClosed {
  386. return errListenerClosed
  387. }
  388. // The server allows you to issue a LISTEN on a channel which is already
  389. // open, but it seems useful to be able to detect this case to spot for
  390. // mistakes in application logic. If the application genuinely does't
  391. // care, it can check the exported error and ignore it.
  392. _, exists := l.channels[channel]
  393. if exists {
  394. return ErrChannelAlreadyOpen
  395. }
  396. if l.cn != nil {
  397. // If gotResponse is true but error is set, the query was executed on
  398. // the remote server, but resulted in an error. This should be
  399. // relatively rare, so it's fine if we just pass the error to our
  400. // caller. However, if gotResponse is false, we could not complete the
  401. // query on the remote server and our underlying connection is about
  402. // to go away, so we only add relname to l.channels, and wait for
  403. // resync() to take care of the rest.
  404. gotResponse, err := l.cn.Listen(channel)
  405. if gotResponse && err != nil {
  406. return err
  407. }
  408. }
  409. l.channels[channel] = struct{}{}
  410. for l.cn == nil {
  411. l.reconnectCond.Wait()
  412. // we let go of the mutex for a while
  413. if l.isClosed {
  414. return errListenerClosed
  415. }
  416. }
  417. return nil
  418. }
  419. // Unlisten removes a channel from the Listener's channel list. Returns
  420. // ErrChannelNotOpen if the Listener is not listening on the specified channel.
  421. // Returns immediately with no error if there is no connection. Note that you
  422. // might still get notifications for this channel even after Unlisten has
  423. // returned.
  424. //
  425. // The channel name is case-sensitive.
  426. func (l *Listener) Unlisten(channel string) error {
  427. l.lock.Lock()
  428. defer l.lock.Unlock()
  429. if l.isClosed {
  430. return errListenerClosed
  431. }
  432. // Similarly to LISTEN, this is not an error in Postgres, but it seems
  433. // useful to distinguish from the normal conditions.
  434. _, exists := l.channels[channel]
  435. if !exists {
  436. return ErrChannelNotOpen
  437. }
  438. if l.cn != nil {
  439. // Similarly to Listen (see comment in that function), the caller
  440. // should only be bothered with an error if it came from the backend as
  441. // a response to our query.
  442. gotResponse, err := l.cn.Unlisten(channel)
  443. if gotResponse && err != nil {
  444. return err
  445. }
  446. }
  447. // Don't bother waiting for resync if there's no connection.
  448. delete(l.channels, channel)
  449. return nil
  450. }
  451. // UnlistenAll removes all channels from the Listener's channel list. Returns
  452. // immediately with no error if there is no connection. Note that you might
  453. // still get notifications for any of the deleted channels even after
  454. // UnlistenAll has returned.
  455. func (l *Listener) UnlistenAll() error {
  456. l.lock.Lock()
  457. defer l.lock.Unlock()
  458. if l.isClosed {
  459. return errListenerClosed
  460. }
  461. if l.cn != nil {
  462. // Similarly to Listen (see comment in that function), the caller
  463. // should only be bothered with an error if it came from the backend as
  464. // a response to our query.
  465. gotResponse, err := l.cn.UnlistenAll()
  466. if gotResponse && err != nil {
  467. return err
  468. }
  469. }
  470. // Don't bother waiting for resync if there's no connection.
  471. l.channels = make(map[string]struct{})
  472. return nil
  473. }
  474. // Ping the remote server to make sure it's alive. Non-nil return value means
  475. // that there is no active connection.
  476. func (l *Listener) Ping() error {
  477. l.lock.Lock()
  478. defer l.lock.Unlock()
  479. if l.isClosed {
  480. return errListenerClosed
  481. }
  482. if l.cn == nil {
  483. return errors.New("no connection")
  484. }
  485. return l.cn.Ping()
  486. }
  487. // Clean up after losing the server connection. Returns l.cn.Err(), which
  488. // should have the reason the connection was lost.
  489. func (l *Listener) disconnectCleanup() error {
  490. l.lock.Lock()
  491. defer l.lock.Unlock()
  492. // sanity check; can't look at Err() until the channel has been closed
  493. select {
  494. case _, ok := <-l.connNotificationChan:
  495. if ok {
  496. panic("connNotificationChan not closed")
  497. }
  498. default:
  499. panic("connNotificationChan not closed")
  500. }
  501. err := l.cn.Err()
  502. l.cn.Close()
  503. l.cn = nil
  504. return err
  505. }
  506. // Synchronize the list of channels we want to be listening on with the server
  507. // after the connection has been established.
  508. func (l *Listener) resync(cn *ListenerConn, notificationChan <-chan *Notification) error {
  509. doneChan := make(chan error)
  510. go func() {
  511. for channel := range l.channels {
  512. // If we got a response, return that error to our caller as it's
  513. // going to be more descriptive than cn.Err().
  514. gotResponse, err := cn.Listen(channel)
  515. if gotResponse && err != nil {
  516. doneChan <- err
  517. return
  518. }
  519. // If we couldn't reach the server, wait for notificationChan to
  520. // close and then return the error message from the connection, as
  521. // per ListenerConn's interface.
  522. if err != nil {
  523. for _ = range notificationChan {
  524. }
  525. doneChan <- cn.Err()
  526. return
  527. }
  528. }
  529. doneChan <- nil
  530. }()
  531. // Ignore notifications while synchronization is going on to avoid
  532. // deadlocks. We have to send a nil notification over Notify anyway as
  533. // we can't possibly know which notifications (if any) were lost while
  534. // the connection was down, so there's no reason to try and process
  535. // these messages at all.
  536. for {
  537. select {
  538. case _, ok := <-notificationChan:
  539. if !ok {
  540. notificationChan = nil
  541. }
  542. case err := <-doneChan:
  543. return err
  544. }
  545. }
  546. }
  547. // caller should NOT be holding l.lock
  548. func (l *Listener) closed() bool {
  549. l.lock.Lock()
  550. defer l.lock.Unlock()
  551. return l.isClosed
  552. }
  553. func (l *Listener) connect() error {
  554. notificationChan := make(chan *Notification, 32)
  555. cn, err := NewListenerConn(l.name, notificationChan)
  556. if err != nil {
  557. return err
  558. }
  559. l.lock.Lock()
  560. defer l.lock.Unlock()
  561. err = l.resync(cn, notificationChan)
  562. if err != nil {
  563. cn.Close()
  564. return err
  565. }
  566. l.cn = cn
  567. l.connNotificationChan = notificationChan
  568. l.reconnectCond.Broadcast()
  569. return nil
  570. }
  571. // Close disconnects the Listener from the database and shuts it down.
  572. // Subsequent calls to its methods will return an error. Close returns an
  573. // error if the connection has already been closed.
  574. func (l *Listener) Close() error {
  575. l.lock.Lock()
  576. defer l.lock.Unlock()
  577. if l.isClosed {
  578. return errListenerClosed
  579. }
  580. if l.cn != nil {
  581. l.cn.Close()
  582. }
  583. l.isClosed = true
  584. return nil
  585. }
  586. func (l *Listener) emitEvent(event ListenerEventType, err error) {
  587. if l.eventCallback != nil {
  588. l.eventCallback(event, err)
  589. }
  590. }
  591. // Main logic here: maintain a connection to the server when possible, wait
  592. // for notifications and emit events.
  593. func (l *Listener) listenerConnLoop() {
  594. var nextReconnect time.Time
  595. reconnectInterval := l.minReconnectInterval
  596. for {
  597. for {
  598. err := l.connect()
  599. if err == nil {
  600. break
  601. }
  602. if l.closed() {
  603. return
  604. }
  605. l.emitEvent(ListenerEventConnectionAttemptFailed, err)
  606. time.Sleep(reconnectInterval)
  607. reconnectInterval *= 2
  608. if reconnectInterval > l.maxReconnectInterval {
  609. reconnectInterval = l.maxReconnectInterval
  610. }
  611. }
  612. if nextReconnect.IsZero() {
  613. l.emitEvent(ListenerEventConnected, nil)
  614. } else {
  615. l.emitEvent(ListenerEventReconnected, nil)
  616. l.Notify <- nil
  617. }
  618. reconnectInterval = l.minReconnectInterval
  619. nextReconnect = time.Now().Add(reconnectInterval)
  620. for {
  621. notification, ok := <-l.connNotificationChan
  622. if !ok {
  623. // lost connection, loop again
  624. break
  625. }
  626. l.Notify <- notification
  627. }
  628. err := l.disconnectCleanup()
  629. if l.closed() {
  630. return
  631. }
  632. l.emitEvent(ListenerEventDisconnected, err)
  633. time.Sleep(nextReconnect.Sub(time.Now()))
  634. }
  635. }
  636. func (l *Listener) listenerMain() {
  637. l.listenerConnLoop()
  638. close(l.Notify)
  639. }