redis.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. package redis // import "gopkg.in/redis.v5"
  2. import (
  3. "fmt"
  4. "log"
  5. "time"
  6. "gopkg.in/redis.v5/internal"
  7. "gopkg.in/redis.v5/internal/pool"
  8. "gopkg.in/redis.v5/internal/proto"
  9. )
  // Nil is the Redis nil reply, e.g. when a key does not exist.
  11. const Nil = internal.Nil
  12. func SetLogger(logger *log.Logger) {
  13. internal.Logger = logger
  14. }
  15. func (c *baseClient) String() string {
  16. return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB)
  17. }
  18. func (c *baseClient) conn() (*pool.Conn, bool, error) {
  19. cn, isNew, err := c.connPool.Get()
  20. if err != nil {
  21. return nil, false, err
  22. }
  23. if !cn.Inited {
  24. if err := c.initConn(cn); err != nil {
  25. _ = c.connPool.Remove(cn, err)
  26. return nil, false, err
  27. }
  28. }
  29. return cn, isNew, nil
  30. }
  31. func (c *baseClient) putConn(cn *pool.Conn, err error, allowTimeout bool) bool {
  32. if internal.IsBadConn(err, allowTimeout) {
  33. _ = c.connPool.Remove(cn, err)
  34. return false
  35. }
  36. _ = c.connPool.Put(cn)
  37. return true
  38. }
  39. func (c *baseClient) initConn(cn *pool.Conn) error {
  40. cn.Inited = true
  41. if c.opt.Password == "" && c.opt.DB == 0 && !c.opt.ReadOnly {
  42. return nil
  43. }
  44. // Temp client for Auth and Select.
  45. client := newClient(c.opt, pool.NewSingleConnPool(cn))
  46. _, err := client.Pipelined(func(pipe *Pipeline) error {
  47. if c.opt.Password != "" {
  48. pipe.Auth(c.opt.Password)
  49. }
  50. if c.opt.DB > 0 {
  51. pipe.Select(c.opt.DB)
  52. }
  53. if c.opt.ReadOnly {
  54. pipe.ReadOnly()
  55. }
  56. return nil
  57. })
  58. return err
  59. }
  60. func (c *baseClient) Process(cmd Cmder) error {
  61. if c.process != nil {
  62. return c.process(cmd)
  63. }
  64. return c.defaultProcess(cmd)
  65. }
  66. // WrapProcess replaces the process func. It takes a function createWrapper
  67. // which is supplied by the user. createWrapper takes the old process func as
  68. // an input and returns the new wrapper process func. createWrapper should
  69. // use call the old process func within the new process func.
  70. func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
  71. c.process = fn(c.defaultProcess)
  72. }
  73. func (c *baseClient) defaultProcess(cmd Cmder) error {
  74. for i := 0; i <= c.opt.MaxRetries; i++ {
  75. cn, _, err := c.conn()
  76. if err != nil {
  77. cmd.setErr(err)
  78. return err
  79. }
  80. cn.SetWriteTimeout(c.opt.WriteTimeout)
  81. if err := writeCmd(cn, cmd); err != nil {
  82. c.putConn(cn, err, false)
  83. cmd.setErr(err)
  84. if err != nil && internal.IsRetryableError(err) {
  85. continue
  86. }
  87. return err
  88. }
  89. cn.SetReadTimeout(c.cmdTimeout(cmd))
  90. err = cmd.readReply(cn)
  91. c.putConn(cn, err, false)
  92. if err != nil && internal.IsRetryableError(err) {
  93. continue
  94. }
  95. return err
  96. }
  97. return cmd.Err()
  98. }
  99. func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
  100. if timeout := cmd.readTimeout(); timeout != nil {
  101. return *timeout
  102. } else {
  103. return c.opt.ReadTimeout
  104. }
  105. }
  106. // Close closes the client, releasing any open resources.
  107. //
  108. // It is rare to Close a Client, as the Client is meant to be
  109. // long-lived and shared between many goroutines.
  110. func (c *baseClient) Close() error {
  111. var firstErr error
  112. if c.onClose != nil {
  113. if err := c.onClose(); err != nil && firstErr == nil {
  114. firstErr = err
  115. }
  116. }
  117. if err := c.connPool.Close(); err != nil && firstErr == nil {
  118. firstErr = err
  119. }
  120. return firstErr
  121. }
  122. func (c *baseClient) getAddr() string {
  123. return c.opt.Addr
  124. }
  125. type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error)
  126. func (c *baseClient) pipelineExecer(p pipelineProcessor) pipelineExecer {
  127. return func(cmds []Cmder) error {
  128. var firstErr error
  129. for i := 0; i <= c.opt.MaxRetries; i++ {
  130. cn, _, err := c.conn()
  131. if err != nil {
  132. setCmdsErr(cmds, err)
  133. return err
  134. }
  135. canRetry, err := p(cn, cmds)
  136. c.putConn(cn, err, false)
  137. if err == nil {
  138. return nil
  139. }
  140. if firstErr == nil {
  141. firstErr = err
  142. }
  143. if !canRetry || !internal.IsRetryableError(err) {
  144. break
  145. }
  146. }
  147. return firstErr
  148. }
  149. }
  150. func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (retry bool, firstErr error) {
  151. cn.SetWriteTimeout(c.opt.WriteTimeout)
  152. if err := writeCmd(cn, cmds...); err != nil {
  153. setCmdsErr(cmds, err)
  154. return true, err
  155. }
  156. // Set read timeout for all commands.
  157. cn.SetReadTimeout(c.opt.ReadTimeout)
  158. return pipelineReadCmds(cn, cmds)
  159. }
  160. func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) (retry bool, firstErr error) {
  161. for i, cmd := range cmds {
  162. err := cmd.readReply(cn)
  163. if err == nil {
  164. continue
  165. }
  166. if i == 0 {
  167. retry = true
  168. }
  169. if firstErr == nil {
  170. firstErr = err
  171. }
  172. }
  173. return false, firstErr
  174. }
  175. func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
  176. cn.SetWriteTimeout(c.opt.WriteTimeout)
  177. if err := txPipelineWriteMulti(cn, cmds); err != nil {
  178. setCmdsErr(cmds, err)
  179. return true, err
  180. }
  181. // Set read timeout for all commands.
  182. cn.SetReadTimeout(c.opt.ReadTimeout)
  183. if err := c.txPipelineReadQueued(cn, cmds); err != nil {
  184. return false, err
  185. }
  186. _, err := pipelineReadCmds(cn, cmds)
  187. return false, err
  188. }
  189. func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error {
  190. multiExec := make([]Cmder, 0, len(cmds)+2)
  191. multiExec = append(multiExec, NewStatusCmd("MULTI"))
  192. multiExec = append(multiExec, cmds...)
  193. multiExec = append(multiExec, NewSliceCmd("EXEC"))
  194. return writeCmd(cn, multiExec...)
  195. }
  196. func (c *baseClient) txPipelineReadQueued(cn *pool.Conn, cmds []Cmder) error {
  197. var firstErr error
  198. // Parse queued replies.
  199. var statusCmd StatusCmd
  200. if err := statusCmd.readReply(cn); err != nil && firstErr == nil {
  201. firstErr = err
  202. }
  203. for _, cmd := range cmds {
  204. err := statusCmd.readReply(cn)
  205. if err != nil {
  206. cmd.setErr(err)
  207. if firstErr == nil {
  208. firstErr = err
  209. }
  210. }
  211. }
  212. // Parse number of replies.
  213. line, err := cn.Rd.ReadLine()
  214. if err != nil {
  215. if err == Nil {
  216. err = TxFailedErr
  217. }
  218. return err
  219. }
  220. switch line[0] {
  221. case proto.ErrorReply:
  222. return proto.ParseErrorReply(line)
  223. case proto.ArrayReply:
  224. // ok
  225. default:
  226. err := fmt.Errorf("redis: expected '*', but got line %q", line)
  227. return err
  228. }
  229. return nil
  230. }
  231. //------------------------------------------------------------------------------
  232. // Client is a Redis client representing a pool of zero or more
  233. // underlying connections. It's safe for concurrent use by multiple
  234. // goroutines.
  235. type Client struct {
  236. baseClient
  237. cmdable
  238. }
  239. func newClient(opt *Options, pool pool.Pooler) *Client {
  240. client := Client{
  241. baseClient: baseClient{
  242. opt: opt,
  243. connPool: pool,
  244. },
  245. }
  246. client.cmdable.process = client.Process
  247. return &client
  248. }
  249. // NewClient returns a client to the Redis Server specified by Options.
  250. func NewClient(opt *Options) *Client {
  251. opt.init()
  252. return newClient(opt, newConnPool(opt))
  253. }
  254. func (c *Client) copy() *Client {
  255. c2 := new(Client)
  256. *c2 = *c
  257. c2.cmdable.process = c2.Process
  258. return c2
  259. }
  260. // PoolStats returns connection pool stats.
  261. func (c *Client) PoolStats() *PoolStats {
  262. s := c.connPool.Stats()
  263. return &PoolStats{
  264. Requests: s.Requests,
  265. Hits: s.Hits,
  266. Timeouts: s.Timeouts,
  267. TotalConns: s.TotalConns,
  268. FreeConns: s.FreeConns,
  269. }
  270. }
  271. func (c *Client) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
  272. return c.Pipeline().pipelined(fn)
  273. }
  274. func (c *Client) Pipeline() *Pipeline {
  275. pipe := Pipeline{
  276. exec: c.pipelineExecer(c.pipelineProcessCmds),
  277. }
  278. pipe.cmdable.process = pipe.Process
  279. pipe.statefulCmdable.process = pipe.Process
  280. return &pipe
  281. }
  282. func (c *Client) TxPipelined(fn func(*Pipeline) error) ([]Cmder, error) {
  283. return c.TxPipeline().pipelined(fn)
  284. }
  285. // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
  286. func (c *Client) TxPipeline() *Pipeline {
  287. pipe := Pipeline{
  288. exec: c.pipelineExecer(c.txPipelineProcessCmds),
  289. }
  290. pipe.cmdable.process = pipe.Process
  291. pipe.statefulCmdable.process = pipe.Process
  292. return &pipe
  293. }
  294. func (c *Client) pubSub() *PubSub {
  295. return &PubSub{
  296. base: baseClient{
  297. opt: c.opt,
  298. connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), false),
  299. },
  300. }
  301. }
  302. // Subscribe subscribes the client to the specified channels.
  303. func (c *Client) Subscribe(channels ...string) (*PubSub, error) {
  304. pubsub := c.pubSub()
  305. if len(channels) > 0 {
  306. if err := pubsub.Subscribe(channels...); err != nil {
  307. pubsub.Close()
  308. return nil, err
  309. }
  310. }
  311. return pubsub, nil
  312. }
  313. // PSubscribe subscribes the client to the given patterns.
  314. func (c *Client) PSubscribe(channels ...string) (*PubSub, error) {
  315. pubsub := c.pubSub()
  316. if len(channels) > 0 {
  317. if err := pubsub.PSubscribe(channels...); err != nil {
  318. pubsub.Close()
  319. return nil, err
  320. }
  321. }
  322. return pubsub, nil
  323. }