http2_server.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "bytes"
  21. "errors"
  22. "fmt"
  23. "io"
  24. "math"
  25. "math/rand"
  26. "net"
  27. "strconv"
  28. "sync"
  29. "sync/atomic"
  30. "time"
  31. "github.com/golang/protobuf/proto"
  32. "golang.org/x/net/context"
  33. "golang.org/x/net/http2"
  34. "golang.org/x/net/http2/hpack"
  35. "google.golang.org/grpc/codes"
  36. "google.golang.org/grpc/credentials"
  37. "google.golang.org/grpc/keepalive"
  38. "google.golang.org/grpc/metadata"
  39. "google.golang.org/grpc/peer"
  40. "google.golang.org/grpc/stats"
  41. "google.golang.org/grpc/status"
  42. "google.golang.org/grpc/tap"
  43. )
  44. // ErrIllegalHeaderWrite indicates that setting header is illegal because of
  45. // the stream's state.
  46. var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
// http2Server implements the ServerTransport interface with HTTP2.
//
// One value is built per accepted connection by newHTTP2Server. Incoming
// frames are read by HandleStreams. Outgoing frames (headers, data, window
// updates, resets, settings, pings, GOAWAY) are queued on controlBuf and
// written by the loopyWriter goroutine that newHTTP2Server starts.
type http2Server struct {
	// ctx is the connection-wide context. Every stream context is derived
	// from it (see operateHeaders). cancel is the CancelFunc paired with it.
	ctx    context.Context
	cancel context.CancelFunc
	conn   net.Conn
	remoteAddr net.Addr
	localAddr  net.Addr
	// maxStreamID is the largest stream ID accepted so far. operateHeaders
	// rejects any new stream whose ID is even or not larger than this.
	maxStreamID uint32
	authInfo    credentials.AuthInfo // auth info about the connection
	inTapHandle tap.ServerInHandle
	framer      *framer
	hBuf        *bytes.Buffer  // the buffer for HPACK encoding
	hEnc        *hpack.Encoder // HPACK encoder
	// The max number of concurrent streams. Set to math.MaxUint32 when the
	// server config does not limit it.
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	// fc is the connection-level inbound flow controller (the window the
	// client may use to send to us).
	fc *inFlow
	// sendQuotaPool provides flow control to outbound message. It is the
	// connection-level send window, replenished by WINDOW_UPDATEs on stream 0.
	sendQuotaPool *quotaPool
	// localSendQuota limits the amount of data that can be scheduled
	// for writing before it is actually written out.
	localSendQuota *quotaPool
	// stats is optional; when nil no stats callbacks are made.
	stats stats.Handler
	// Flag to keep track of reading activity on transport.
	// 1 is true and 0 is false.
	activity uint32 // Accessed atomically.
	// Keepalive and max-age parameters for the server.
	kp keepalive.ServerParameters
	// Keepalive enforcement policy.
	kep keepalive.EnforcementPolicy
	// The time instant at which the last non-ACK ping was received
	// (updated at the end of handlePing).
	lastPingAt time.Time
	// Number of times the client has violated keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent.
	// 1 means yes.
	resetPingStrikes uint32 // Accessed atomically.
	// initialWindowSize is the inbound window given to each newly created
	// stream. updateFlowControl changes it when BDP estimation is enabled.
	initialWindowSize int32
	// bdpEst is non-nil only when dynamic (BDP-based) window sizing is on.
	bdpEst *bdpEstimator
	mu     sync.Mutex // guards the fields below
	// drainChan is initialized when drain(...) is called for the first time;
	// the server then writes out the first GoAway (with ID 2^31-1) frame and an
	// independent goroutine later sends the second GoAway. While this is in
	// progress another first GoAway must not be written, so drain(...) is a
	// no-op once drainChan is initialized. handlePing closes drainChan when
	// the client acknowledges the GOAWAY ping.
	drainChan chan struct{}
	state     transportState
	// activeStreams maps stream ID to stream. getStream treats a nil map as
	// "transport closing".
	activeStreams map[uint32]*Stream
	// the per-stream outbound flow control window size set by the peer.
	streamSendQuota uint32
	// idle is the time instant when the connection went idle.
	// This is either the beginning of the connection or when the number of
	// RPCs go down to 0.
	// When the connection is busy, this value is set to the zero time.Time.
	idle time.Time
}
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong (except that io.EOF / io.ErrUnexpectedEOF
// while reading the client's first frame are returned unwrapped).
//
// The server's initial SETTINGS (plus a connection WINDOW_UPDATE when the
// connection window differs from the default) are written first. Then the
// client preface and the client's initial SETTINGS frame are read and
// validated synchronously. On success the loopyWriter and keepalive
// goroutines are started and the ready transport is returned.
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
	writeBufSize := defaultWriteBufSize
	if config.WriteBufferSize > 0 {
		writeBufSize = config.WriteBufferSize
	}
	readBufSize := defaultReadBufSize
	if config.ReadBufferSize > 0 {
		readBufSize = config.ReadBufferSize
	}
	framer := newFramer(conn, writeBufSize, readBufSize)
	// Send initial settings as connection preface to client.
	var isettings []http2.Setting
	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
	// permitted in the HTTP2 spec.
	maxStreams := config.MaxStreams
	if maxStreams == 0 {
		maxStreams = math.MaxUint32
	} else {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxConcurrentStreams,
			Val: maxStreams,
		})
	}
	// BDP-based dynamic window sizing is used only when neither the stream
	// nor the connection window size was configured explicitly.
	dynamicWindow := true
	iwz := int32(initialWindowSize)
	if config.InitialWindowSize >= defaultWindowSize {
		iwz = config.InitialWindowSize
		dynamicWindow = false
	}
	icwz := int32(initialWindowSize)
	if config.InitialConnWindowSize >= defaultWindowSize {
		icwz = config.InitialConnWindowSize
		dynamicWindow = false
	}
	if iwz != defaultWindowSize {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(iwz)})
	}
	// NOTE(review): errors returned from here until the deferred cleanup below
	// is registered leave conn open; presumably the caller closes it — confirm.
	if err := framer.fr.WriteSettings(isettings...); err != nil {
		return nil, connectionErrorf(false, err, "transport: %v", err)
	}
	// Adjust the connection flow control window if needed.
	// NOTE(review): assumes icwz >= defaultWindowSize; a smaller value would
	// wrap in the uint32 conversion.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
			return nil, connectionErrorf(false, err, "transport: %v", err)
		}
	}
	// Fill in defaults for every keepalive parameter left at zero.
	kp := config.KeepaliveParams
	if kp.MaxConnectionIdle == 0 {
		kp.MaxConnectionIdle = defaultMaxConnectionIdle
	}
	if kp.MaxConnectionAge == 0 {
		kp.MaxConnectionAge = defaultMaxConnectionAge
	}
	// Add a jitter to MaxConnectionAge (presumably so connections created
	// together do not all reach their max age at the same moment).
	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
	if kp.MaxConnectionAgeGrace == 0 {
		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
	}
	if kp.Time == 0 {
		kp.Time = defaultServerKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultServerKeepaliveTimeout
	}
	kep := config.KeepalivePolicy
	if kep.MinTime == 0 {
		kep.MinTime = defaultKeepalivePolicyMinTime
	}
	var buf bytes.Buffer
	ctx, cancel := context.WithCancel(context.Background())
	t := &http2Server{
		ctx:               ctx,
		cancel:            cancel,
		conn:              conn,
		remoteAddr:        conn.RemoteAddr(),
		localAddr:         conn.LocalAddr(),
		authInfo:          config.AuthInfo,
		framer:            framer,
		hBuf:              &buf,
		hEnc:              hpack.NewEncoder(&buf),
		maxStreams:        maxStreams,
		inTapHandle:       config.InTapHandle,
		controlBuf:        newControlBuffer(),
		fc:                &inFlow{limit: uint32(icwz)},
		sendQuotaPool:     newQuotaPool(defaultWindowSize),
		localSendQuota:    newQuotaPool(defaultLocalSendQuota),
		state:             reachable,
		activeStreams:     make(map[uint32]*Stream),
		streamSendQuota:   defaultWindowSize,
		stats:             config.StatsHandler,
		kp:                kp,
		idle:              time.Now(),
		kep:               kep,
		initialWindowSize: iwz,
	}
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	if t.stats != nil {
		// Replace t.ctx with the tagged context so that every stream context
		// derived from it later carries the connection tags.
		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{}
		t.stats.HandleConn(t.ctx, connBegin)
	}
	// Push the buffered SETTINGS / WINDOW_UPDATE onto the wire.
	// NOTE(review): Flush's result (if it returns one) is not checked here.
	t.framer.writer.Flush()
	// From here on, any error return closes the transport. This relies on err
	// being the named result: each `return nil, X` assigns X to err before the
	// deferred function runs.
	defer func() {
		if err != nil {
			t.Close()
		}
	}()
	// Check the validity of client preface.
	// NOTE(review): no read deadline is set here, so a silent client keeps this
	// read blocked — confirm the caller bounds it.
	preface := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(t.conn, preface); err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
	}
	if !bytes.Equal(preface, clientPreface) {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
	}
	// The first frame after the preface must be the client's SETTINGS.
	frame, err := t.framer.fr.ReadFrame()
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		return nil, err
	}
	if err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
	}
	atomic.StoreUint32(&t.activity, 1)
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
	}
	t.handleSettings(sf)
	// The writer goroutine runs until loopyWriter returns, then closes the
	// underlying connection.
	go func() {
		loopyWriter(t.ctx, t.controlBuf, t.itemHandler)
		t.conn.Close()
	}()
	go t.keepalive()
	return t, nil
}
  254. // operateHeader takes action on the decoded headers.
  255. func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
  256. streamID := frame.Header().StreamID
  257. var state decodeState
  258. for _, hf := range frame.Fields {
  259. if err := state.processHeaderField(hf); err != nil {
  260. if se, ok := err.(StreamError); ok {
  261. t.controlBuf.put(&resetStream{streamID, statusCodeConvTab[se.Code]})
  262. }
  263. return
  264. }
  265. }
  266. buf := newRecvBuffer()
  267. s := &Stream{
  268. id: streamID,
  269. st: t,
  270. buf: buf,
  271. fc: &inFlow{limit: uint32(t.initialWindowSize)},
  272. recvCompress: state.encoding,
  273. method: state.method,
  274. contentSubtype: state.contentSubtype,
  275. }
  276. if frame.StreamEnded() {
  277. // s is just created by the caller. No lock needed.
  278. s.state = streamReadDone
  279. }
  280. if state.timeoutSet {
  281. s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
  282. } else {
  283. s.ctx, s.cancel = context.WithCancel(t.ctx)
  284. }
  285. pr := &peer.Peer{
  286. Addr: t.remoteAddr,
  287. }
  288. // Attach Auth info if there is any.
  289. if t.authInfo != nil {
  290. pr.AuthInfo = t.authInfo
  291. }
  292. s.ctx = peer.NewContext(s.ctx, pr)
  293. // Attach the received metadata to the context.
  294. if len(state.mdata) > 0 {
  295. s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
  296. }
  297. if state.statsTags != nil {
  298. s.ctx = stats.SetIncomingTags(s.ctx, state.statsTags)
  299. }
  300. if state.statsTrace != nil {
  301. s.ctx = stats.SetIncomingTrace(s.ctx, state.statsTrace)
  302. }
  303. if t.inTapHandle != nil {
  304. var err error
  305. info := &tap.Info{
  306. FullMethodName: state.method,
  307. }
  308. s.ctx, err = t.inTapHandle(s.ctx, info)
  309. if err != nil {
  310. warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
  311. t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
  312. return
  313. }
  314. }
  315. t.mu.Lock()
  316. if t.state != reachable {
  317. t.mu.Unlock()
  318. return
  319. }
  320. if uint32(len(t.activeStreams)) >= t.maxStreams {
  321. t.mu.Unlock()
  322. t.controlBuf.put(&resetStream{streamID, http2.ErrCodeRefusedStream})
  323. return
  324. }
  325. if streamID%2 != 1 || streamID <= t.maxStreamID {
  326. t.mu.Unlock()
  327. // illegal gRPC stream id.
  328. errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
  329. return true
  330. }
  331. t.maxStreamID = streamID
  332. s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
  333. t.activeStreams[streamID] = s
  334. if len(t.activeStreams) == 1 {
  335. t.idle = time.Time{}
  336. }
  337. t.mu.Unlock()
  338. s.requestRead = func(n int) {
  339. t.adjustWindow(s, uint32(n))
  340. }
  341. s.ctx = traceCtx(s.ctx, s.method)
  342. if t.stats != nil {
  343. s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
  344. inHeader := &stats.InHeader{
  345. FullMethod: s.method,
  346. RemoteAddr: t.remoteAddr,
  347. LocalAddr: t.localAddr,
  348. Compression: s.recvCompress,
  349. WireLength: int(frame.Header().Length),
  350. }
  351. t.stats.HandleRPC(s.ctx, inHeader)
  352. }
  353. s.trReader = &transportReader{
  354. reader: &recvBufferReader{
  355. ctx: s.ctx,
  356. recv: s.buf,
  357. },
  358. windowHandler: func(n int) {
  359. t.updateWindow(s, uint32(n))
  360. },
  361. }
  362. s.waiters = waiters{
  363. ctx: s.ctx,
  364. tctx: t.ctx,
  365. }
  366. handle(s)
  367. return
  368. }
  369. // HandleStreams receives incoming streams using the given handler. This is
  370. // typically run in a separate goroutine.
  371. // traceCtx attaches trace to ctx and returns the new context.
  372. func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
  373. for {
  374. frame, err := t.framer.fr.ReadFrame()
  375. atomic.StoreUint32(&t.activity, 1)
  376. if err != nil {
  377. if se, ok := err.(http2.StreamError); ok {
  378. t.mu.Lock()
  379. s := t.activeStreams[se.StreamID]
  380. t.mu.Unlock()
  381. if s != nil {
  382. t.closeStream(s)
  383. }
  384. t.controlBuf.put(&resetStream{se.StreamID, se.Code})
  385. continue
  386. }
  387. if err == io.EOF || err == io.ErrUnexpectedEOF {
  388. t.Close()
  389. return
  390. }
  391. warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
  392. t.Close()
  393. return
  394. }
  395. switch frame := frame.(type) {
  396. case *http2.MetaHeadersFrame:
  397. if t.operateHeaders(frame, handle, traceCtx) {
  398. t.Close()
  399. break
  400. }
  401. case *http2.DataFrame:
  402. t.handleData(frame)
  403. case *http2.RSTStreamFrame:
  404. t.handleRSTStream(frame)
  405. case *http2.SettingsFrame:
  406. t.handleSettings(frame)
  407. case *http2.PingFrame:
  408. t.handlePing(frame)
  409. case *http2.WindowUpdateFrame:
  410. t.handleWindowUpdate(frame)
  411. case *http2.GoAwayFrame:
  412. // TODO: Handle GoAway from the client appropriately.
  413. default:
  414. errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
  415. }
  416. }
  417. }
  418. func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
  419. t.mu.Lock()
  420. defer t.mu.Unlock()
  421. if t.activeStreams == nil {
  422. // The transport is closing.
  423. return nil, false
  424. }
  425. s, ok := t.activeStreams[f.Header().StreamID]
  426. if !ok {
  427. // The stream is already done.
  428. return nil, false
  429. }
  430. return s, true
  431. }
  432. // adjustWindow sends out extra window update over the initial window size
  433. // of stream if the application is requesting data larger in size than
  434. // the window.
  435. func (t *http2Server) adjustWindow(s *Stream, n uint32) {
  436. s.mu.Lock()
  437. defer s.mu.Unlock()
  438. if s.state == streamDone {
  439. return
  440. }
  441. if w := s.fc.maybeAdjust(n); w > 0 {
  442. if cw := t.fc.resetPendingUpdate(); cw > 0 {
  443. t.controlBuf.put(&windowUpdate{0, cw})
  444. }
  445. t.controlBuf.put(&windowUpdate{s.id, w})
  446. }
  447. }
  448. // updateWindow adjusts the inbound quota for the stream and the transport.
  449. // Window updates will deliver to the controller for sending when
  450. // the cumulative quota exceeds the corresponding threshold.
  451. func (t *http2Server) updateWindow(s *Stream, n uint32) {
  452. s.mu.Lock()
  453. defer s.mu.Unlock()
  454. if s.state == streamDone {
  455. return
  456. }
  457. if w := s.fc.onRead(n); w > 0 {
  458. if cw := t.fc.resetPendingUpdate(); cw > 0 {
  459. t.controlBuf.put(&windowUpdate{0, cw})
  460. }
  461. t.controlBuf.put(&windowUpdate{s.id, w})
  462. }
  463. }
  464. // updateFlowControl updates the incoming flow control windows
  465. // for the transport and the stream based on the current bdp
  466. // estimation.
  467. func (t *http2Server) updateFlowControl(n uint32) {
  468. t.mu.Lock()
  469. for _, s := range t.activeStreams {
  470. s.fc.newLimit(n)
  471. }
  472. t.initialWindowSize = int32(n)
  473. t.mu.Unlock()
  474. t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n)})
  475. t.controlBuf.put(&settings{
  476. ss: []http2.Setting{
  477. {
  478. ID: http2.SettingInitialWindowSize,
  479. Val: uint32(n),
  480. },
  481. },
  482. })
  483. }
