conn.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package pool
  2. import (
  3. "net"
  4. "sync/atomic"
  5. "time"
  6. "gopkg.in/redis.v5/internal/proto"
  7. )
  8. var noDeadline = time.Time{}
  9. type Conn struct {
  10. netConn net.Conn
  11. Rd *proto.Reader
  12. Wb *proto.WriteBuffer
  13. Inited bool
  14. usedAt atomic.Value
  15. }
  16. func NewConn(netConn net.Conn) *Conn {
  17. cn := &Conn{
  18. netConn: netConn,
  19. Wb: proto.NewWriteBuffer(),
  20. }
  21. cn.Rd = proto.NewReader(cn.netConn)
  22. cn.SetUsedAt(time.Now())
  23. return cn
  24. }
  25. func (cn *Conn) UsedAt() time.Time {
  26. return cn.usedAt.Load().(time.Time)
  27. }
  28. func (cn *Conn) SetUsedAt(tm time.Time) {
  29. cn.usedAt.Store(tm)
  30. }
  31. func (cn *Conn) SetNetConn(netConn net.Conn) {
  32. cn.netConn = netConn
  33. cn.Rd.Reset(netConn)
  34. }
  35. func (cn *Conn) IsStale(timeout time.Duration) bool {
  36. return timeout > 0 && time.Since(cn.UsedAt()) > timeout
  37. }
  38. func (cn *Conn) SetReadTimeout(timeout time.Duration) error {
  39. now := time.Now()
  40. cn.SetUsedAt(now)
  41. if timeout > 0 {
  42. return cn.netConn.SetReadDeadline(now.Add(timeout))
  43. }
  44. return cn.netConn.SetReadDeadline(noDeadline)
  45. }
  46. func (cn *Conn) SetWriteTimeout(timeout time.Duration) error {
  47. now := time.Now()
  48. cn.SetUsedAt(now)
  49. if timeout > 0 {
  50. return cn.netConn.SetWriteDeadline(now.Add(timeout))
  51. }
  52. return cn.netConn.SetWriteDeadline(noDeadline)
  53. }
  54. func (cn *Conn) Write(b []byte) (int, error) {
  55. return cn.netConn.Write(b)
  56. }
  57. func (cn *Conn) RemoteAddr() net.Addr {
  58. return cn.netConn.RemoteAddr()
  59. }
  60. func (cn *Conn) Close() error {
  61. return cn.netConn.Close()
  62. }