read.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Source code and contact info at http://github.com/streadway/amqp
  5. package amqp
  6. import (
  7. "bytes"
  8. "encoding/binary"
  9. "io"
  10. "time"
  11. )
  12. /*
  13. Reads a frame from an input stream and returns an interface that can be cast into
  14. one of the following:
  15. methodFrame
  16. PropertiesFrame
  17. bodyFrame
  18. heartbeatFrame
  19. 2.3.5 frame Details
  20. All frames consist of a header (7 octets), a payload of arbitrary size, and a
  21. 'frame-end' octet that detects malformed frames:
  22. 0 1 3 7 size+7 size+8
  23. +------+---------+-------------+ +------------+ +-----------+
  24. | type | channel | size | | payload | | frame-end |
  25. +------+---------+-------------+ +------------+ +-----------+
  26. octet short long size octets octet
  27. To read a frame, we:
  28. 1. Read the header and check the frame type and channel.
  29. 2. Depending on the frame type, we read the payload and process it.
  30. 3. Read the frame end octet.
  31. In realistic implementations where performance is a concern, we would use
  32. “read-ahead buffering” or
  33. “gathering reads” to avoid doing three separate system calls to read a frame.
  34. */
  35. func (me *reader) ReadFrame() (frame frame, err error) {
  36. var scratch [7]byte
  37. if _, err = io.ReadFull(me.r, scratch[:7]); err != nil {
  38. return
  39. }
  40. typ := uint8(scratch[0])
  41. channel := binary.BigEndian.Uint16(scratch[1:3])
  42. size := binary.BigEndian.Uint32(scratch[3:7])
  43. switch typ {
  44. case frameMethod:
  45. if frame, err = me.parseMethodFrame(channel, size); err != nil {
  46. return
  47. }
  48. case frameHeader:
  49. if frame, err = me.parseHeaderFrame(channel, size); err != nil {
  50. return
  51. }
  52. case frameBody:
  53. if frame, err = me.parseBodyFrame(channel, size); err != nil {
  54. return
  55. }
  56. case frameHeartbeat:
  57. if frame, err = me.parseHeartbeatFrame(channel, size); err != nil {
  58. return
  59. }
  60. default:
  61. return nil, ErrFrame
  62. }
  63. if _, err = io.ReadFull(me.r, scratch[:1]); err != nil {
  64. return
  65. }
  66. if scratch[0] != frameEnd {
  67. return nil, ErrFrame
  68. }
  69. return
  70. }
  71. func readShortstr(r io.Reader) (v string, err error) {
  72. var length uint8
  73. if err = binary.Read(r, binary.BigEndian, &length); err != nil {
  74. return
  75. }
  76. bytes := make([]byte, length)
  77. if _, err = io.ReadFull(r, bytes); err != nil {
  78. return
  79. }
  80. return string(bytes), nil
  81. }
  82. func readLongstr(r io.Reader) (v string, err error) {
  83. var length uint32
  84. if err = binary.Read(r, binary.BigEndian, &length); err != nil {
  85. return
  86. }
  87. bytes := make([]byte, length)
  88. if _, err = io.ReadFull(r, bytes); err != nil {
  89. return
  90. }
  91. return string(bytes), nil
  92. }
  93. func readDecimal(r io.Reader) (v Decimal, err error) {
  94. if err = binary.Read(r, binary.BigEndian, &v.Scale); err != nil {
  95. return
  96. }
  97. if err = binary.Read(r, binary.BigEndian, &v.Value); err != nil {
  98. return
  99. }
  100. return
  101. }
  102. func readFloat32(r io.Reader) (v float32, err error) {
  103. if err = binary.Read(r, binary.BigEndian, &v); err != nil {
  104. return
  105. }
  106. return
  107. }
  108. func readFloat64(r io.Reader) (v float64, err error) {
  109. if err = binary.Read(r, binary.BigEndian, &v); err != nil {
  110. return
  111. }
  112. return
  113. }
  114. func readTimestamp(r io.Reader) (v time.Time, err error) {
  115. var sec int64
  116. if err = binary.Read(r, binary.BigEndian, &sec); err != nil {
  117. return
  118. }
  119. return time.Unix(sec, 0), nil
  120. }
  121. /*
  122. 'A': []interface{}
  123. 'D': Decimal
  124. 'F': Table
  125. 'I': int32
  126. 'S': string
  127. 'T': time.Time
  128. 'V': nil
  129. 'b': byte
  130. 'd': float64
  131. 'f': float32
  132. 'l': int64
  133. 's': int16
  134. 't': bool
  135. 'x': []byte
  136. */
  137. func readField(r io.Reader) (v interface{}, err error) {
  138. var typ byte
  139. if err = binary.Read(r, binary.BigEndian, &typ); err != nil {
  140. return
  141. }
  142. switch typ {
  143. case 't':
  144. var value uint8
  145. if err = binary.Read(r, binary.BigEndian, &value); err != nil {
  146. return
  147. }
  148. return (value != 0), nil
  149. case 'b':
  150. var value [1]byte
  151. if _, err = io.ReadFull(r, value[0:1]); err != nil {
  152. return
  153. }
  154. return value[0], nil
  155. case 's':
  156. var value int16
  157. if err = binary.Read(r, binary.BigEndian, &value); err != nil {
  158. return
  159. }
  160. return value, nil
  161. case 'I':
  162. var value int32
  163. if err = binary.Read(r, binary.BigEndian, &value); err != nil {
  164. return
  165. }
  166. return value, nil
  167. case 'l':
  168. var value int64
  169. if err = binary.Read(r, binary.BigEndian, &value); err != nil {
  170. return
  171. }
  172. return value, nil
  173. case 'f':
  174. var value float32
  175. if err = binary.Read(r, binary.BigEndian, &value); err != nil {
  176. return
  177. }
  178. return value, nil
  179. case 'd':
  180. var value float64
  181. if err = binary.Read(r, binary.BigEndian, &value); err != nil {
  182. return
  183. }
  184. return value, nil
  185. case 'D':
  186. return readDecimal(r)
  187. case 'S':
  188. return readLongstr(r)
  189. case 'A':
  190. return readArray(r)
  191. case 'T':
  192. return readTimestamp(r)
  193. case 'F':
  194. return readTable(r)
  195. case 'x':
  196. var len int32
  197. if err = binary.Read(r, binary.BigEndian, &len); err != nil {
  198. return nil, err
  199. }
  200. value := make([]byte, len)
  201. if _, err = io.ReadFull(r, value); err != nil {
  202. return nil, err
  203. }
  204. return value, err
  205. case 'V':
  206. return nil, nil
  207. }
  208. return nil, ErrSyntax
  209. }
  210. /*
  211. Field tables are long strings that contain packed name-value pairs. The
  212. name-value pairs are encoded as short string defining the name, and octet
  213. defining the values type and then the value itself. The valid field types for
  214. tables are an extension of the native integer, bit, string, and timestamp
  215. types, and are shown in the grammar. Multi-octet integer fields are always
  216. held in network byte order.
  217. */
  218. func readTable(r io.Reader) (table Table, err error) {
  219. var nested bytes.Buffer
  220. var str string
  221. if str, err = readLongstr(r); err != nil {
  222. return
  223. }
  224. nested.Write([]byte(str))
  225. table = make(Table)
  226. for nested.Len() > 0 {
  227. var key string
  228. var value interface{}
  229. if key, err = readShortstr(&nested); err != nil {
  230. return
  231. }
  232. if value, err = readField(&nested); err != nil {
  233. return
  234. }
  235. table[key] = value
  236. }
  237. return
  238. }
  239. func readArray(r io.Reader) ([]interface{}, error) {
  240. var size uint32
  241. var err error
  242. if err = binary.Read(r, binary.BigEndian, &size); err != nil {
  243. return nil, err
  244. }
  245. lim := &io.LimitedReader{R: r, N: int64(size)}
  246. arr := make([]interface{}, 0)
  247. var field interface{}
  248. for {
  249. if field, err = readField(lim); err != nil {
  250. if err == io.EOF {
  251. break
  252. }
  253. return nil, err
  254. }
  255. arr = append(arr, field)
  256. }
  257. return arr, nil
  258. }
  259. // Checks if this bit mask matches the flags bitset
  260. func hasProperty(mask uint16, prop int) bool {
  261. return int(mask)&prop > 0
  262. }
  263. func (me *reader) parseHeaderFrame(channel uint16, size uint32) (frame frame, err error) {
  264. hf := &headerFrame{
  265. ChannelId: channel,
  266. }
  267. if err = binary.Read(me.r, binary.BigEndian, &hf.ClassId); err != nil {
  268. return
  269. }
  270. if err = binary.Read(me.r, binary.BigEndian, &hf.weight); err != nil {
  271. return
  272. }
  273. if err = binary.Read(me.r, binary.BigEndian, &hf.Size); err != nil {
  274. return
  275. }
  276. var flags uint16
  277. if err = binary.Read(me.r, binary.BigEndian, &flags); err != nil {
  278. return
  279. }
  280. if hasProperty(flags, flagContentType) {
  281. if hf.Properties.ContentType, err = readShortstr(me.r); err != nil {
  282. return
  283. }
  284. }
  285. if hasProperty(flags, flagContentEncoding) {
  286. if hf.Properties.ContentEncoding, err = readShortstr(me.r); err != nil {
  287. return
  288. }
  289. }
  290. if hasProperty(flags, flagHeaders) {
  291. if hf.Properties.Headers, err = readTable(me.r); err != nil {
  292. return
  293. }
  294. }
  295. if hasProperty(flags, flagDeliveryMode) {
  296. if err = binary.Read(me.r, binary.BigEndian, &hf.Properties.DeliveryMode); err != nil {
  297. return
  298. }
  299. }
  300. if hasProperty(flags, flagPriority) {
  301. if err = binary.Read(me.r, binary.BigEndian, &hf.Properties.Priority); err != nil {
  302. return
  303. }
  304. }
  305. if hasProperty(flags, flagCorrelationId) {
  306. if hf.Properties.CorrelationId, err = readShortstr(me.r); err != nil {
  307. return
  308. }
  309. }
  310. if hasProperty(flags, flagReplyTo) {
  311. if hf.Properties.ReplyTo, err = readShortstr(me.r); err != nil {
  312. return
  313. }
  314. }
  315. if hasProperty(flags, flagExpiration) {
  316. if hf.Properties.Expiration, err = readShortstr(me.r); err != nil {
  317. return
  318. }
  319. }
  320. if hasProperty(flags, flagMessageId) {
  321. if hf.Properties.MessageId, err = readShortstr(me.r); err != nil {
  322. return
  323. }
  324. }
  325. if hasProperty(flags, flagTimestamp) {
  326. if hf.Properties.Timestamp, err = readTimestamp(me.r); err != nil {
  327. return
  328. }
  329. }
  330. if hasProperty(flags, flagType) {
  331. if hf.Properties.Type, err = readShortstr(me.r); err != nil {
  332. return
  333. }
  334. }
  335. if hasProperty(flags, flagUserId) {
  336. if hf.Properties.UserId, err = readShortstr(me.r); err != nil {
  337. return
  338. }
  339. }
  340. if hasProperty(flags, flagAppId) {
  341. if hf.Properties.AppId, err = readShortstr(me.r); err != nil {
  342. return
  343. }
  344. }
  345. if hasProperty(flags, flagReserved1) {
  346. if hf.Properties.reserved1, err = readShortstr(me.r); err != nil {
  347. return
  348. }
  349. }
  350. return hf, nil
  351. }
  352. func (me *reader) parseBodyFrame(channel uint16, size uint32) (frame frame, err error) {
  353. bf := &bodyFrame{
  354. ChannelId: channel,
  355. Body: make([]byte, size),
  356. }
  357. if _, err = io.ReadFull(me.r, bf.Body); err != nil {
  358. return
  359. }
  360. return bf, nil
  361. }
  362. func (me *reader) parseHeartbeatFrame(channel uint16, size uint32) (frame frame, err error) {
  363. hf := &heartbeatFrame{
  364. ChannelId: channel,
  365. }
  366. if size > 0 {
  367. panic("Heartbeats should not have a payload")
  368. }
  369. return hf, nil
  370. }