// handleData processes a DATA frame received from the client.
//
// Connection-level flow control is settled first, independently of the target
// stream (see the comment below). The payload is then checked against the
// stream's inbound window, copied, and delivered to the stream's receive
// buffer. The END_STREAM flag marks the read side done and delivers io.EOF.
// Frames for streams that getStream does not find only affect the
// connection-level accounting.
func (t *http2Server) handleData(f *http2.DataFrame) {
	size := f.Header().Length
	var sendBDPPing bool
	if t.bdpEst != nil {
		// The estimator's add result decides whether a BDP ping goes out now.
		sendBDPPing = t.bdpEst.add(uint32(size))
	}
	// Decouple connection's flow control from application's read.
	// An update on connection's flow control should not depend on
	// whether user application has read the data or not. Such a
	// restriction is already imposed on the stream's flow control,
	// and therefore the sender will be blocked anyways.
	// Decoupling the connection flow control will prevent other
	// active(fast) streams from starving in presence of slow or
	// inactive streams.
	//
	// Furthermore, if a bdpPing is being sent out we can piggyback
	// connection's window update for the bytes we just received.
	if sendBDPPing {
		// The bytes are returned to the client directly here; t.fc is not
		// charged for this frame on this path.
		if size != 0 { // Could be an empty frame.
			t.controlBuf.put(&windowUpdate{0, uint32(size)})
		}
		t.controlBuf.put(bdpPing)
	} else {
		// Charge the connection window; exceeding it closes the transport.
		if err := t.fc.onData(uint32(size)); err != nil {
			errorf("transport: http2Server %v", err)
			t.Close()
			return
		}
		// Immediately credit the bytes back; onRead reports when an update is due.
		if w := t.fc.onRead(uint32(size)); w > 0 {
			t.controlBuf.put(&windowUpdate{0, w})
		}
	}
	// Select the right stream to dispatch.
	s, ok := t.getStream(f)
	if !ok {
		return
	}
	if size > 0 {
		s.mu.Lock()
		if s.state == streamDone {
			s.mu.Unlock()
			return
		}
		// The stream exceeded its inbound window: close it and reset it with
		// FLOW_CONTROL_ERROR. The connection itself stays up.
		if err := s.fc.onData(uint32(size)); err != nil {
			s.mu.Unlock()
			t.closeStream(s)
			t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
			return
		}
		// Padding counts against the window but is never delivered to the
		// application, so return it to the stream window right away.
		if f.Header().Flags.Has(http2.FlagDataPadded) {
			if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
				t.controlBuf.put(&windowUpdate{s.id, w})
			}
		}
		s.mu.Unlock()
		// TODO(bradfitz, zhaoq): A copy is required here because there is no
		// guarantee f.Data() is consumed before the arrival of next frame.
		// Can this copy be eliminated?
		if len(f.Data()) > 0 {
			data := make([]byte, len(f.Data()))
			copy(data, f.Data())
			s.write(recvMsg{data: data})
		}
	}
	if f.Header().Flags.Has(http2.FlagDataEndStream) {
		// Received the end of stream from the client.
		s.mu.Lock()
		if s.state != streamDone {
			s.state = streamReadDone
		}
		s.mu.Unlock()
		s.write(recvMsg{err: io.EOF})
	}
}
  558. func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
  559. s, ok := t.getStream(f)
  560. if !ok {
  561. return
  562. }
  563. t.closeStream(s)
  564. }
  565. func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
  566. if f.IsAck() {
  567. return
  568. }
  569. var rs []http2.Setting
  570. var ps []http2.Setting
  571. f.ForeachSetting(func(s http2.Setting) error {
  572. if t.isRestrictive(s) {
  573. rs = append(rs, s)
  574. } else {
  575. ps = append(ps, s)
  576. }
  577. return nil
  578. })
  579. t.applySettings(rs)
  580. t.controlBuf.put(&settingsAck{})
  581. t.applySettings(ps)
  582. }
  583. func (t *http2Server) isRestrictive(s http2.Setting) bool {
  584. switch s.ID {
  585. case http2.SettingInitialWindowSize:
  586. // Note: we don't acquire a lock here to read streamSendQuota
  587. // because the same goroutine updates it later.
  588. return s.Val < t.streamSendQuota
  589. }
  590. return false
  591. }
  592. func (t *http2Server) applySettings(ss []http2.Setting) {
  593. for _, s := range ss {
  594. if s.ID == http2.SettingInitialWindowSize {
  595. t.mu.Lock()
  596. for _, stream := range t.activeStreams {
  597. stream.sendQuotaPool.addAndUpdate(int(s.Val) - int(t.streamSendQuota))
  598. }
  599. t.streamSendQuota = s.Val
  600. t.mu.Unlock()
  601. }
  602. }
  603. }
  604. const (
  605. maxPingStrikes = 2
  606. defaultPingTimeout = 2 * time.Hour
  607. )
  608. func (t *http2Server) handlePing(f *http2.PingFrame) {
  609. if f.IsAck() {
  610. if f.Data == goAwayPing.data && t.drainChan != nil {
  611. close(t.drainChan)
  612. return
  613. }
  614. // Maybe it's a BDP ping.
  615. if t.bdpEst != nil {
  616. t.bdpEst.calculate(f.Data)
  617. }
  618. return
  619. }
  620. pingAck := &ping{ack: true}
  621. copy(pingAck.data[:], f.Data[:])
  622. t.controlBuf.put(pingAck)
  623. now := time.Now()
  624. defer func() {
  625. t.lastPingAt = now
  626. }()
  627. // A reset ping strikes means that we don't need to check for policy
  628. // violation for this ping and the pingStrikes counter should be set
  629. // to 0.
  630. if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
  631. t.pingStrikes = 0
  632. return
  633. }
  634. t.mu.Lock()
  635. ns := len(t.activeStreams)
  636. t.mu.Unlock()
  637. if ns < 1 && !t.kep.PermitWithoutStream {
  638. // Keepalive shouldn't be active thus, this new ping should
  639. // have come after at least defaultPingTimeout.
  640. if t.lastPingAt.Add(defaultPingTimeout).After(now) {
  641. t.pingStrikes++
  642. }
  643. } else {
  644. // Check if keepalive policy is respected.
  645. if t.lastPingAt.Add(t.kep.MinTime).After(now) {
  646. t.pingStrikes++
  647. }
  648. }
  649. if t.pingStrikes > maxPingStrikes {
  650. // Send goaway and close the connection.
  651. errorf("transport: Got too many pings from the client, closing the connection.")
  652. t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
  653. }
  654. }
  655. func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
  656. id := f.Header().StreamID
  657. incr := f.Increment
  658. if id == 0 {
  659. t.sendQuotaPool.add(int(incr))
  660. return
  661. }
  662. if s, ok := t.getStream(f); ok {
  663. s.sendQuotaPool.add(int(incr))
  664. }
  665. }
  666. // WriteHeader sends the header metedata md back to the client.
  667. func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
  668. select {
  669. case <-s.ctx.Done():
  670. return ContextErr(s.ctx.Err())
  671. case <-t.ctx.Done():
  672. return ErrConnClosing
  673. default:
  674. }
  675. s.mu.Lock()
  676. if s.headerOk || s.state == streamDone {
  677. s.mu.Unlock()
  678. return ErrIllegalHeaderWrite
  679. }
  680. s.headerOk = true
  681. if md.Len() > 0 {
  682. if s.header.Len() > 0 {
  683. s.header = metadata.Join(s.header, md)
  684. } else {
  685. s.header = md
  686. }
  687. }
  688. md = s.header
  689. s.mu.Unlock()
  690. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  691. // first and create a slice of that exact size.
  692. headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
  693. headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
  694. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
  695. if s.sendCompress != "" {
  696. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
  697. }
  698. for k, vv := range md {
  699. if isReservedHeader(k) {
  700. // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
  701. continue
  702. }
  703. for _, v := range vv {
  704. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  705. }
  706. }
  707. t.controlBuf.put(&headerFrame{
  708. streamID: s.id,
  709. hf: headerFields,
  710. endStream: false,
  711. })
  712. if t.stats != nil {
  713. // Note: WireLength is not set in outHeader.
  714. // TODO(mmukhi): Revisit this later, if needed.
  715. outHeader := &stats.OutHeader{}
  716. t.stats.HandleRPC(s.Context(), outHeader)
  717. }
  718. return nil
  719. }
  720. // WriteStatus sends stream status to the client and terminates the stream.
  721. // There is no further I/O operations being able to perform on this stream.
  722. // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
  723. // OK is adopted.
  724. func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
  725. select {
  726. case <-t.ctx.Done():
  727. return ErrConnClosing
  728. default:
  729. }
  730. var headersSent, hasHeader bool
  731. s.mu.Lock()
  732. if s.state == streamDone {
  733. s.mu.Unlock()
  734. return nil
  735. }
  736. if s.headerOk {
  737. headersSent = true
  738. }
  739. if s.header.Len() > 0 {
  740. hasHeader = true
  741. }
  742. s.mu.Unlock()
  743. if !headersSent && hasHeader {
  744. t.WriteHeader(s, nil)
  745. headersSent = true
  746. }
  747. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  748. // first and create a slice of that exact size.
  749. headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
  750. if !headersSent {
  751. headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
  752. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
  753. }
  754. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
  755. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
  756. if p := st.Proto(); p != nil && len(p.Details) > 0 {
  757. stBytes, err := proto.Marshal(p)
  758. if err != nil {
  759. // TODO: return error instead, when callers are able to handle it.
  760. panic(err)
  761. }
  762. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
  763. }
  764. // Attach the trailer metadata.
  765. for k, vv := range s.trailer {
  766. // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
  767. if isReservedHeader(k) {
  768. continue
  769. }
  770. for _, v := range vv {
  771. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  772. }
  773. }
  774. t.controlBuf.put(&headerFrame{
  775. streamID: s.id,
  776. hf: headerFields,
  777. endStream: true,
  778. })
  779. if t.stats != nil {
  780. t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
  781. }
  782. t.closeStream(s)
  783. return nil
  784. }
  785. // Write converts the data into HTTP2 data frame and sends it out. Non-nil error
  786. // is returns if it fails (e.g., framing error, transport error).
  787. func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
  788. select {
  789. case <-s.ctx.Done():
  790. return ContextErr(s.ctx.Err())
  791. case <-t.ctx.Done():
  792. return ErrConnClosing
  793. default:
  794. }
  795. var writeHeaderFrame bool
  796. s.mu.Lock()
  797. if !s.headerOk {
  798. writeHeaderFrame = true
  799. }
  800. s.mu.Unlock()
  801. if writeHeaderFrame {
  802. t.WriteHeader(s, nil)
  803. }
  804. // Add data to header frame so that we can equally distribute data across frames.
  805. emptyLen := http2MaxFrameLen - len(hdr)
  806. if emptyLen > len(data) {
  807. emptyLen = len(data)
  808. }
  809. hdr = append(hdr, data[:emptyLen]...)
  810. data = data[emptyLen:]
  811. var (
  812. streamQuota int
  813. streamQuotaVer uint32
  814. err error
  815. )
  816. for _, r := range [][]byte{hdr, data} {
  817. for len(r) > 0 {
  818. size := http2MaxFrameLen
  819. if size > len(r) {
  820. size = len(r)
  821. }
  822. if streamQuota == 0 { // Used up all the locally cached stream quota.
  823. // Get all the stream quota there is.
  824. streamQuota, streamQuotaVer, err = s.sendQuotaPool.get(math.MaxInt32, s.waiters)
  825. if err != nil {
  826. return err
  827. }
  828. }
  829. if size > streamQuota {
  830. size = streamQuota
  831. }
  832. // Get size worth quota from transport.
  833. tq, _, err := t.sendQuotaPool.get(size, s.waiters)
  834. if err != nil {
  835. return err
  836. }
  837. if tq < size {
  838. size = tq
  839. }
  840. ltq, _, err := t.localSendQuota.get(size, s.waiters)
  841. if err != nil {
  842. // Add the acquired quota back to transport.
  843. t.sendQuotaPool.add(tq)
  844. return err
  845. }
  846. // even if ltq is smaller than size we don't adjust size since,
  847. // ltq is only a soft limit.
  848. streamQuota -= size
  849. p := r[:size]
  850. success := func() {
  851. ltq := ltq
  852. t.controlBuf.put(&dataFrame{streamID: s.id, endStream: false, d: p, f: func() {
  853. t.localSendQuota.add(ltq)
  854. }})
  855. r = r[size:]
  856. }
  857. failure := func() { // The stream quota version must have changed.
  858. // Our streamQuota cache is invalidated now, so give it back.
  859. s.sendQuotaPool.lockedAdd(streamQuota + size)
  860. }
  861. if !s.sendQuotaPool.compareAndExecute(streamQuotaVer, success, failure) {
  862. // Couldn't send this chunk out.
  863. t.sendQuotaPool.add(size)
  864. t.localSendQuota.add(ltq)
  865. streamQuota = 0
  866. }
  867. }
  868. }
  869. if streamQuota > 0 {
  870. // ADd the left over quota back to stream.
  871. s.sendQuotaPool.add(streamQuota)
  872. }
  873. return nil
  874. }
  875. // keepalive running in a separate goroutine does the following:
  876. // 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
  877. // 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
  878. // 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
  879. // 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
  880. // after an additional duration of keepalive.Timeout.
  881. func (t *http2Server) keepalive() {
  882. p := &ping{}
  883. var pingSent bool
  884. maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
  885. maxAge := time.NewTimer(t.kp.MaxConnectionAge)
  886. keepalive := time.NewTimer(t.kp.Time)
  887. // NOTE: All exit paths of this function should reset their
  888. // respective timers. A failure to do so will cause the
  889. // following clean-up to deadlock and eventually leak.
  890. defer func() {
  891. if !maxIdle.Stop() {
  892. <-maxIdle.C
  893. }
  894. if !maxAge.Stop() {
  895. <-maxAge.C
  896. }
  897. if !keepalive.Stop() {
  898. <-keepalive.C
  899. }
  900. }()
  901. for {
  902. select {
  903. case <-maxIdle.C:
  904. t.mu.Lock()
  905. idle := t.idle
  906. if idle.IsZero() { // The connection is non-idle.
  907. t.mu.Unlock()
  908. maxIdle.Reset(t.kp.MaxConnectionIdle)
  909. continue
  910. }
  911. val := t.kp.MaxConnectionIdle - time.Since(idle)
  912. t.mu.Unlock()
  913. if val <= 0 {
  914. // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
  915. // Gracefully close the connection.
  916. t.drain(http2.ErrCodeNo, []byte{})
  917. // Reseting the timer so that the clean-up doesn't deadlock.
  918. maxIdle.Reset(infinity)
  919. return
  920. }
  921. maxIdle.Reset(val)
  922. case <-maxAge.C:
  923. t.drain(http2.ErrCodeNo, []byte{})
  924. maxAge.Reset(t.kp.MaxConnectionAgeGrace)
  925. select {
  926. case <-maxAge.C:
  927. // Close the connection after grace period.
  928. t.Close()
  929. // Reseting the timer so that the clean-up doesn't deadlock.
  930. maxAge.Reset(infinity)
  931. case <-t.ctx.Done():
  932. }
  933. return
  934. case <-keepalive.C:
  935. if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
  936. pingSent = false
  937. keepalive.Reset(t.kp.Time)
  938. continue
  939. }
  940. if pingSent {
  941. t.Close()
  942. // Reseting the timer so that the clean-up doesn't deadlock.
  943. keepalive.Reset(infinity)
  944. return
  945. }
  946. pingSent = true
  947. t.controlBuf.put(p)
  948. keepalive.Reset(t.kp.Timeout)
  949. case <-t.ctx.Done():
  950. return
  951. }
  952. }
  953. }
  954. var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
  955. // TODO(mmukhi): A lot of this code(and code in other places in the tranpsort layer)
  956. // is duplicated between the client and the server.
  957. // The transport layer needs to be refactored to take care of this.
  958. func (t *http2Server) itemHandler(i item) error {
  959. switch i := i.(type) {
  960. case *dataFrame:
  961. // Reset ping strikes when sending data since this might cause
  962. // the peer to send ping.
  963. atomic.StoreUint32(&t.resetPingStrikes, 1)
  964. if err := t.framer.fr.WriteData(i.streamID, i.endStream, i.d); err != nil {
  965. return err
  966. }
  967. i.f()
  968. return nil
  969. case *headerFrame:
  970. t.hBuf.Reset()
  971. for _, f := range i.hf {
  972. t.hEnc.WriteField(f)
  973. }
  974. first := true
  975. endHeaders := false
  976. for !endHeaders {
  977. size := t.hBuf.Len()
  978. if size > http2MaxFrameLen {
  979. size = http2MaxFrameLen
  980. } else {
  981. endHeaders = true
  982. }
  983. var err error
  984. if first {
  985. first = false
  986. err = t.framer.fr.WriteHeaders(http2.HeadersFrameParam{
  987. StreamID: i.streamID,
  988. BlockFragment: t.hBuf.Next(size),
  989. EndStream: i.endStream,
  990. EndHeaders: endHeaders,
  991. })
  992. } else {
  993. err = t.framer.fr.WriteContinuation(
  994. i.streamID,
  995. endHeaders,
  996. t.hBuf.Next(size),
  997. )
  998. }
  999. if err != nil {
  1000. return err
  1001. }
  1002. }
  1003. atomic.StoreUint32(&t.resetPingStrikes, 1)
  1004. return nil
  1005. case *windowUpdate:
  1006. return t.framer.fr.WriteWindowUpdate(i.streamID, i.increment)
  1007. case *settings:
  1008. return t.framer.fr.WriteSettings(i.ss...)
  1009. case *settingsAck:
  1010. return t.framer.fr.WriteSettingsAck()
  1011. case *resetStream:
  1012. return t.framer.fr.WriteRSTStream(i.streamID, i.code)
  1013. case *goAway:
  1014. t.mu.Lock()
  1015. if t.state == closing {
  1016. t.mu.Unlock()
  1017. // The transport is closing.
  1018. return fmt.Errorf("transport: Connection closing")
  1019. }
  1020. sid := t.maxStreamID
  1021. if !i.headsUp {
  1022. // Stop accepting more streams now.
  1023. t.state = draining
  1024. if len(t.activeStreams) == 0 {
  1025. i.closeConn = true
  1026. }
  1027. t.mu.Unlock()
  1028. if err := t.framer.fr.WriteGoAway(sid, i.code, i.debugData); err != nil {
  1029. return err
  1030. }
  1031. if i.closeConn {
  1032. // Abruptly close the connection following the GoAway (via
  1033. // loopywriter). But flush out what's inside the buffer first.
  1034. t.controlBuf.put(&flushIO{closeTr: true})
  1035. }
  1036. return nil
  1037. }
  1038. t.mu.Unlock()
  1039. // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
  1040. // Follow that with a ping and wait for the ack to come back or a timer
  1041. // to expire. During this time accept new streams since they might have
  1042. // originated before the GoAway reaches the client.
  1043. // After getting the ack or timer expiration send out another GoAway this
  1044. // time with an ID of the max stream server intends to process.
  1045. if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
  1046. return err
  1047. }
  1048. if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
  1049. return err
  1050. }
  1051. go func() {
  1052. timer := time.NewTimer(time.Minute)
  1053. defer timer.Stop()
  1054. select {
  1055. case <-t.drainChan:
  1056. case <-timer.C:
  1057. case <-t.ctx.Done():
  1058. return
  1059. }
  1060. t.controlBuf.put(&goAway{code: i.code, debugData: i.debugData})
  1061. }()
  1062. return nil
  1063. case *flushIO:
  1064. if err := t.framer.writer.Flush(); err != nil {
  1065. return err
  1066. }
  1067. if i.closeTr {
  1068. return ErrConnClosing
  1069. }
  1070. return nil
  1071. case *ping:
  1072. if !i.ack {
  1073. t.bdpEst.timesnap(i.data)
  1074. }
  1075. return t.framer.fr.WritePing(i.ack, i.data)
  1076. default:
  1077. err := status.Errorf(codes.Internal, "transport: http2Server.controller got unexpected item type %t", i)
  1078. errorf("%v", err)
  1079. return err
  1080. }
  1081. }
  1082. // Close starts shutting down the http2Server transport.
  1083. // TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
  1084. // could cause some resource issue. Revisit this later.
  1085. func (t *http2Server) Close() error {
  1086. t.mu.Lock()
  1087. if t.state == closing {
  1088. t.mu.Unlock()
  1089. return errors.New("transport: Close() was already called")
  1090. }
  1091. t.state = closing
  1092. streams := t.activeStreams
  1093. t.activeStreams = nil
  1094. t.mu.Unlock()
  1095. t.cancel()
  1096. err := t.conn.Close()
  1097. // Cancel all active streams.
  1098. for _, s := range streams {
  1099. s.cancel()
  1100. }
  1101. if t.stats != nil {
  1102. connEnd := &stats.ConnEnd{}
  1103. t.stats.HandleConn(t.ctx, connEnd)
  1104. }
  1105. return err
  1106. }
  1107. // closeStream clears the footprint of a stream when the stream is not needed
  1108. // any more.
  1109. func (t *http2Server) closeStream(s *Stream) {
  1110. t.mu.Lock()
  1111. delete(t.activeStreams, s.id)
  1112. if len(t.activeStreams) == 0 {
  1113. t.idle = time.Now()
  1114. }
  1115. if t.state == draining && len(t.activeStreams) == 0 {
  1116. defer t.controlBuf.put(&flushIO{closeTr: true})
  1117. }
  1118. t.mu.Unlock()
  1119. // In case stream sending and receiving are invoked in separate
  1120. // goroutines (e.g., bi-directional streaming), cancel needs to be
  1121. // called to interrupt the potential blocking on other goroutines.
  1122. s.cancel()
  1123. s.mu.Lock()
  1124. if s.state == streamDone {
  1125. s.mu.Unlock()
  1126. return
  1127. }
  1128. s.state = streamDone
  1129. s.mu.Unlock()
  1130. }
  1131. func (t *http2Server) RemoteAddr() net.Addr {
  1132. return t.remoteAddr
  1133. }
  1134. func (t *http2Server) Drain() {
  1135. t.drain(http2.ErrCodeNo, []byte{})
  1136. }
  1137. func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
  1138. t.mu.Lock()
  1139. defer t.mu.Unlock()
  1140. if t.drainChan != nil {
  1141. return
  1142. }
  1143. t.drainChan = make(chan struct{})
  1144. t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
  1145. }
  1146. var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))
  1147. func getJitter(v time.Duration) time.Duration {
  1148. if v == infinity {
  1149. return 0
  1150. }
  1151. // Generate a jitter between +/- 10% of the value.
  1152. r := int64(v / 10)
  1153. j := rgen.Int63n(2*r) - r
  1154. return time.Duration(j)
  1155. }