notify_test.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. package pq
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "os"
  7. "testing"
  8. "time"
  9. )
  10. var errNilNotification = errors.New("nil notification")
  11. func expectNotification(t *testing.T, ch <-chan *Notification, relname string, extra string) error {
  12. select {
  13. case n := <-ch:
  14. if n == nil {
  15. return errNilNotification
  16. }
  17. if n.Channel != relname || n.Extra != extra {
  18. return fmt.Errorf("unexpected notification %v", n)
  19. }
  20. return nil
  21. case <-time.After(1500 * time.Millisecond):
  22. return fmt.Errorf("timeout")
  23. }
  24. }
  25. func expectNoNotification(t *testing.T, ch <-chan *Notification) error {
  26. select {
  27. case n := <-ch:
  28. return fmt.Errorf("unexpected notification %v", n)
  29. case <-time.After(100 * time.Millisecond):
  30. return nil
  31. }
  32. }
  33. func expectEvent(t *testing.T, eventch <-chan ListenerEventType, et ListenerEventType) error {
  34. select {
  35. case e := <-eventch:
  36. if e != et {
  37. return fmt.Errorf("unexpected event %v", e)
  38. }
  39. return nil
  40. case <-time.After(1500 * time.Millisecond):
  41. return fmt.Errorf("timeout")
  42. }
  43. }
  44. func expectNoEvent(t *testing.T, eventch <-chan ListenerEventType) error {
  45. select {
  46. case e := <-eventch:
  47. return fmt.Errorf("unexpected event %v", e)
  48. case <-time.After(100 * time.Millisecond):
  49. return nil
  50. }
  51. }
  52. func newTestListenerConn(t *testing.T) (*ListenerConn, <-chan *Notification) {
  53. datname := os.Getenv("PGDATABASE")
  54. sslmode := os.Getenv("PGSSLMODE")
  55. if datname == "" {
  56. os.Setenv("PGDATABASE", "pqgotest")
  57. }
  58. if sslmode == "" {
  59. os.Setenv("PGSSLMODE", "disable")
  60. }
  61. notificationChan := make(chan *Notification)
  62. l, err := NewListenerConn("", notificationChan)
  63. if err != nil {
  64. t.Fatal(err)
  65. }
  66. return l, notificationChan
  67. }
  68. func TestNewListenerConn(t *testing.T) {
  69. l, _ := newTestListenerConn(t)
  70. defer l.Close()
  71. }
  72. func TestConnListen(t *testing.T) {
  73. l, channel := newTestListenerConn(t)
  74. defer l.Close()
  75. db := openTestConn(t)
  76. defer db.Close()
  77. ok, err := l.Listen("notify_test")
  78. if !ok || err != nil {
  79. t.Fatal(err)
  80. }
  81. _, err = db.Exec("NOTIFY notify_test")
  82. if err != nil {
  83. t.Fatal(err)
  84. }
  85. err = expectNotification(t, channel, "notify_test", "")
  86. if err != nil {
  87. t.Fatal(err)
  88. }
  89. }
  90. func TestConnUnlisten(t *testing.T) {
  91. l, channel := newTestListenerConn(t)
  92. defer l.Close()
  93. db := openTestConn(t)
  94. defer db.Close()
  95. ok, err := l.Listen("notify_test")
  96. if !ok || err != nil {
  97. t.Fatal(err)
  98. }
  99. _, err = db.Exec("NOTIFY notify_test")
  100. err = expectNotification(t, channel, "notify_test", "")
  101. if err != nil {
  102. t.Fatal(err)
  103. }
  104. ok, err = l.Unlisten("notify_test")
  105. if !ok || err != nil {
  106. t.Fatal(err)
  107. }
  108. _, err = db.Exec("NOTIFY notify_test")
  109. if err != nil {
  110. t.Fatal(err)
  111. }
  112. err = expectNoNotification(t, channel)
  113. if err != nil {
  114. t.Fatal(err)
  115. }
  116. }
  117. func TestConnUnlistenAll(t *testing.T) {
  118. l, channel := newTestListenerConn(t)
  119. defer l.Close()
  120. db := openTestConn(t)
  121. defer db.Close()
  122. ok, err := l.Listen("notify_test")
  123. if !ok || err != nil {
  124. t.Fatal(err)
  125. }
  126. _, err = db.Exec("NOTIFY notify_test")
  127. err = expectNotification(t, channel, "notify_test", "")
  128. if err != nil {
  129. t.Fatal(err)
  130. }
  131. ok, err = l.UnlistenAll()
  132. if !ok || err != nil {
  133. t.Fatal(err)
  134. }
  135. _, err = db.Exec("NOTIFY notify_test")
  136. if err != nil {
  137. t.Fatal(err)
  138. }
  139. err = expectNoNotification(t, channel)
  140. if err != nil {
  141. t.Fatal(err)
  142. }
  143. }
  144. func TestConnClose(t *testing.T) {
  145. l, _ := newTestListenerConn(t)
  146. defer l.Close()
  147. err := l.Close()
  148. if err != nil {
  149. t.Fatal(err)
  150. }
  151. err = l.Close()
  152. if err != errListenerConnClosed {
  153. t.Fatalf("expected errListenerConnClosed; got %v", err)
  154. }
  155. }
  156. func TestConnPing(t *testing.T) {
  157. l, _ := newTestListenerConn(t)
  158. defer l.Close()
  159. err := l.Ping()
  160. if err != nil {
  161. t.Fatal(err)
  162. }
  163. err = l.Close()
  164. if err != nil {
  165. t.Fatal(err)
  166. }
  167. err = l.Ping()
  168. if err != errListenerConnClosed {
  169. t.Fatalf("expected errListenerConnClosed; got %v", err)
  170. }
  171. }
  172. func TestNotifyExtra(t *testing.T) {
  173. db := openTestConn(t)
  174. defer db.Close()
  175. if getServerVersion(t, db) < 90000 {
  176. t.Skip("skipping NOTIFY payload test since the server does not appear to support it")
  177. }
  178. l, channel := newTestListenerConn(t)
  179. defer l.Close()
  180. ok, err := l.Listen("notify_test")
  181. if !ok || err != nil {
  182. t.Fatal(err)
  183. }
  184. _, err = db.Exec("NOTIFY notify_test, 'something'")
  185. if err != nil {
  186. t.Fatal(err)
  187. }
  188. err = expectNotification(t, channel, "notify_test", "something")
  189. if err != nil {
  190. t.Fatal(err)
  191. }
  192. }
  193. // create a new test listener and also set the timeouts
  194. func newTestListenerTimeout(t *testing.T, min time.Duration, max time.Duration) (*Listener, <-chan ListenerEventType) {
  195. datname := os.Getenv("PGDATABASE")
  196. sslmode := os.Getenv("PGSSLMODE")
  197. if datname == "" {
  198. os.Setenv("PGDATABASE", "pqgotest")
  199. }
  200. if sslmode == "" {
  201. os.Setenv("PGSSLMODE", "disable")
  202. }
  203. eventch := make(chan ListenerEventType, 16)
  204. l := NewListener("", min, max, func(t ListenerEventType, err error) { eventch <- t })
  205. err := expectEvent(t, eventch, ListenerEventConnected)
  206. if err != nil {
  207. t.Fatal(err)
  208. }
  209. return l, eventch
  210. }
  211. func newTestListener(t *testing.T) (*Listener, <-chan ListenerEventType) {
  212. return newTestListenerTimeout(t, time.Hour, time.Hour)
  213. }
  214. func TestListenerListen(t *testing.T) {
  215. l, _ := newTestListener(t)
  216. defer l.Close()
  217. db := openTestConn(t)
  218. defer db.Close()
  219. err := l.Listen("notify_listen_test")
  220. if err != nil {
  221. t.Fatal(err)
  222. }
  223. _, err = db.Exec("NOTIFY notify_listen_test")
  224. if err != nil {
  225. t.Fatal(err)
  226. }
  227. err = expectNotification(t, l.Notify, "notify_listen_test", "")
  228. if err != nil {
  229. t.Fatal(err)
  230. }
  231. }
  232. func TestListenerUnlisten(t *testing.T) {
  233. l, _ := newTestListener(t)
  234. defer l.Close()
  235. db := openTestConn(t)
  236. defer db.Close()
  237. err := l.Listen("notify_listen_test")
  238. if err != nil {
  239. t.Fatal(err)
  240. }
  241. _, err = db.Exec("NOTIFY notify_listen_test")
  242. if err != nil {
  243. t.Fatal(err)
  244. }
  245. err = l.Unlisten("notify_listen_test")
  246. if err != nil {
  247. t.Fatal(err)
  248. }
  249. err = expectNotification(t, l.Notify, "notify_listen_test", "")
  250. if err != nil {
  251. t.Fatal(err)
  252. }
  253. _, err = db.Exec("NOTIFY notify_listen_test")
  254. if err != nil {
  255. t.Fatal(err)
  256. }
  257. err = expectNoNotification(t, l.Notify)
  258. if err != nil {
  259. t.Fatal(err)
  260. }
  261. }
  262. func TestListenerUnlistenAll(t *testing.T) {
  263. l, _ := newTestListener(t)
  264. defer l.Close()
  265. db := openTestConn(t)
  266. defer db.Close()
  267. err := l.Listen("notify_listen_test")
  268. if err != nil {
  269. t.Fatal(err)
  270. }
  271. _, err = db.Exec("NOTIFY notify_listen_test")
  272. if err != nil {
  273. t.Fatal(err)
  274. }
  275. err = l.UnlistenAll()
  276. if err != nil {
  277. t.Fatal(err)
  278. }
  279. err = expectNotification(t, l.Notify, "notify_listen_test", "")
  280. if err != nil {
  281. t.Fatal(err)
  282. }
  283. _, err = db.Exec("NOTIFY notify_listen_test")
  284. if err != nil {
  285. t.Fatal(err)
  286. }
  287. err = expectNoNotification(t, l.Notify)
  288. if err != nil {
  289. t.Fatal(err)
  290. }
  291. }
  292. func TestListenerFailedQuery(t *testing.T) {
  293. l, eventch := newTestListener(t)
  294. defer l.Close()
  295. db := openTestConn(t)
  296. defer db.Close()
  297. err := l.Listen("notify_listen_test")
  298. if err != nil {
  299. t.Fatal(err)
  300. }
  301. _, err = db.Exec("NOTIFY notify_listen_test")
  302. if err != nil {
  303. t.Fatal(err)
  304. }
  305. err = expectNotification(t, l.Notify, "notify_listen_test", "")
  306. if err != nil {
  307. t.Fatal(err)
  308. }
  309. // shouldn't cause a disconnect
  310. ok, err := l.cn.ExecSimpleQuery("SELECT error")
  311. if !ok {
  312. t.Fatalf("could not send query to server: %v", err)
  313. }
  314. _, ok = err.(PGError)
  315. if !ok {
  316. t.Fatalf("unexpected error %v", err)
  317. }
  318. err = expectNoEvent(t, eventch)
  319. if err != nil {
  320. t.Fatal(err)
  321. }
  322. // should still work
  323. _, err = db.Exec("NOTIFY notify_listen_test")
  324. if err != nil {
  325. t.Fatal(err)
  326. }
  327. err = expectNotification(t, l.Notify, "notify_listen_test", "")
  328. if err != nil {
  329. t.Fatal(err)
  330. }
  331. }
  332. func TestListenerReconnect(t *testing.T) {
  333. l, eventch := newTestListenerTimeout(t, 20*time.Millisecond, time.Hour)
  334. defer l.Close()
  335. db := openTestConn(t)
  336. defer db.Close()
  337. err := l.Listen("notify_listen_test")
  338. if err != nil {
  339. t.Fatal(err)
  340. }
  341. _, err = db.Exec("NOTIFY notify_listen_test")
  342. if err != nil {
  343. t.Fatal(err)
  344. }
  345. err = expectNotification(t, l.Notify, "notify_listen_test", "")
  346. if err != nil {
  347. t.Fatal(err)
  348. }
  349. // kill the connection and make sure it comes back up
  350. ok, err := l.cn.ExecSimpleQuery("SELECT pg_terminate_backend(pg_backend_pid())")
  351. if ok {
  352. t.Fatalf("could not kill the connection: %v", err)
  353. }
  354. if err != io.EOF {
  355. t.Fatalf("unexpected error %v", err)
  356. }
  357. err = expectEvent(t, eventch, ListenerEventDisconnected)
  358. if err != nil {
  359. t.Fatal(err)
  360. }
  361. err = expectEvent(t, eventch, ListenerEventReconnected)
  362. if err != nil {
  363. t.Fatal(err)
  364. }
  365. // should still work
  366. _, err = db.Exec("NOTIFY notify_listen_test")
  367. if err != nil {
  368. t.Fatal(err)
  369. }
  370. // should get nil after Reconnected
  371. err = expectNotification(t, l.Notify, "", "")
  372. if err != errNilNotification {
  373. t.Fatal(err)
  374. }
  375. err = expectNotification(t, l.Notify, "notify_listen_test", "")
  376. if err != nil {
  377. t.Fatal(err)
  378. }
  379. }
  380. func TestListenerClose(t *testing.T) {
  381. l, _ := newTestListenerTimeout(t, 20*time.Millisecond, time.Hour)
  382. defer l.Close()
  383. err := l.Close()
  384. if err != nil {
  385. t.Fatal(err)
  386. }
  387. err = l.Close()
  388. if err != errListenerClosed {
  389. t.Fatalf("expected errListenerClosed; got %v", err)
  390. }
  391. }
  392. func TestListenerPing(t *testing.T) {
  393. l, _ := newTestListenerTimeout(t, 20*time.Millisecond, time.Hour)
  394. defer l.Close()
  395. err := l.Ping()
  396. if err != nil {
  397. t.Fatal(err)
  398. }
  399. err = l.Close()
  400. if err != nil {
  401. t.Fatal(err)
  402. }
  403. err = l.Ping()
  404. if err != errListenerClosed {
  405. t.Fatalf("expected errListenerClosed; got %v", err)
  406. }
  407. }