cluster.go

package redis

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"gopkg.in/redis.v5/internal"
	"gopkg.in/redis.v5/internal/hashtag"
	"gopkg.in/redis.v5/internal/pool"
	"gopkg.in/redis.v5/internal/proto"
)

var errClusterNoNodes = internal.RedisError("redis: cluster has no nodes")
var errNilClusterState = internal.RedisError("redis: cannot load cluster slots")

// ClusterOptions are used to configure a cluster client and should be
// passed to NewClusterClient.
type ClusterOptions struct {
	// A seed list of host:port addresses of cluster nodes.
	Addrs []string

	// The maximum number of retries before giving up. Commands are retried
	// on network errors and MOVED/ASK redirects.
	// Default is 16.
	MaxRedirects int

	// Enables read queries for a connection to a Redis Cluster slave node.
	ReadOnly bool

	// Enables routing read-only queries to the closest master or slave node.
	RouteByLatency bool

	// The following options are copied from the Options struct.

	Password string

	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration

	// PoolSize applies per cluster node and not for the whole cluster.
	PoolSize           int
	PoolTimeout        time.Duration
	IdleTimeout        time.Duration
	IdleCheckFrequency time.Duration
}

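// A minimal configuration sketch. The addresses and values below are
// illustrative assumptions, not defaults of this package:
//
//	opt := &ClusterOptions{
//	    Addrs:    []string{"127.0.0.1:7000", "127.0.0.1:7001", "127.0.0.1:7002"},
//	    ReadOnly: true,
//	    PoolSize: 10,
//	}
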
func (opt *ClusterOptions) init() {
	if opt.MaxRedirects == -1 {
		opt.MaxRedirects = 0
	} else if opt.MaxRedirects == 0 {
		opt.MaxRedirects = 16
	}

	if opt.RouteByLatency {
		opt.ReadOnly = true
	}
}

func (opt *ClusterOptions) clientOptions() *Options {
	const disableIdleCheck = -1

	return &Options{
		Password: opt.Password,
		ReadOnly: opt.ReadOnly,

		DialTimeout:  opt.DialTimeout,
		ReadTimeout:  opt.ReadTimeout,
		WriteTimeout: opt.WriteTimeout,

		PoolSize:    opt.PoolSize,
		PoolTimeout: opt.PoolTimeout,
		IdleTimeout: opt.IdleTimeout,

		// IdleCheckFrequency is not copied, which disables the reaper on
		// individual node clients; the cluster client runs its own reaper.
		IdleCheckFrequency: disableIdleCheck,
	}
}

//------------------------------------------------------------------------------

type clusterNode struct {
	Client  *Client
	Latency time.Duration
	loading time.Time
}

func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
	opt := clOpt.clientOptions()
	opt.Addr = addr
	node := clusterNode{
		Client: NewClient(opt),
	}

	if clOpt.RouteByLatency {
		node.updateLatency()
	}

	return &node
}

// updateLatency estimates the node latency as the average round-trip
// time of several PING commands.
func (n *clusterNode) updateLatency() {
	const probes = 10
	for i := 0; i < probes; i++ {
		start := time.Now()
		n.Client.Ping()
		n.Latency += time.Since(start)
	}
	n.Latency /= probes
}

// Loading reports whether the node recently answered with a LOADING
// error and should be avoided for up to a minute.
func (n *clusterNode) Loading() bool {
	return !n.loading.IsZero() && time.Since(n.loading) < time.Minute
}

//------------------------------------------------------------------------------

type clusterNodes struct {
	opt *ClusterOptions

	mu     sync.RWMutex
	addrs  []string
	nodes  map[string]*clusterNode
	closed bool
}

func newClusterNodes(opt *ClusterOptions) *clusterNodes {
	return &clusterNodes{
		opt:   opt,
		nodes: make(map[string]*clusterNode),
	}
}

func (c *clusterNodes) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil
	}
	c.closed = true

	var firstErr error
	for _, node := range c.nodes {
		if err := node.Client.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	c.addrs = nil
	c.nodes = nil

	return firstErr
}

func (c *clusterNodes) All() ([]*clusterNode, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	if c.closed {
		return nil, pool.ErrClosed
	}

	nodes := make([]*clusterNode, 0, len(c.nodes))
	for _, node := range c.nodes {
		nodes = append(nodes, node)
	}
	return nodes, nil
}

// Get returns the node for addr, creating it on first use. It uses
// double-checked locking: a fast path under the read lock, then a
// re-check under the write lock before creating the node.
func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
	var node *clusterNode
	var ok bool

	c.mu.RLock()
	if !c.closed {
		node, ok = c.nodes[addr]
	}
	c.mu.RUnlock()
	if ok {
		return node, nil
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil, pool.ErrClosed
	}

	node, ok = c.nodes[addr]
	if ok {
		return node, nil
	}

	c.addrs = append(c.addrs, addr)
	node = newClusterNode(c.opt, addr)
	c.nodes[addr] = node
	return node, nil
}

// Random returns a random known node that answers CLUSTER INFO, trying
// up to MaxRedirects+1 candidates.
func (c *clusterNodes) Random() (*clusterNode, error) {
	c.mu.RLock()
	closed := c.closed
	addrs := c.addrs
	c.mu.RUnlock()

	if closed {
		return nil, pool.ErrClosed
	}
	if len(addrs) == 0 {
		return nil, errClusterNoNodes
	}

	var nodeErr error
	for i := 0; i <= c.opt.MaxRedirects; i++ {
		n := rand.Intn(len(addrs))
		node, err := c.Get(addrs[n])
		if err != nil {
			return nil, err
		}

		nodeErr = node.Client.ClusterInfo().Err()
		if nodeErr == nil {
			return node, nil
		}
	}
	return nil, nodeErr
}

//------------------------------------------------------------------------------

type clusterState struct {
	nodes *clusterNodes
	slots [][]*clusterNode
}

func newClusterState(nodes *clusterNodes, slots []ClusterSlot) (*clusterState, error) {
	c := clusterState{
		nodes: nodes,
		slots: make([][]*clusterNode, hashtag.SlotNumber),
	}

	for _, slot := range slots {
		var nodes []*clusterNode
		for _, slotNode := range slot.Nodes {
			node, err := c.nodes.Get(slotNode.Addr)
			if err != nil {
				return nil, err
			}
			nodes = append(nodes, node)
		}

		for i := slot.Start; i <= slot.End; i++ {
			c.slots[i] = nodes
		}
	}

	return &c, nil
}

func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) > 0 {
		return nodes[0], nil
	}
	return c.nodes.Random()
}

func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	switch len(nodes) {
	case 0:
		return c.nodes.Random()
	case 1:
		return nodes[0], nil
	case 2:
		if slave := nodes[1]; !slave.Loading() {
			return slave, nil
		}
		return nodes[0], nil
	default:
		// Pick a random slave, retrying a few times to skip slaves
		// that are still loading their dataset.
		var slave *clusterNode
		for i := 0; i < 10; i++ {
			n := rand.Intn(len(nodes)-1) + 1
			slave = nodes[n]
			if !slave.Loading() {
				break
			}
		}
		return slave, nil
	}
}

// slotClosestNode returns the node with the lowest measured latency for
// the slot, skipping nodes that are loading.
func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
	const threshold = time.Millisecond

	nodes := c.slotNodes(slot)
	if len(nodes) == 0 {
		return c.nodes.Random()
	}

	var node *clusterNode
	for _, n := range nodes {
		if n.Loading() {
			continue
		}
		if node == nil || node.Latency-n.Latency > threshold {
			node = n
		}
	}
	return node, nil
}

func (c *clusterState) slotNodes(slot int) []*clusterNode {
	if slot < len(c.slots) {
		return c.slots[slot]
	}
	return nil
}

//------------------------------------------------------------------------------

// ClusterClient is a Redis Cluster client representing a pool of zero
// or more underlying connections. It's safe for concurrent use by
// multiple goroutines.
type ClusterClient struct {
	cmdable

	opt   *ClusterOptions
	cmds  map[string]*CommandInfo
	nodes *clusterNodes

	_state atomic.Value

	// Reports whether slots reloading is in progress.
	reloading uint32

	closed bool
}

// NewClusterClient returns a Redis Cluster client as described in
// http://redis.io/topics/cluster-spec.
func NewClusterClient(opt *ClusterOptions) *ClusterClient {
	opt.init()

	c := &ClusterClient{
		opt:   opt,
		nodes: newClusterNodes(opt),
	}
	c.cmdable.process = c.Process

	// Add initial nodes.
	for _, addr := range opt.Addrs {
		_, _ = c.nodes.Get(addr)
	}

	// Preload cluster slots.
	for i := 0; i < 10; i++ {
		state, err := c.reloadSlots()
		if err == nil {
			c._state.Store(state)
			break
		}
	}

	if opt.IdleCheckFrequency > 0 {
		go c.reaper(opt.IdleCheckFrequency)
	}

	return c
}

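// A minimal usage sketch (addresses are hypothetical):
//
//	client := NewClusterClient(&ClusterOptions{
//	    Addrs: []string{"127.0.0.1:7000", "127.0.0.1:7001"},
//	})
//	if err := client.Set("key", "value", 0).Err(); err != nil {
//	    // handle error
//	}
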
// state returns the cached cluster state, triggering a lazy reload when
// no state has been loaded yet.
func (c *ClusterClient) state() *clusterState {
	v := c._state.Load()
	if v != nil {
		return v.(*clusterState)
	}
	c.lazyReloadSlots()
	return nil
}

// cmdSlotAndNode returns the hash slot of the command's first key and
// the node the command should be routed to: the closest node or a slave
// for read-only commands when so configured, otherwise the slot master.
func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) {
	if state == nil {
		node, err := c.nodes.Random()
		return 0, node, err
	}

	cmdInfo := c.cmds[cmd.name()]
	firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo))
	slot := hashtag.Slot(firstKey)

	if cmdInfo != nil && cmdInfo.ReadOnly && c.opt.ReadOnly {
		if c.opt.RouteByLatency {
			node, err := state.slotClosestNode(slot)
			return slot, node, err
		}

		node, err := state.slotSlaveNode(slot)
		return slot, node, err
	}

	node, err := state.slotMasterNode(slot)
	return slot, node, err
}

func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
	state := c.state()

	var node *clusterNode
	var err error
	if state != nil && len(keys) > 0 {
		node, err = state.slotMasterNode(hashtag.Slot(keys[0]))
	} else {
		node, err = c.nodes.Random()
	}
	if err != nil {
		return err
	}
	return node.Client.Watch(fn, keys...)
}

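// A sketch of running a transaction against the key's master node (the
// key name is illustrative):
//
//	err := client.Watch(func(tx *Tx) error {
//	    _, err := tx.Get("key").Result()
//	    return err
//	}, "key")
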
// Close closes the cluster client, releasing any open resources.
//
// It is rare to Close a ClusterClient, as the ClusterClient is meant
// to be long-lived and shared between many goroutines.
func (c *ClusterClient) Close() error {
	return c.nodes.Close()
}

func (c *ClusterClient) Process(cmd Cmder) error {
	slot, node, err := c.cmdSlotAndNode(c.state(), cmd)
	if err != nil {
		cmd.setErr(err)
		return err
	}

	var ask bool
	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if ask {
			pipe := node.Client.Pipeline()
			pipe.Process(NewCmd("ASKING"))
			pipe.Process(cmd)
			_, err = pipe.Exec()
			pipe.Close()
			ask = false
		} else {
			err = node.Client.Process(cmd)
		}

		// If there is no (real) error, we are done.
		if err == nil {
			return nil
		}

		// If a slave is loading, read from the master.
		if c.opt.ReadOnly && internal.IsLoadingError(err) {
			node.loading = time.Now()
			continue
		}

		// On network errors try a random node.
		if internal.IsRetryableError(err) {
			node, err = c.nodes.Random()
			if err != nil {
				cmd.setErr(err)
				return err
			}
			continue
		}

		var moved bool
		var addr string
		moved, ask, addr = internal.IsMovedError(err)
		if moved || ask {
			state := c.state()
			if state != nil && slot >= 0 {
				master, _ := state.slotMasterNode(slot)
				if moved && (master == nil || master.Client.getAddr() != addr) {
					c.lazyReloadSlots()
				}
			}

			node, err = c.nodes.Get(addr)
			if err != nil {
				cmd.setErr(err)
				return err
			}
			continue
		}

		break
	}

	return cmd.Err()
}

// ForEachNode concurrently calls fn on every known node in the cluster.
// It returns the first error, if any.
func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
	nodes, err := c.nodes.All()
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)
	for _, node := range nodes {
		wg.Add(1)
		go func(node *clusterNode) {
			defer wg.Done()
			err := fn(node.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(node)
	}
	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

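// A sketch pinging every known node:
//
//	err := client.ForEachNode(func(node *Client) error {
//	    return node.Ping().Err()
//	})
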
// ForEachMaster concurrently calls fn on each master node in the cluster.
// It returns the first error, if any.
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
	state := c.state()
	if state == nil {
		return errNilClusterState
	}

	var wg sync.WaitGroup
	visited := make(map[*clusterNode]struct{})
	errCh := make(chan error, 1)
	for _, nodes := range state.slots {
		if len(nodes) == 0 {
			continue
		}

		master := nodes[0]
		if _, ok := visited[master]; ok {
			continue
		}
		visited[master] = struct{}{}

		wg.Add(1)
		go func(node *clusterNode) {
			defer wg.Done()
			err := fn(node.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(master)
	}
	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// PoolStats returns accumulated connection pool stats.
func (c *ClusterClient) PoolStats() *PoolStats {
	var acc PoolStats

	nodes, err := c.nodes.All()
	if err != nil {
		return &acc
	}

	for _, node := range nodes {
		s := node.Client.connPool.Stats()
		acc.Requests += s.Requests
		acc.Hits += s.Hits
		acc.Timeouts += s.Timeouts
		acc.TotalConns += s.TotalConns
		acc.FreeConns += s.FreeConns
	}
	return &acc
}

// lazyReloadSlots reloads the slot mapping in a background goroutine.
// The reloading flag is set with a compare-and-swap so that at most one
// reload runs at a time, and it is cleared only after a cooldown to
// throttle repeated reloads.
func (c *ClusterClient) lazyReloadSlots() {
	if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
		return
	}

	go func() {
		for i := 0; i < 1000; i++ {
			state, err := c.reloadSlots()
			if err == pool.ErrClosed {
				break
			}
			if err == nil {
				c._state.Store(state)
				break
			}
			time.Sleep(time.Millisecond)
		}

		time.Sleep(3 * time.Second)
		atomic.StoreUint32(&c.reloading, 0)
	}()
}

func (c *ClusterClient) reloadSlots() (*clusterState, error) {
	node, err := c.nodes.Random()
	if err != nil {
		return nil, err
	}

	// TODO: fix race
	if c.cmds == nil {
		cmds, err := node.Client.Command().Result()
		if err != nil {
			return nil, err
		}
		c.cmds = cmds
	}

	slots, err := node.Client.ClusterSlots().Result()
	if err != nil {
		return nil, err
	}

	return newClusterState(c.nodes, slots)
}

// reaper closes idle connections to the cluster.
func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
	ticker := time.NewTicker(idleCheckFrequency)
	defer ticker.Stop()

	for range ticker.C {
		nodes, err := c.nodes.All()
		if err != nil {
			break
		}

		var n int
		for _, node := range nodes {
			nn, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
			if err != nil {
				internal.Logf("ReapStaleConns failed: %s", err)
			} else {
				n += nn
			}
		}

		s := c.PoolStats()
		internal.Logf(
			"reaper: removed %d stale conns (TotalConns=%d FreeConns=%d Requests=%d Hits=%d Timeouts=%d)",
			n, s.TotalConns, s.FreeConns, s.Requests, s.Hits, s.Timeouts,
		)
	}
}

func (c *ClusterClient) Pipeline() *Pipeline {
	pipe := Pipeline{
		exec: c.pipelineExec,
	}
	pipe.cmdable.process = pipe.Process
	pipe.statefulCmdable.process = pipe.Process
	return &pipe
}

func (c *ClusterClient) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
	return c.Pipeline().pipelined(fn)
}

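// A pipelining sketch (keys and values are illustrative):
//
//	cmds, err := client.Pipelined(func(pipe *Pipeline) error {
//	    pipe.Set("a", "1", 0)
//	    pipe.Get("a")
//	    return nil
//	})
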
func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
	cmdsMap, err := c.mapCmdsByNode(cmds)
	if err != nil {
		return err
	}

	for i := 0; i <= c.opt.MaxRedirects; i++ {
		failedCmds := make(map[*clusterNode][]Cmder)

		for node, cmds := range cmdsMap {
			cn, _, err := node.Client.conn()
			if err != nil {
				setCmdsErr(cmds, err)
				continue
			}

			err = c.pipelineProcessCmds(cn, cmds, failedCmds)
			node.Client.putConn(cn, err, false)
		}

		if len(failedCmds) == 0 {
			break
		}
		cmdsMap = failedCmds
	}

	var firstErr error
	for _, cmd := range cmds {
		if err := cmd.Err(); err != nil {
			firstErr = err
			break
		}
	}
	return firstErr
}

func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) {
	state := c.state()
	cmdsMap := make(map[*clusterNode][]Cmder)
	for _, cmd := range cmds {
		_, node, err := c.cmdSlotAndNode(state, cmd)
		if err != nil {
			return nil, err
		}
		cmdsMap[node] = append(cmdsMap[node], cmd)
	}
	return cmdsMap, nil
}

func (c *ClusterClient) pipelineProcessCmds(
	cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
	cn.SetWriteTimeout(c.opt.WriteTimeout)
	if err := writeCmd(cn, cmds...); err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	// Set read timeout for all commands.
	cn.SetReadTimeout(c.opt.ReadTimeout)

	return c.pipelineReadCmds(cn, cmds, failedCmds)
}

func (c *ClusterClient) pipelineReadCmds(
	cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
	var firstErr error
	for _, cmd := range cmds {
		err := cmd.readReply(cn)
		if err == nil {
			continue
		}

		if firstErr == nil {
			firstErr = err
		}

		err = c.checkMovedErr(cmd, failedCmds)
		if err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

// checkMovedErr inspects the command error for MOVED/ASK redirects and,
// when found, queues the command for retry on the target node.
func (c *ClusterClient) checkMovedErr(cmd Cmder, failedCmds map[*clusterNode][]Cmder) error {
	moved, ask, addr := internal.IsMovedError(cmd.Err())

	if moved {
		c.lazyReloadSlots()

		node, err := c.nodes.Get(addr)
		if err != nil {
			return err
		}

		failedCmds[node] = append(failedCmds[node], cmd)
	}

	if ask {
		node, err := c.nodes.Get(addr)
		if err != nil {
			return err
		}

		failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd)
	}

	return nil
}

// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *ClusterClient) TxPipeline() *Pipeline {
	pipe := Pipeline{
		exec: c.txPipelineExec,
	}
	pipe.cmdable.process = pipe.Process
	pipe.statefulCmdable.process = pipe.Process
	return &pipe
}

func (c *ClusterClient) TxPipelined(fn func(*Pipeline) error) ([]Cmder, error) {
	return c.TxPipeline().pipelined(fn)
}

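// A MULTI/EXEC pipelining sketch. Commands that should share one
// transaction must hash to the same slot; the hash tag and keys below
// are illustrative:
//
//	cmds, err := client.TxPipelined(func(pipe *Pipeline) error {
//	    pipe.Incr("{user1}.counter")
//	    pipe.Expire("{user1}.counter", time.Hour)
//	    return nil
//	})
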
func (c *ClusterClient) txPipelineExec(cmds []Cmder) error {
	cmdsMap, err := c.mapCmdsBySlot(cmds)
	if err != nil {
		return err
	}

	state := c.state()
	if state == nil {
		return errNilClusterState
	}

	for slot, cmds := range cmdsMap {
		node, err := state.slotMasterNode(slot)
		if err != nil {
			setCmdsErr(cmds, err)
			continue
		}

		cmdsMap := map[*clusterNode][]Cmder{node: cmds}
		for i := 0; i <= c.opt.MaxRedirects; i++ {
			failedCmds := make(map[*clusterNode][]Cmder)

			for node, cmds := range cmdsMap {
				cn, _, err := node.Client.conn()
				if err != nil {
					setCmdsErr(cmds, err)
					continue
				}

				err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
				node.Client.putConn(cn, err, false)
			}

			if len(failedCmds) == 0 {
				break
			}
			cmdsMap = failedCmds
		}
	}

	var firstErr error
	for _, cmd := range cmds {
		if err := cmd.Err(); err != nil {
			firstErr = err
			break
		}
	}
	return firstErr
}

func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) (map[int][]Cmder, error) {
	state := c.state()
	cmdsMap := make(map[int][]Cmder)
	for _, cmd := range cmds {
		slot, _, err := c.cmdSlotAndNode(state, cmd)
		if err != nil {
			return nil, err
		}
		cmdsMap[slot] = append(cmdsMap[slot], cmd)
	}
	return cmdsMap, nil
}

func (c *ClusterClient) txPipelineProcessCmds(
	node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
	cn.SetWriteTimeout(c.opt.WriteTimeout)
	if err := txPipelineWriteMulti(cn, cmds); err != nil {
		setCmdsErr(cmds, err)
		failedCmds[node] = cmds
		return err
	}

	// Set read timeout for all commands.
	cn.SetReadTimeout(c.opt.ReadTimeout)

	if err := c.txPipelineReadQueued(cn, cmds, failedCmds); err != nil {
		return err
	}

	_, err := pipelineReadCmds(cn, cmds)
	return err
}

func (c *ClusterClient) txPipelineReadQueued(
	cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
	var firstErr error

	// Parse queued replies.
	var statusCmd StatusCmd
	if err := statusCmd.readReply(cn); err != nil && firstErr == nil {
		firstErr = err
	}

	for _, cmd := range cmds {
		err := statusCmd.readReply(cn)
		if err == nil {
			continue
		}

		cmd.setErr(err)
		if firstErr == nil {
			firstErr = err
		}

		err = c.checkMovedErr(cmd, failedCmds)
		if err != nil && firstErr == nil {
			firstErr = err
		}
	}

	// Parse the number of replies.
	line, err := cn.Rd.ReadLine()
	if err != nil {
		if err == Nil {
			err = TxFailedErr
		}
		return err
	}

	switch line[0] {
	case proto.ErrorReply:
		return proto.ParseErrorReply(line)
	case proto.ArrayReply:
		// ok
	default:
		err := fmt.Errorf("redis: expected '*', but got line %q", line)
		return err
	}

	return firstErr
}