http2_server.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "bytes"
  21. "errors"
  22. "fmt"
  23. "io"
  24. "math"
  25. "math/rand"
  26. "net"
  27. "strconv"
  28. "sync"
  29. "sync/atomic"
  30. "time"
  31. "github.com/golang/protobuf/proto"
  32. "golang.org/x/net/context"
  33. "golang.org/x/net/http2"
  34. "golang.org/x/net/http2/hpack"
  35. "google.golang.org/grpc/codes"
  36. "google.golang.org/grpc/credentials"
  37. "google.golang.org/grpc/keepalive"
  38. "google.golang.org/grpc/metadata"
  39. "google.golang.org/grpc/peer"
  40. "google.golang.org/grpc/stats"
  41. "google.golang.org/grpc/status"
  42. "google.golang.org/grpc/tap"
  43. )
  44. // ErrIllegalHeaderWrite indicates that setting header is illegal because of
  45. // the stream's state.
  46. var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
  47. // http2Server implements the ServerTransport interface with HTTP2.
  48. type http2Server struct {
  49. ctx context.Context
  50. cancel context.CancelFunc
  51. conn net.Conn
  52. remoteAddr net.Addr
  53. localAddr net.Addr
  54. maxStreamID uint32 // max stream ID ever seen
  55. authInfo credentials.AuthInfo // auth info about the connection
  56. inTapHandle tap.ServerInHandle
  57. framer *framer
  58. hBuf *bytes.Buffer // the buffer for HPACK encoding
  59. hEnc *hpack.Encoder // HPACK encoder
  60. // The max number of concurrent streams.
  61. maxStreams uint32
  62. // controlBuf delivers all the control related tasks (e.g., window
  63. // updates, reset streams, and various settings) to the controller.
  64. controlBuf *controlBuffer
  65. fc *inFlow
  66. // sendQuotaPool provides flow control to outbound message.
  67. sendQuotaPool *quotaPool
  68. // localSendQuota limits the amount of data that can be scheduled
  69. // for writing before it is actually written out.
  70. localSendQuota *quotaPool
  71. stats stats.Handler
  72. // Flag to keep track of reading activity on transport.
  73. // 1 is true and 0 is false.
  74. activity uint32 // Accessed atomically.
  75. // Keepalive and max-age parameters for the server.
  76. kp keepalive.ServerParameters
  77. // Keepalive enforcement policy.
  78. kep keepalive.EnforcementPolicy
  79. // The time instance last ping was received.
  80. lastPingAt time.Time
  81. // Number of times the client has violated keepalive ping policy so far.
  82. pingStrikes uint8
  83. // Flag to signify that number of ping strikes should be reset to 0.
  84. // This is set whenever data or header frames are sent.
  85. // 1 means yes.
  86. resetPingStrikes uint32 // Accessed atomically.
  87. initialWindowSize int32
  88. bdpEst *bdpEstimator
  89. mu sync.Mutex // guard the following
  90. // drainChan is initialized when drain(...) is called the first time.
  91. // After which the server writes out the first GoAway(with ID 2^31-1) frame.
  92. // Then an independent goroutine will be launched to later send the second GoAway.
  93. // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
  94. // Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
  95. // already underway.
  96. drainChan chan struct{}
  97. state transportState
  98. activeStreams map[uint32]*Stream
  99. // the per-stream outbound flow control window size set by the peer.
  100. streamSendQuota uint32
  101. // idle is the time instant when the connection went idle.
  102. // This is either the beginning of the connection or when the number of
  103. // RPCs go down to 0.
  104. // When the connection is busy, this value is set to 0.
  105. idle time.Time
  106. }
  107. // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
  108. // returned if something goes wrong.
  109. func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
  110. writeBufSize := defaultWriteBufSize
  111. if config.WriteBufferSize > 0 {
  112. writeBufSize = config.WriteBufferSize
  113. }
  114. readBufSize := defaultReadBufSize
  115. if config.ReadBufferSize > 0 {
  116. readBufSize = config.ReadBufferSize
  117. }
  118. framer := newFramer(conn, writeBufSize, readBufSize)
  119. // Send initial settings as connection preface to client.
  120. var isettings []http2.Setting
  121. // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
  122. // permitted in the HTTP2 spec.
  123. maxStreams := config.MaxStreams
  124. if maxStreams == 0 {
  125. maxStreams = math.MaxUint32
  126. } else {
  127. isettings = append(isettings, http2.Setting{
  128. ID: http2.SettingMaxConcurrentStreams,
  129. Val: maxStreams,
  130. })
  131. }
  132. dynamicWindow := true
  133. iwz := int32(initialWindowSize)
  134. if config.InitialWindowSize >= defaultWindowSize {
  135. iwz = config.InitialWindowSize
  136. dynamicWindow = false
  137. }
  138. icwz := int32(initialWindowSize)
  139. if config.InitialConnWindowSize >= defaultWindowSize {
  140. icwz = config.InitialConnWindowSize
  141. dynamicWindow = false
  142. }
  143. if iwz != defaultWindowSize {
  144. isettings = append(isettings, http2.Setting{
  145. ID: http2.SettingInitialWindowSize,
  146. Val: uint32(iwz)})
  147. }
  148. if err := framer.fr.WriteSettings(isettings...); err != nil {
  149. return nil, connectionErrorf(false, err, "transport: %v", err)
  150. }
  151. // Adjust the connection flow control window if needed.
  152. if delta := uint32(icwz - defaultWindowSize); delta > 0 {
  153. if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
  154. return nil, connectionErrorf(false, err, "transport: %v", err)
  155. }
  156. }
  157. kp := config.KeepaliveParams
  158. if kp.MaxConnectionIdle == 0 {
  159. kp.MaxConnectionIdle = defaultMaxConnectionIdle
  160. }
  161. if kp.MaxConnectionAge == 0 {
  162. kp.MaxConnectionAge = defaultMaxConnectionAge
  163. }
  164. // Add a jitter to MaxConnectionAge.
  165. kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
  166. if kp.MaxConnectionAgeGrace == 0 {
  167. kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
  168. }
  169. if kp.Time == 0 {
  170. kp.Time = defaultServerKeepaliveTime
  171. }
  172. if kp.Timeout == 0 {
  173. kp.Timeout = defaultServerKeepaliveTimeout
  174. }
  175. kep := config.KeepalivePolicy
  176. if kep.MinTime == 0 {
  177. kep.MinTime = defaultKeepalivePolicyMinTime
  178. }
  179. var buf bytes.Buffer
  180. ctx, cancel := context.WithCancel(context.Background())
  181. t := &http2Server{
  182. ctx: ctx,
  183. cancel: cancel,
  184. conn: conn,
  185. remoteAddr: conn.RemoteAddr(),
  186. localAddr: conn.LocalAddr(),
  187. authInfo: config.AuthInfo,
  188. framer: framer,
  189. hBuf: &buf,
  190. hEnc: hpack.NewEncoder(&buf),
  191. maxStreams: maxStreams,
  192. inTapHandle: config.InTapHandle,
  193. controlBuf: newControlBuffer(),
  194. fc: &inFlow{limit: uint32(icwz)},
  195. sendQuotaPool: newQuotaPool(defaultWindowSize),
  196. localSendQuota: newQuotaPool(defaultLocalSendQuota),
  197. state: reachable,
  198. activeStreams: make(map[uint32]*Stream),
  199. streamSendQuota: defaultWindowSize,
  200. stats: config.StatsHandler,
  201. kp: kp,
  202. idle: time.Now(),
  203. kep: kep,
  204. initialWindowSize: iwz,
  205. }
  206. if dynamicWindow {
  207. t.bdpEst = &bdpEstimator{
  208. bdp: initialWindowSize,
  209. updateFlowControl: t.updateFlowControl,
  210. }
  211. }
  212. if t.stats != nil {
  213. t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
  214. RemoteAddr: t.remoteAddr,
  215. LocalAddr: t.localAddr,
  216. })
  217. connBegin := &stats.ConnBegin{}
  218. t.stats.HandleConn(t.ctx, connBegin)
  219. }
  220. t.framer.writer.Flush()
  221. defer func() {
  222. if err != nil {
  223. t.Close()
  224. }
  225. }()
  226. // Check the validity of client preface.
  227. preface := make([]byte, len(clientPreface))
  228. if _, err := io.ReadFull(t.conn, preface); err != nil {
  229. return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
  230. }
  231. if !bytes.Equal(preface, clientPreface) {
  232. return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
  233. }
  234. frame, err := t.framer.fr.ReadFrame()
  235. if err == io.EOF || err == io.ErrUnexpectedEOF {
  236. return nil, err
  237. }
  238. if err != nil {
  239. return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
  240. }
  241. atomic.StoreUint32(&t.activity, 1)
  242. sf, ok := frame.(*http2.SettingsFrame)
  243. if !ok {
  244. return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
  245. }
  246. t.handleSettings(sf)
  247. go func() {
  248. loopyWriter(t.ctx, t.controlBuf, t.itemHandler)
  249. t.conn.Close()
  250. }()
  251. go t.keepalive()
  252. return t, nil
  253. }
  254. // operateHeader takes action on the decoded headers.
  255. func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
  256. streamID := frame.Header().StreamID
  257. var state decodeState
  258. for _, hf := range frame.Fields {
  259. if err := state.processHeaderField(hf); err != nil {
  260. if se, ok := err.(StreamError); ok {
  261. t.controlBuf.put(&resetStream{streamID, statusCodeConvTab[se.Code]})
  262. }
  263. return
  264. }
  265. }
  266. buf := newRecvBuffer()
  267. s := &Stream{
  268. id: streamID,
  269. st: t,
  270. buf: buf,
  271. fc: &inFlow{limit: uint32(t.initialWindowSize)},
  272. recvCompress: state.encoding,
  273. method: state.method,
  274. }
  275. if frame.StreamEnded() {
  276. // s is just created by the caller. No lock needed.
  277. s.state = streamReadDone
  278. }
  279. if state.timeoutSet {
  280. s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
  281. } else {
  282. s.ctx, s.cancel = context.WithCancel(t.ctx)
  283. }
  284. pr := &peer.Peer{
  285. Addr: t.remoteAddr,
  286. }
  287. // Attach Auth info if there is any.
  288. if t.authInfo != nil {
  289. pr.AuthInfo = t.authInfo
  290. }
  291. s.ctx = peer.NewContext(s.ctx, pr)
  292. // Cache the current stream to the context so that the server application
  293. // can find out. Required when the server wants to send some metadata
  294. // back to the client (unary call only).
  295. s.ctx = newContextWithStream(s.ctx, s)
  296. // Attach the received metadata to the context.
  297. if len(state.mdata) > 0 {
  298. s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
  299. }
  300. if state.statsTags != nil {
  301. s.ctx = stats.SetIncomingTags(s.ctx, state.statsTags)
  302. }
  303. if state.statsTrace != nil {
  304. s.ctx = stats.SetIncomingTrace(s.ctx, state.statsTrace)
  305. }
  306. if t.inTapHandle != nil {
  307. var err error
  308. info := &tap.Info{
  309. FullMethodName: state.method,
  310. }
  311. s.ctx, err = t.inTapHandle(s.ctx, info)
  312. if err != nil {
  313. warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
  314. t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
  315. return
  316. }
  317. }
  318. t.mu.Lock()
  319. if t.state != reachable {
  320. t.mu.Unlock()
  321. return
  322. }
  323. if uint32(len(t.activeStreams)) >= t.maxStreams {
  324. t.mu.Unlock()
  325. t.controlBuf.put(&resetStream{streamID, http2.ErrCodeRefusedStream})
  326. return
  327. }
  328. if streamID%2 != 1 || streamID <= t.maxStreamID {
  329. t.mu.Unlock()
  330. // illegal gRPC stream id.
  331. errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
  332. return true
  333. }
  334. t.maxStreamID = streamID
  335. s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
  336. t.activeStreams[streamID] = s
  337. if len(t.activeStreams) == 1 {
  338. t.idle = time.Time{}
  339. }
  340. t.mu.Unlock()
  341. s.requestRead = func(n int) {
  342. t.adjustWindow(s, uint32(n))
  343. }
  344. s.ctx = traceCtx(s.ctx, s.method)
  345. if t.stats != nil {
  346. s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
  347. inHeader := &stats.InHeader{
  348. FullMethod: s.method,
  349. RemoteAddr: t.remoteAddr,
  350. LocalAddr: t.localAddr,
  351. Compression: s.recvCompress,
  352. WireLength: int(frame.Header().Length),
  353. }
  354. t.stats.HandleRPC(s.ctx, inHeader)
  355. }
  356. s.trReader = &transportReader{
  357. reader: &recvBufferReader{
  358. ctx: s.ctx,
  359. recv: s.buf,
  360. },
  361. windowHandler: func(n int) {
  362. t.updateWindow(s, uint32(n))
  363. },
  364. }
  365. s.waiters = waiters{
  366. ctx: s.ctx,
  367. tctx: t.ctx,
  368. }
  369. handle(s)
  370. return
  371. }
  372. // HandleStreams receives incoming streams using the given handler. This is
  373. // typically run in a separate goroutine.
  374. // traceCtx attaches trace to ctx and returns the new context.
  375. func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
  376. for {
  377. frame, err := t.framer.fr.ReadFrame()
  378. atomic.StoreUint32(&t.activity, 1)
  379. if err != nil {
  380. if se, ok := err.(http2.StreamError); ok {
  381. t.mu.Lock()
  382. s := t.activeStreams[se.StreamID]
  383. t.mu.Unlock()
  384. if s != nil {
  385. t.closeStream(s)
  386. }
  387. t.controlBuf.put(&resetStream{se.StreamID, se.Code})
  388. continue
  389. }
  390. if err == io.EOF || err == io.ErrUnexpectedEOF {
  391. t.Close()
  392. return
  393. }
  394. warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
  395. t.Close()
  396. return
  397. }
  398. switch frame := frame.(type) {
  399. case *http2.MetaHeadersFrame:
  400. if t.operateHeaders(frame, handle, traceCtx) {
  401. t.Close()
  402. break
  403. }
  404. case *http2.DataFrame:
  405. t.handleData(frame)
  406. case *http2.RSTStreamFrame:
  407. t.handleRSTStream(frame)
  408. case *http2.SettingsFrame:
  409. t.handleSettings(frame)
  410. case *http2.PingFrame:
  411. t.handlePing(frame)
  412. case *http2.WindowUpdateFrame:
  413. t.handleWindowUpdate(frame)
  414. case *http2.GoAwayFrame:
  415. // TODO: Handle GoAway from the client appropriately.
  416. default:
  417. errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
  418. }
  419. }
  420. }
  421. func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
  422. t.mu.Lock()
  423. defer t.mu.Unlock()
  424. if t.activeStreams == nil {
  425. // The transport is closing.
  426. return nil, false
  427. }
  428. s, ok := t.activeStreams[f.Header().StreamID]
  429. if !ok {
  430. // The stream is already done.
  431. return nil, false
  432. }
  433. return s, true
  434. }
  435. // adjustWindow sends out extra window update over the initial window size
  436. // of stream if the application is requesting data larger in size than
  437. // the window.
  438. func (t *http2Server) adjustWindow(s *Stream, n uint32) {
  439. s.mu.Lock()
  440. defer s.mu.Unlock()
  441. if s.state == streamDone {
  442. return
  443. }
  444. if w := s.fc.maybeAdjust(n); w > 0 {
  445. if cw := t.fc.resetPendingUpdate(); cw > 0 {
  446. t.controlBuf.put(&windowUpdate{0, cw})
  447. }
  448. t.controlBuf.put(&windowUpdate{s.id, w})
  449. }
  450. }
  451. // updateWindow adjusts the inbound quota for the stream and the transport.
  452. // Window updates will deliver to the controller for sending when
  453. // the cumulative quota exceeds the corresponding threshold.
  454. func (t *http2Server) updateWindow(s *Stream, n uint32) {
  455. s.mu.Lock()
  456. defer s.mu.Unlock()
  457. if s.state == streamDone {
  458. return
  459. }
  460. if w := s.fc.onRead(n); w > 0 {
  461. if cw := t.fc.resetPendingUpdate(); cw > 0 {
  462. t.controlBuf.put(&windowUpdate{0, cw})
  463. }
  464. t.controlBuf.put(&windowUpdate{s.id, w})
  465. }
  466. }
  467. // updateFlowControl updates the incoming flow control windows
  468. // for the transport and the stream based on the current bdp
  469. // estimation.
  470. func (t *http2Server) updateFlowControl(n uint32) {
  471. t.mu.Lock()
  472. for _, s := range t.activeStreams {
  473. s.fc.newLimit(n)
  474. }
  475. t.initialWindowSize = int32(n)
  476. t.mu.Unlock()
  477. t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n)})
  478. t.controlBuf.put(&settings{
  479. ss: []http2.Setting{
  480. {
  481. ID: http2.SettingInitialWindowSize,
  482. Val: uint32(n),
  483. },
  484. },
  485. })
  486. }
  487. func (t *http2Server) handleData(f *http2.DataFrame) {
  488. size := f.Header().Length
  489. var sendBDPPing bool
  490. if t.bdpEst != nil {
  491. sendBDPPing = t.bdpEst.add(uint32(size))
  492. }
  493. // Decouple connection's flow control from application's read.
  494. // An update on connection's flow control should not depend on
  495. // whether user application has read the data or not. Such a
  496. // restriction is already imposed on the stream's flow control,
  497. // and therefore the sender will be blocked anyways.
  498. // Decoupling the connection flow control will prevent other
  499. // active(fast) streams from starving in presence of slow or
  500. // inactive streams.
  501. //
  502. // Furthermore, if a bdpPing is being sent out we can piggyback
  503. // connection's window update for the bytes we just received.
  504. if sendBDPPing {
  505. if size != 0 { // Could be an empty frame.
  506. t.controlBuf.put(&windowUpdate{0, uint32(size)})
  507. }
  508. t.controlBuf.put(bdpPing)
  509. } else {
  510. if err := t.fc.onData(uint32(size)); err != nil {
  511. errorf("transport: http2Server %v", err)
  512. t.Close()
  513. return
  514. }
  515. if w := t.fc.onRead(uint32(size)); w > 0 {
  516. t.controlBuf.put(&windowUpdate{0, w})
  517. }
  518. }
  519. // Select the right stream to dispatch.
  520. s, ok := t.getStream(f)
  521. if !ok {
  522. return
  523. }
  524. if size > 0 {
  525. s.mu.Lock()
  526. if s.state == streamDone {
  527. s.mu.Unlock()
  528. return
  529. }
  530. if err := s.fc.onData(uint32(size)); err != nil {
  531. s.mu.Unlock()
  532. t.closeStream(s)
  533. t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
  534. return
  535. }
  536. if f.Header().Flags.Has(http2.FlagDataPadded) {
  537. if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
  538. t.controlBuf.put(&windowUpdate{s.id, w})
  539. }
  540. }
  541. s.mu.Unlock()
  542. // TODO(bradfitz, zhaoq): A copy is required here because there is no
  543. // guarantee f.Data() is consumed before the arrival of next frame.
  544. // Can this copy be eliminated?
  545. if len(f.Data()) > 0 {
  546. data := make([]byte, len(f.Data()))
  547. copy(data, f.Data())
  548. s.write(recvMsg{data: data})
  549. }
  550. }
  551. if f.Header().Flags.Has(http2.FlagDataEndStream) {
  552. // Received the end of stream from the client.
  553. s.mu.Lock()
  554. if s.state != streamDone {
  555. s.state = streamReadDone
  556. }
  557. s.mu.Unlock()
  558. s.write(recvMsg{err: io.EOF})
  559. }
  560. }
  561. func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
  562. s, ok := t.getStream(f)
  563. if !ok {
  564. return
  565. }
  566. t.closeStream(s)
  567. }
  568. func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
  569. if f.IsAck() {
  570. return
  571. }
  572. var rs []http2.Setting
  573. var ps []http2.Setting
  574. f.ForeachSetting(func(s http2.Setting) error {
  575. if t.isRestrictive(s) {
  576. rs = append(rs, s)
  577. } else {
  578. ps = append(ps, s)
  579. }
  580. return nil
  581. })
  582. t.applySettings(rs)
  583. t.controlBuf.put(&settingsAck{})
  584. t.applySettings(ps)
  585. }
  586. func (t *http2Server) isRestrictive(s http2.Setting) bool {
  587. switch s.ID {
  588. case http2.SettingInitialWindowSize:
  589. // Note: we don't acquire a lock here to read streamSendQuota
  590. // because the same goroutine updates it later.
  591. return s.Val < t.streamSendQuota
  592. }
  593. return false
  594. }
  595. func (t *http2Server) applySettings(ss []http2.Setting) {
  596. for _, s := range ss {
  597. if s.ID == http2.SettingInitialWindowSize {
  598. t.mu.Lock()
  599. for _, stream := range t.activeStreams {
  600. stream.sendQuotaPool.addAndUpdate(int(s.Val) - int(t.streamSendQuota))
  601. }
  602. t.streamSendQuota = s.Val
  603. t.mu.Unlock()
  604. }
  605. }
  606. }
  607. const (
  608. maxPingStrikes = 2
  609. defaultPingTimeout = 2 * time.Hour
  610. )
  611. func (t *http2Server) handlePing(f *http2.PingFrame) {
  612. if f.IsAck() {
  613. if f.Data == goAwayPing.data && t.drainChan != nil {
  614. close(t.drainChan)
  615. return
  616. }
  617. // Maybe it's a BDP ping.
  618. if t.bdpEst != nil {
  619. t.bdpEst.calculate(f.Data)
  620. }
  621. return
  622. }
  623. pingAck := &ping{ack: true}
  624. copy(pingAck.data[:], f.Data[:])
  625. t.controlBuf.put(pingAck)
  626. now := time.Now()
  627. defer func() {
  628. t.lastPingAt = now
  629. }()
  630. // A reset ping strikes means that we don't need to check for policy
  631. // violation for this ping and the pingStrikes counter should be set
  632. // to 0.
  633. if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
  634. t.pingStrikes = 0
  635. return
  636. }
  637. t.mu.Lock()
  638. ns := len(t.activeStreams)
  639. t.mu.Unlock()
  640. if ns < 1 && !t.kep.PermitWithoutStream {
  641. // Keepalive shouldn't be active thus, this new ping should
  642. // have come after at least defaultPingTimeout.
  643. if t.lastPingAt.Add(defaultPingTimeout).After(now) {
  644. t.pingStrikes++
  645. }
  646. } else {
  647. // Check if keepalive policy is respected.
  648. if t.lastPingAt.Add(t.kep.MinTime).After(now) {
  649. t.pingStrikes++
  650. }
  651. }
  652. if t.pingStrikes > maxPingStrikes {
  653. // Send goaway and close the connection.
  654. errorf("transport: Got too many pings from the client, closing the connection.")
  655. t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
  656. }
  657. }
  658. func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
  659. id := f.Header().StreamID
  660. incr := f.Increment
  661. if id == 0 {
  662. t.sendQuotaPool.add(int(incr))
  663. return
  664. }
  665. if s, ok := t.getStream(f); ok {
  666. s.sendQuotaPool.add(int(incr))
  667. }
  668. }
  669. // WriteHeader sends the header metedata md back to the client.
  670. func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
  671. select {
  672. case <-s.ctx.Done():
  673. return ContextErr(s.ctx.Err())
  674. case <-t.ctx.Done():
  675. return ErrConnClosing
  676. default:
  677. }
  678. s.mu.Lock()
  679. if s.headerOk || s.state == streamDone {
  680. s.mu.Unlock()
  681. return ErrIllegalHeaderWrite
  682. }
  683. s.headerOk = true
  684. if md.Len() > 0 {
  685. if s.header.Len() > 0 {
  686. s.header = metadata.Join(s.header, md)
  687. } else {
  688. s.header = md
  689. }
  690. }
  691. md = s.header
  692. s.mu.Unlock()
  693. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  694. // first and create a slice of that exact size.
  695. headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
  696. headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
  697. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
  698. if s.sendCompress != "" {
  699. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
  700. }
  701. for k, vv := range md {
  702. if isReservedHeader(k) {
  703. // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
  704. continue
  705. }
  706. for _, v := range vv {
  707. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  708. }
  709. }
  710. t.controlBuf.put(&headerFrame{
  711. streamID: s.id,
  712. hf: headerFields,
  713. endStream: false,
  714. })
  715. if t.stats != nil {
  716. outHeader := &stats.OutHeader{
  717. //WireLength: // TODO(mmukhi): Revisit this later, if needed.
  718. }
  719. t.stats.HandleRPC(s.Context(), outHeader)
  720. }
  721. return nil
  722. }
  723. // WriteStatus sends stream status to the client and terminates the stream.
  724. // There is no further I/O operations being able to perform on this stream.
  725. // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
  726. // OK is adopted.
  727. func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
  728. select {
  729. case <-t.ctx.Done():
  730. return ErrConnClosing
  731. default:
  732. }
  733. var headersSent, hasHeader bool
  734. s.mu.Lock()
  735. if s.state == streamDone {
  736. s.mu.Unlock()
  737. return nil
  738. }
  739. if s.headerOk {
  740. headersSent = true
  741. }
  742. if s.header.Len() > 0 {
  743. hasHeader = true
  744. }
  745. s.mu.Unlock()
  746. if !headersSent && hasHeader {
  747. t.WriteHeader(s, nil)
  748. headersSent = true
  749. }
  750. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  751. // first and create a slice of that exact size.
  752. headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
  753. if !headersSent {
  754. headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
  755. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
  756. }
  757. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
  758. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
  759. if p := st.Proto(); p != nil && len(p.Details) > 0 {
  760. stBytes, err := proto.Marshal(p)
  761. if err != nil {
  762. // TODO: return error instead, when callers are able to handle it.
  763. panic(err)
  764. }
  765. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
  766. }
  767. // Attach the trailer metadata.
  768. for k, vv := range s.trailer {
  769. // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
  770. if isReservedHeader(k) {
  771. continue
  772. }
  773. for _, v := range vv {
  774. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  775. }
  776. }
  777. t.controlBuf.put(&headerFrame{
  778. streamID: s.id,
  779. hf: headerFields,
  780. endStream: true,
  781. })
  782. if t.stats != nil {
  783. t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
  784. }
  785. t.closeStream(s)
  786. return nil
  787. }
  788. // Write converts the data into HTTP2 data frame and sends it out. Non-nil error
  789. // is returned if it fails (e.g., framing error, transport error).
  790. func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
  791. select {
  792. case <-s.ctx.Done():
  793. return ContextErr(s.ctx.Err())
  794. case <-t.ctx.Done():
  795. return ErrConnClosing
  796. default:
  797. }
  798. var writeHeaderFrame bool
  799. s.mu.Lock()
  800. if s.state == streamDone {
  801. s.mu.Unlock()
  802. return streamErrorf(codes.Unknown, "the stream has been done")
  803. }
  804. if !s.headerOk {
  805. writeHeaderFrame = true
  806. }
  807. s.mu.Unlock()
  808. if writeHeaderFrame {
  809. t.WriteHeader(s, nil)
  810. }
  811. // Add data to header frame so that we can equally distribute data across frames.
  812. emptyLen := http2MaxFrameLen - len(hdr)
  813. if emptyLen > len(data) {
  814. emptyLen = len(data)
  815. }
  816. hdr = append(hdr, data[:emptyLen]...)
  817. data = data[emptyLen:]
  818. var (
  819. streamQuota int
  820. streamQuotaVer uint32
  821. err error
  822. )
  823. for _, r := range [][]byte{hdr, data} {
  824. for len(r) > 0 {
  825. size := http2MaxFrameLen
  826. if size > len(r) {
  827. size = len(r)
  828. }
  829. if streamQuota == 0 { // Used up all the locally cached stream quota.
  830. // Get all the stream quota there is.
  831. streamQuota, streamQuotaVer, err = s.sendQuotaPool.get(math.MaxInt32, s.waiters)
  832. if err != nil {
  833. return err
  834. }
  835. }
  836. if size > streamQuota {
  837. size = streamQuota
  838. }
  839. // Get size worth quota from transport.
  840. tq, _, err := t.sendQuotaPool.get(size, s.waiters)
  841. if err != nil {
  842. return err
  843. }
  844. if tq < size {
  845. size = tq
  846. }
  847. ltq, _, err := t.localSendQuota.get(size, s.waiters)
  848. if err != nil {
  849. return err
  850. }
  851. // even if ltq is smaller than size we don't adjust size since,
  852. // ltq is only a soft limit.
  853. streamQuota -= size
  854. p := r[:size]
  855. // Reset ping strikes when sending data since this might cause
  856. // the peer to send ping.
  857. atomic.StoreUint32(&t.resetPingStrikes, 1)
  858. success := func() {
  859. ltq := ltq
  860. t.controlBuf.put(&dataFrame{streamID: s.id, endStream: false, d: p, f: func() {
  861. t.localSendQuota.add(ltq)
  862. }})
  863. r = r[size:]
  864. }
  865. failure := func() { // The stream quota version must have changed.
  866. // Our streamQuota cache is invalidated now, so give it back.
  867. s.sendQuotaPool.lockedAdd(streamQuota + size)
  868. }
  869. if !s.sendQuotaPool.compareAndExecute(streamQuotaVer, success, failure) {
  870. // Couldn't send this chunk out.
  871. t.sendQuotaPool.add(size)
  872. t.localSendQuota.add(ltq)
  873. streamQuota = 0
  874. }
  875. }
  876. }
  877. if streamQuota > 0 {
  878. // ADd the left over quota back to stream.
  879. s.sendQuotaPool.add(streamQuota)
  880. }
  881. return nil
  882. }
  883. // keepalive running in a separate goroutine does the following:
  884. // 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
  885. // 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
  886. // 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
  887. // 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
  888. // after an additional duration of keepalive.Timeout.
  889. func (t *http2Server) keepalive() {
  890. p := &ping{}
  891. var pingSent bool
  892. maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
  893. maxAge := time.NewTimer(t.kp.MaxConnectionAge)
  894. keepalive := time.NewTimer(t.kp.Time)
  895. // NOTE: All exit paths of this function should reset their
  896. // respective timers. A failure to do so will cause the
  897. // following clean-up to deadlock and eventually leak.
  898. defer func() {
  899. if !maxIdle.Stop() {
  900. <-maxIdle.C
  901. }
  902. if !maxAge.Stop() {
  903. <-maxAge.C
  904. }
  905. if !keepalive.Stop() {
  906. <-keepalive.C
  907. }
  908. }()
  909. for {
  910. select {
  911. case <-maxIdle.C:
  912. t.mu.Lock()
  913. idle := t.idle
  914. if idle.IsZero() { // The connection is non-idle.
  915. t.mu.Unlock()
  916. maxIdle.Reset(t.kp.MaxConnectionIdle)
  917. continue
  918. }
  919. val := t.kp.MaxConnectionIdle - time.Since(idle)
  920. t.mu.Unlock()
  921. if val <= 0 {
  922. // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
  923. // Gracefully close the connection.
  924. t.drain(http2.ErrCodeNo, []byte{})
  925. // Reseting the timer so that the clean-up doesn't deadlock.
  926. maxIdle.Reset(infinity)
  927. return
  928. }
  929. maxIdle.Reset(val)
  930. case <-maxAge.C:
  931. t.drain(http2.ErrCodeNo, []byte{})
  932. maxAge.Reset(t.kp.MaxConnectionAgeGrace)
  933. select {
  934. case <-maxAge.C:
  935. // Close the connection after grace period.
  936. t.Close()
  937. // Reseting the timer so that the clean-up doesn't deadlock.
  938. maxAge.Reset(infinity)
  939. case <-t.ctx.Done():
  940. }
  941. return
  942. case <-keepalive.C:
  943. if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
  944. pingSent = false
  945. keepalive.Reset(t.kp.Time)
  946. continue
  947. }
  948. if pingSent {
  949. t.Close()
  950. // Reseting the timer so that the clean-up doesn't deadlock.
  951. keepalive.Reset(infinity)
  952. return
  953. }
  954. pingSent = true
  955. t.controlBuf.put(p)
  956. keepalive.Reset(t.kp.Timeout)
  957. case <-t.ctx.Done():
  958. return
  959. }
  960. }
  961. }
  962. var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
  963. // TODO(mmukhi): A lot of this code(and code in other places in the tranpsort layer)
  964. // is duplicated between the client and the server.
  965. // The transport layer needs to be refactored to take care of this.
  966. func (t *http2Server) itemHandler(i item) error {
  967. switch i := i.(type) {
  968. case *dataFrame:
  969. if err := t.framer.fr.WriteData(i.streamID, i.endStream, i.d); err != nil {
  970. return err
  971. }
  972. i.f()
  973. return nil
  974. case *headerFrame:
  975. t.hBuf.Reset()
  976. for _, f := range i.hf {
  977. t.hEnc.WriteField(f)
  978. }
  979. first := true
  980. endHeaders := false
  981. for !endHeaders {
  982. size := t.hBuf.Len()
  983. if size > http2MaxFrameLen {
  984. size = http2MaxFrameLen
  985. } else {
  986. endHeaders = true
  987. }
  988. var err error
  989. if first {
  990. first = false
  991. err = t.framer.fr.WriteHeaders(http2.HeadersFrameParam{
  992. StreamID: i.streamID,
  993. BlockFragment: t.hBuf.Next(size),
  994. EndStream: i.endStream,
  995. EndHeaders: endHeaders,
  996. })
  997. } else {
  998. err = t.framer.fr.WriteContinuation(
  999. i.streamID,
  1000. endHeaders,
  1001. t.hBuf.Next(size),
  1002. )
  1003. }
  1004. if err != nil {
  1005. return err
  1006. }
  1007. }
  1008. atomic.StoreUint32(&t.resetPingStrikes, 1)
  1009. return nil
  1010. case *windowUpdate:
  1011. return t.framer.fr.WriteWindowUpdate(i.streamID, i.increment)
  1012. case *settings:
  1013. return t.framer.fr.WriteSettings(i.ss...)
  1014. case *settingsAck:
  1015. return t.framer.fr.WriteSettingsAck()
  1016. case *resetStream:
  1017. return t.framer.fr.WriteRSTStream(i.streamID, i.code)
  1018. case *goAway:
  1019. t.mu.Lock()
  1020. if t.state == closing {
  1021. t.mu.Unlock()
  1022. // The transport is closing.
  1023. return fmt.Errorf("transport: Connection closing")
  1024. }
  1025. sid := t.maxStreamID
  1026. if !i.headsUp {
  1027. // Stop accepting more streams now.
  1028. t.state = draining
  1029. if len(t.activeStreams) == 0 {
  1030. i.closeConn = true
  1031. }
  1032. t.mu.Unlock()
  1033. if err := t.framer.fr.WriteGoAway(sid, i.code, i.debugData); err != nil {
  1034. return err
  1035. }
  1036. if i.closeConn {
  1037. // Abruptly close the connection following the GoAway (via
  1038. // loopywriter). But flush out what's inside the buffer first.
  1039. t.controlBuf.put(&flushIO{closeTr: true})
  1040. }
  1041. return nil
  1042. }
  1043. t.mu.Unlock()
  1044. // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
  1045. // Follow that with a ping and wait for the ack to come back or a timer
  1046. // to expire. During this time accept new streams since they might have
  1047. // originated before the GoAway reaches the client.
  1048. // After getting the ack or timer expiration send out another GoAway this
  1049. // time with an ID of the max stream server intends to process.
  1050. if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
  1051. return err
  1052. }
  1053. if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
  1054. return err
  1055. }
  1056. go func() {
  1057. timer := time.NewTimer(time.Minute)
  1058. defer timer.Stop()
  1059. select {
  1060. case <-t.drainChan:
  1061. case <-timer.C:
  1062. case <-t.ctx.Done():
  1063. return
  1064. }
  1065. t.controlBuf.put(&goAway{code: i.code, debugData: i.debugData})
  1066. }()
  1067. return nil
  1068. case *flushIO:
  1069. if err := t.framer.writer.Flush(); err != nil {
  1070. return err
  1071. }
  1072. if i.closeTr {
  1073. return ErrConnClosing
  1074. }
  1075. return nil
  1076. case *ping:
  1077. if !i.ack {
  1078. t.bdpEst.timesnap(i.data)
  1079. }
  1080. return t.framer.fr.WritePing(i.ack, i.data)
  1081. default:
  1082. err := status.Errorf(codes.Internal, "transport: http2Server.controller got unexpected item type %t", i)
  1083. errorf("%v", err)
  1084. return err
  1085. }
  1086. }
  1087. // Close starts shutting down the http2Server transport.
  1088. // TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
  1089. // could cause some resource issue. Revisit this later.
  1090. func (t *http2Server) Close() error {
  1091. t.mu.Lock()
  1092. if t.state == closing {
  1093. t.mu.Unlock()
  1094. return errors.New("transport: Close() was already called")
  1095. }
  1096. t.state = closing
  1097. streams := t.activeStreams
  1098. t.activeStreams = nil
  1099. t.mu.Unlock()
  1100. t.cancel()
  1101. err := t.conn.Close()
  1102. // Cancel all active streams.
  1103. for _, s := range streams {
  1104. s.cancel()
  1105. }
  1106. if t.stats != nil {
  1107. connEnd := &stats.ConnEnd{}
  1108. t.stats.HandleConn(t.ctx, connEnd)
  1109. }
  1110. return err
  1111. }
  1112. // closeStream clears the footprint of a stream when the stream is not needed
  1113. // any more.
  1114. func (t *http2Server) closeStream(s *Stream) {
  1115. t.mu.Lock()
  1116. delete(t.activeStreams, s.id)
  1117. if len(t.activeStreams) == 0 {
  1118. t.idle = time.Now()
  1119. }
  1120. if t.state == draining && len(t.activeStreams) == 0 {
  1121. defer t.controlBuf.put(&flushIO{closeTr: true})
  1122. }
  1123. t.mu.Unlock()
  1124. // In case stream sending and receiving are invoked in separate
  1125. // goroutines (e.g., bi-directional streaming), cancel needs to be
  1126. // called to interrupt the potential blocking on other goroutines.
  1127. s.cancel()
  1128. s.mu.Lock()
  1129. if s.state == streamDone {
  1130. s.mu.Unlock()
  1131. return
  1132. }
  1133. s.state = streamDone
  1134. s.mu.Unlock()
  1135. }
  1136. func (t *http2Server) RemoteAddr() net.Addr {
  1137. return t.remoteAddr
  1138. }
  1139. func (t *http2Server) Drain() {
  1140. t.drain(http2.ErrCodeNo, []byte{})
  1141. }
  1142. func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
  1143. t.mu.Lock()
  1144. defer t.mu.Unlock()
  1145. if t.drainChan != nil {
  1146. return
  1147. }
  1148. t.drainChan = make(chan struct{})
  1149. t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
  1150. }
  1151. var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))
  1152. func getJitter(v time.Duration) time.Duration {
  1153. if v == infinity {
  1154. return 0
  1155. }
  1156. // Generate a jitter between +/- 10% of the value.
  1157. r := int64(v / 10)
  1158. j := rgen.Int63n(2*r) - r
  1159. return time.Duration(j)
  1160. }