reader.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. package proto
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "strconv"
  7. "gopkg.in/redis.v5/internal"
  8. )
  9. const bytesAllocLimit = 1024 * 1024 // 1mb
  10. const (
  11. ErrorReply = '-'
  12. StatusReply = '+'
  13. IntReply = ':'
  14. StringReply = '$'
  15. ArrayReply = '*'
  16. )
  17. type MultiBulkParse func(*Reader, int64) (interface{}, error)
  18. type Reader struct {
  19. src *bufio.Reader
  20. buf []byte
  21. }
  22. func NewReader(rd io.Reader) *Reader {
  23. return &Reader{
  24. src: bufio.NewReader(rd),
  25. buf: make([]byte, 4096),
  26. }
  27. }
  28. func (r *Reader) Reset(rd io.Reader) {
  29. r.src.Reset(rd)
  30. }
  31. func (p *Reader) PeekBuffered() []byte {
  32. if n := p.src.Buffered(); n != 0 {
  33. b, _ := p.src.Peek(n)
  34. return b
  35. }
  36. return nil
  37. }
  38. func (p *Reader) ReadN(n int) ([]byte, error) {
  39. b, err := readN(p.src, p.buf, n)
  40. if err != nil {
  41. return nil, err
  42. }
  43. p.buf = b
  44. return b, nil
  45. }
  46. func (p *Reader) ReadLine() ([]byte, error) {
  47. line, isPrefix, err := p.src.ReadLine()
  48. if err != nil {
  49. return nil, err
  50. }
  51. if isPrefix {
  52. return nil, bufio.ErrBufferFull
  53. }
  54. if len(line) == 0 {
  55. return nil, internal.RedisError("redis: reply is empty")
  56. }
  57. if isNilReply(line) {
  58. return nil, internal.Nil
  59. }
  60. return line, nil
  61. }
  62. func (p *Reader) ReadReply(m MultiBulkParse) (interface{}, error) {
  63. line, err := p.ReadLine()
  64. if err != nil {
  65. return nil, err
  66. }
  67. switch line[0] {
  68. case ErrorReply:
  69. return nil, ParseErrorReply(line)
  70. case StatusReply:
  71. return parseStatusValue(line), nil
  72. case IntReply:
  73. return parseInt(line[1:], 10, 64)
  74. case StringReply:
  75. return p.readTmpBytesValue(line)
  76. case ArrayReply:
  77. n, err := parseArrayLen(line)
  78. if err != nil {
  79. return nil, err
  80. }
  81. return m(p, n)
  82. }
  83. return nil, fmt.Errorf("redis: can't parse %.100q", line)
  84. }
  85. func (p *Reader) ReadIntReply() (int64, error) {
  86. line, err := p.ReadLine()
  87. if err != nil {
  88. return 0, err
  89. }
  90. switch line[0] {
  91. case ErrorReply:
  92. return 0, ParseErrorReply(line)
  93. case IntReply:
  94. return parseInt(line[1:], 10, 64)
  95. default:
  96. return 0, fmt.Errorf("redis: can't parse int reply: %.100q", line)
  97. }
  98. }
  99. func (p *Reader) ReadTmpBytesReply() ([]byte, error) {
  100. line, err := p.ReadLine()
  101. if err != nil {
  102. return nil, err
  103. }
  104. switch line[0] {
  105. case ErrorReply:
  106. return nil, ParseErrorReply(line)
  107. case StringReply:
  108. return p.readTmpBytesValue(line)
  109. case StatusReply:
  110. return parseStatusValue(line), nil
  111. default:
  112. return nil, fmt.Errorf("redis: can't parse string reply: %.100q", line)
  113. }
  114. }
  115. func (r *Reader) ReadBytesReply() ([]byte, error) {
  116. b, err := r.ReadTmpBytesReply()
  117. if err != nil {
  118. return nil, err
  119. }
  120. cp := make([]byte, len(b))
  121. copy(cp, b)
  122. return cp, nil
  123. }
  124. func (p *Reader) ReadStringReply() (string, error) {
  125. b, err := p.ReadTmpBytesReply()
  126. if err != nil {
  127. return "", err
  128. }
  129. return string(b), nil
  130. }
  131. func (p *Reader) ReadFloatReply() (float64, error) {
  132. b, err := p.ReadTmpBytesReply()
  133. if err != nil {
  134. return 0, err
  135. }
  136. return parseFloat(b, 64)
  137. }
  138. func (p *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) {
  139. line, err := p.ReadLine()
  140. if err != nil {
  141. return nil, err
  142. }
  143. switch line[0] {
  144. case ErrorReply:
  145. return nil, ParseErrorReply(line)
  146. case ArrayReply:
  147. n, err := parseArrayLen(line)
  148. if err != nil {
  149. return nil, err
  150. }
  151. return m(p, n)
  152. default:
  153. return nil, fmt.Errorf("redis: can't parse array reply: %.100q", line)
  154. }
  155. }
  156. func (p *Reader) ReadArrayLen() (int64, error) {
  157. line, err := p.ReadLine()
  158. if err != nil {
  159. return 0, err
  160. }
  161. switch line[0] {
  162. case ErrorReply:
  163. return 0, ParseErrorReply(line)
  164. case ArrayReply:
  165. return parseArrayLen(line)
  166. default:
  167. return 0, fmt.Errorf("redis: can't parse array reply: %.100q", line)
  168. }
  169. }
  170. func (p *Reader) ReadScanReply() ([]string, uint64, error) {
  171. n, err := p.ReadArrayLen()
  172. if err != nil {
  173. return nil, 0, err
  174. }
  175. if n != 2 {
  176. return nil, 0, fmt.Errorf("redis: got %d elements in scan reply, expected 2", n)
  177. }
  178. cursor, err := p.ReadUint()
  179. if err != nil {
  180. return nil, 0, err
  181. }
  182. n, err = p.ReadArrayLen()
  183. if err != nil {
  184. return nil, 0, err
  185. }
  186. keys := make([]string, n)
  187. for i := int64(0); i < n; i++ {
  188. key, err := p.ReadStringReply()
  189. if err != nil {
  190. return nil, 0, err
  191. }
  192. keys[i] = key
  193. }
  194. return keys, cursor, err
  195. }
  196. func (p *Reader) readTmpBytesValue(line []byte) ([]byte, error) {
  197. if isNilReply(line) {
  198. return nil, internal.Nil
  199. }
  200. replyLen, err := strconv.Atoi(string(line[1:]))
  201. if err != nil {
  202. return nil, err
  203. }
  204. b, err := p.ReadN(replyLen + 2)
  205. if err != nil {
  206. return nil, err
  207. }
  208. return b[:replyLen], nil
  209. }
  210. func (r *Reader) ReadInt() (int64, error) {
  211. b, err := r.ReadTmpBytesReply()
  212. if err != nil {
  213. return 0, err
  214. }
  215. return parseInt(b, 10, 64)
  216. }
  217. func (r *Reader) ReadUint() (uint64, error) {
  218. b, err := r.ReadTmpBytesReply()
  219. if err != nil {
  220. return 0, err
  221. }
  222. return parseUint(b, 10, 64)
  223. }
  224. // --------------------------------------------------------------------
  225. func readN(r io.Reader, b []byte, n int) ([]byte, error) {
  226. if n == 0 && b == nil {
  227. return make([]byte, 0), nil
  228. }
  229. if cap(b) >= n {
  230. b = b[:n]
  231. _, err := io.ReadFull(r, b)
  232. return b, err
  233. }
  234. b = b[:cap(b)]
  235. pos := 0
  236. for pos < n {
  237. diff := n - len(b)
  238. if diff > bytesAllocLimit {
  239. diff = bytesAllocLimit
  240. }
  241. b = append(b, make([]byte, diff)...)
  242. nn, err := io.ReadFull(r, b[pos:])
  243. if err != nil {
  244. return nil, err
  245. }
  246. pos += nn
  247. }
  248. return b, nil
  249. }
  250. func formatInt(n int64) string {
  251. return strconv.FormatInt(n, 10)
  252. }
  253. func formatUint(u uint64) string {
  254. return strconv.FormatUint(u, 10)
  255. }
  256. func formatFloat(f float64) string {
  257. return strconv.FormatFloat(f, 'f', -1, 64)
  258. }
  259. func isNilReply(b []byte) bool {
  260. return len(b) == 3 &&
  261. (b[0] == StringReply || b[0] == ArrayReply) &&
  262. b[1] == '-' && b[2] == '1'
  263. }
  264. func ParseErrorReply(line []byte) error {
  265. return internal.RedisError(string(line[1:]))
  266. }
  267. func parseStatusValue(line []byte) []byte {
  268. return line[1:]
  269. }
  270. func parseArrayLen(line []byte) (int64, error) {
  271. if isNilReply(line) {
  272. return 0, internal.Nil
  273. }
  274. return parseInt(line[1:], 10, 64)
  275. }
  276. func atoi(b []byte) (int, error) {
  277. return strconv.Atoi(internal.BytesToString(b))
  278. }
  279. func parseInt(b []byte, base int, bitSize int) (int64, error) {
  280. return strconv.ParseInt(internal.BytesToString(b), base, bitSize)
  281. }
  282. func parseUint(b []byte, base int, bitSize int) (uint64, error) {
  283. return strconv.ParseUint(internal.BytesToString(b), base, bitSize)
  284. }
  285. func parseFloat(b []byte, bitSize int) (float64, error) {
  286. return strconv.ParseFloat(internal.BytesToString(b), bitSize)
  287. }