control.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "fmt"
  21. "io"
  22. "math"
  23. "sync"
  24. "time"
  25. "golang.org/x/net/http2"
  26. "golang.org/x/net/http2/hpack"
  27. )
  28. const (
  29. // The default value of flow control window size in HTTP2 spec.
  30. defaultWindowSize = 65535
  31. // The initial window size for flow control.
  32. initialWindowSize = defaultWindowSize // for an RPC
  33. infinity = time.Duration(math.MaxInt64)
  34. defaultClientKeepaliveTime = infinity
  35. defaultClientKeepaliveTimeout = time.Duration(20 * time.Second)
  36. defaultMaxStreamsClient = 100
  37. defaultMaxConnectionIdle = infinity
  38. defaultMaxConnectionAge = infinity
  39. defaultMaxConnectionAgeGrace = infinity
  40. defaultServerKeepaliveTime = time.Duration(2 * time.Hour)
  41. defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
  42. defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute)
  43. // max window limit set by HTTP2 Specs.
  44. maxWindowSize = math.MaxInt32
  45. // defaultLocalSendQuota sets is default value for number of data
  46. // bytes that each stream can schedule before some of it being
  47. // flushed out.
  48. defaultLocalSendQuota = 128 * 1024
  49. )
  50. // The following defines various control items which could flow through
  51. // the control buffer of transport. They represent different aspects of
  52. // control tasks, e.g., flow control, settings, streaming resetting, etc.
  53. type headerFrame struct {
  54. streamID uint32
  55. hf []hpack.HeaderField
  56. endStream bool
  57. }
  58. func (*headerFrame) item() {}
  59. type continuationFrame struct {
  60. streamID uint32
  61. endHeaders bool
  62. headerBlockFragment []byte
  63. }
  64. type dataFrame struct {
  65. streamID uint32
  66. endStream bool
  67. d []byte
  68. f func()
  69. }
  70. func (*dataFrame) item() {}
  71. func (*continuationFrame) item() {}
  72. type windowUpdate struct {
  73. streamID uint32
  74. increment uint32
  75. }
  76. func (*windowUpdate) item() {}
  77. type settings struct {
  78. ss []http2.Setting
  79. }
  80. func (*settings) item() {}
  81. type settingsAck struct {
  82. }
  83. func (*settingsAck) item() {}
  84. type resetStream struct {
  85. streamID uint32
  86. code http2.ErrCode
  87. }
  88. func (*resetStream) item() {}
  89. type goAway struct {
  90. code http2.ErrCode
  91. debugData []byte
  92. headsUp bool
  93. closeConn bool
  94. }
  95. func (*goAway) item() {}
  96. type flushIO struct {
  97. closeTr bool
  98. }
  99. func (*flushIO) item() {}
  100. type ping struct {
  101. ack bool
  102. data [8]byte
  103. }
  104. func (*ping) item() {}
  105. // quotaPool is a pool which accumulates the quota and sends it to acquire()
  106. // when it is available.
  107. type quotaPool struct {
  108. mu sync.Mutex
  109. c chan struct{}
  110. version uint32
  111. quota int
  112. }
  113. // newQuotaPool creates a quotaPool which has quota q available to consume.
  114. func newQuotaPool(q int) *quotaPool {
  115. qb := &quotaPool{
  116. quota: q,
  117. c: make(chan struct{}, 1),
  118. }
  119. return qb
  120. }
  121. // add cancels the pending quota sent on acquired, incremented by v and sends
  122. // it back on acquire.
  123. func (qb *quotaPool) add(v int) {
  124. qb.mu.Lock()
  125. defer qb.mu.Unlock()
  126. qb.lockedAdd(v)
  127. }
  128. func (qb *quotaPool) lockedAdd(v int) {
  129. var wakeUp bool
  130. if qb.quota <= 0 {
  131. wakeUp = true // Wake up potential waiters.
  132. }
  133. qb.quota += v
  134. if wakeUp && qb.quota > 0 {
  135. select {
  136. case qb.c <- struct{}{}:
  137. default:
  138. }
  139. }
  140. }
  141. func (qb *quotaPool) addAndUpdate(v int) {
  142. qb.mu.Lock()
  143. qb.lockedAdd(v)
  144. qb.version++
  145. qb.mu.Unlock()
  146. }
  147. func (qb *quotaPool) get(v int, wc waiters) (int, uint32, error) {
  148. qb.mu.Lock()
  149. if qb.quota > 0 {
  150. if v > qb.quota {
  151. v = qb.quota
  152. }
  153. qb.quota -= v
  154. ver := qb.version
  155. qb.mu.Unlock()
  156. return v, ver, nil
  157. }
  158. qb.mu.Unlock()
  159. for {
  160. select {
  161. case <-wc.ctx.Done():
  162. return 0, 0, ContextErr(wc.ctx.Err())
  163. case <-wc.tctx.Done():
  164. return 0, 0, ErrConnClosing
  165. case <-wc.done:
  166. return 0, 0, io.EOF
  167. case <-wc.goAway:
  168. return 0, 0, errStreamDrain
  169. case <-qb.c:
  170. qb.mu.Lock()
  171. if qb.quota > 0 {
  172. if v > qb.quota {
  173. v = qb.quota
  174. }
  175. qb.quota -= v
  176. ver := qb.version
  177. if qb.quota > 0 {
  178. select {
  179. case qb.c <- struct{}{}:
  180. default:
  181. }
  182. }
  183. qb.mu.Unlock()
  184. return v, ver, nil
  185. }
  186. qb.mu.Unlock()
  187. }
  188. }
  189. }
  190. func (qb *quotaPool) compareAndExecute(version uint32, success, failure func()) bool {
  191. qb.mu.Lock()
  192. if version == qb.version {
  193. success()
  194. qb.mu.Unlock()
  195. return true
  196. }
  197. failure()
  198. qb.mu.Unlock()
  199. return false
  200. }
  201. // inFlow deals with inbound flow control
  202. type inFlow struct {
  203. mu sync.Mutex
  204. // The inbound flow control limit for pending data.
  205. limit uint32
  206. // pendingData is the overall data which have been received but not been
  207. // consumed by applications.
  208. pendingData uint32
  209. // The amount of data the application has consumed but grpc has not sent
  210. // window update for them. Used to reduce window update frequency.
  211. pendingUpdate uint32
  212. // delta is the extra window update given by receiver when an application
  213. // is reading data bigger in size than the inFlow limit.
  214. delta uint32
  215. }
  216. // newLimit updates the inflow window to a new value n.
  217. // It assumes that n is always greater than the old limit.
  218. func (f *inFlow) newLimit(n uint32) uint32 {
  219. f.mu.Lock()
  220. defer f.mu.Unlock()
  221. d := n - f.limit
  222. f.limit = n
  223. return d
  224. }
  225. func (f *inFlow) maybeAdjust(n uint32) uint32 {
  226. if n > uint32(math.MaxInt32) {
  227. n = uint32(math.MaxInt32)
  228. }
  229. f.mu.Lock()
  230. defer f.mu.Unlock()
  231. // estSenderQuota is the receiver's view of the maximum number of bytes the sender
  232. // can send without a window update.
  233. estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
  234. // estUntransmittedData is the maximum number of bytes the sends might not have put
  235. // on the wire yet. A value of 0 or less means that we have already received all or
  236. // more bytes than the application is requesting to read.
  237. estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative.
  238. // This implies that unless we send a window update, the sender won't be able to send all the bytes
  239. // for this message. Therefore we must send an update over the limit since there's an active read
  240. // request from the application.
  241. if estUntransmittedData > estSenderQuota {
  242. // Sender's window shouldn't go more than 2^31 - 1 as speecified in the HTTP spec.
  243. if f.limit+n > maxWindowSize {
  244. f.delta = maxWindowSize - f.limit
  245. } else {
  246. // Send a window update for the whole message and not just the difference between
  247. // estUntransmittedData and estSenderQuota. This will be helpful in case the message
  248. // is padded; We will fallback on the current available window(at least a 1/4th of the limit).
  249. f.delta = n
  250. }
  251. return f.delta
  252. }
  253. return 0
  254. }
  255. // onData is invoked when some data frame is received. It updates pendingData.
  256. func (f *inFlow) onData(n uint32) error {
  257. f.mu.Lock()
  258. defer f.mu.Unlock()
  259. f.pendingData += n
  260. if f.pendingData+f.pendingUpdate > f.limit+f.delta {
  261. return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit)
  262. }
  263. return nil
  264. }
  265. // onRead is invoked when the application reads the data. It returns the window size
  266. // to be sent to the peer.
  267. func (f *inFlow) onRead(n uint32) uint32 {
  268. f.mu.Lock()
  269. defer f.mu.Unlock()
  270. if f.pendingData == 0 {
  271. return 0
  272. }
  273. f.pendingData -= n
  274. if n > f.delta {
  275. n -= f.delta
  276. f.delta = 0
  277. } else {
  278. f.delta -= n
  279. n = 0
  280. }
  281. f.pendingUpdate += n
  282. if f.pendingUpdate >= f.limit/4 {
  283. wu := f.pendingUpdate
  284. f.pendingUpdate = 0
  285. return wu
  286. }
  287. return 0
  288. }
  289. func (f *inFlow) resetPendingUpdate() uint32 {
  290. f.mu.Lock()
  291. defer f.mu.Unlock()
  292. n := f.pendingUpdate
  293. f.pendingUpdate = 0
  294. return n
  295. }