stream.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "errors"
  21. "io"
  22. "sync"
  23. "time"
  24. "golang.org/x/net/context"
  25. "golang.org/x/net/trace"
  26. "google.golang.org/grpc/balancer"
  27. "google.golang.org/grpc/codes"
  28. "google.golang.org/grpc/encoding"
  29. "google.golang.org/grpc/metadata"
  30. "google.golang.org/grpc/peer"
  31. "google.golang.org/grpc/stats"
  32. "google.golang.org/grpc/status"
  33. "google.golang.org/grpc/transport"
  34. )
  35. // StreamHandler defines the handler called by gRPC server to complete the
  36. // execution of a streaming RPC.
  37. type StreamHandler func(srv interface{}, stream ServerStream) error
  38. // StreamDesc represents a streaming RPC service's method specification.
  39. type StreamDesc struct {
  40. StreamName string
  41. Handler StreamHandler
  42. // At least one of these is true.
  43. ServerStreams bool
  44. ClientStreams bool
  45. }
  46. // Stream defines the common interface a client or server stream has to satisfy.
  47. type Stream interface {
  48. // Context returns the context for this stream.
  49. Context() context.Context
  50. // SendMsg blocks until it sends m, the stream is done or the stream
  51. // breaks.
  52. // On error, it aborts the stream and returns an RPC status on client
  53. // side. On server side, it simply returns the error to the caller.
  54. // SendMsg is called by generated code. Also Users can call SendMsg
  55. // directly when it is really needed in their use cases.
  56. // It's safe to have a goroutine calling SendMsg and another goroutine calling
  57. // recvMsg on the same stream at the same time.
  58. // But it is not safe to call SendMsg on the same stream in different goroutines.
  59. SendMsg(m interface{}) error
  60. // RecvMsg blocks until it receives a message or the stream is
  61. // done. On client side, it returns io.EOF when the stream is done. On
  62. // any other error, it aborts the stream and returns an RPC status. On
  63. // server side, it simply returns the error to the caller.
  64. // It's safe to have a goroutine calling SendMsg and another goroutine calling
  65. // recvMsg on the same stream at the same time.
  66. // But it is not safe to call RecvMsg on the same stream in different goroutines.
  67. RecvMsg(m interface{}) error
  68. }
  69. // ClientStream defines the interface a client stream has to satisfy.
  70. type ClientStream interface {
  71. // Header returns the header metadata received from the server if there
  72. // is any. It blocks if the metadata is not ready to read.
  73. Header() (metadata.MD, error)
  74. // Trailer returns the trailer metadata from the server, if there is any.
  75. // It must only be called after stream.CloseAndRecv has returned, or
  76. // stream.Recv has returned a non-nil error (including io.EOF).
  77. Trailer() metadata.MD
  78. // CloseSend closes the send direction of the stream. It closes the stream
  79. // when non-nil error is met.
  80. CloseSend() error
  81. // Stream.SendMsg() may return a non-nil error when something wrong happens sending
  82. // the request. The returned error indicates the status of this sending, not the final
  83. // status of the RPC.
  84. // Always call Stream.RecvMsg() to get the final status if you care about the status of
  85. // the RPC.
  86. Stream
  87. }
  88. // NewStream creates a new Stream for the client side. This is typically
  89. // called by generated code.
  90. func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
  91. if cc.dopts.streamInt != nil {
  92. return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
  93. }
  94. return newClientStream(ctx, desc, cc, method, opts...)
  95. }
  96. // NewClientStream creates a new Stream for the client side. This is typically
  97. // called by generated code.
  98. //
  99. // DEPRECATED: Use ClientConn.NewStream instead.
  100. func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
  101. return cc.NewStream(ctx, desc, method, opts...)
  102. }
  103. func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
  104. var (
  105. t transport.ClientTransport
  106. s *transport.Stream
  107. done func(balancer.DoneInfo)
  108. cancel context.CancelFunc
  109. )
  110. c := defaultCallInfo()
  111. mc := cc.GetMethodConfig(method)
  112. if mc.WaitForReady != nil {
  113. c.failFast = !*mc.WaitForReady
  114. }
  115. if mc.Timeout != nil && *mc.Timeout >= 0 {
  116. ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
  117. defer func() {
  118. if err != nil {
  119. cancel()
  120. }
  121. }()
  122. }
  123. opts = append(cc.dopts.callOptions, opts...)
  124. for _, o := range opts {
  125. if err := o.before(c); err != nil {
  126. return nil, toRPCErr(err)
  127. }
  128. }
  129. c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
  130. c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
  131. callHdr := &transport.CallHdr{
  132. Host: cc.authority,
  133. Method: method,
  134. // If it's not client streaming, we should already have the request to be sent,
  135. // so we don't flush the header.
  136. // If it's client streaming, the user may never send a request or send it any
  137. // time soon, so we ask the transport to flush the header.
  138. Flush: desc.ClientStreams,
  139. }
  140. // Set our outgoing compression according to the UseCompressor CallOption, if
  141. // set. In that case, also find the compressor from the encoding package.
  142. // Otherwise, use the compressor configured by the WithCompressor DialOption,
  143. // if set.
  144. var cp Compressor
  145. var comp encoding.Compressor
  146. if ct := c.compressorType; ct != "" {
  147. callHdr.SendCompress = ct
  148. if ct != encoding.Identity {
  149. comp = encoding.GetCompressor(ct)
  150. if comp == nil {
  151. return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
  152. }
  153. }
  154. } else if cc.dopts.cp != nil {
  155. callHdr.SendCompress = cc.dopts.cp.Type()
  156. cp = cc.dopts.cp
  157. }
  158. if c.creds != nil {
  159. callHdr.Creds = c.creds
  160. }
  161. var trInfo traceInfo
  162. if EnableTracing {
  163. trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
  164. trInfo.firstLine.client = true
  165. if deadline, ok := ctx.Deadline(); ok {
  166. trInfo.firstLine.deadline = deadline.Sub(time.Now())
  167. }
  168. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  169. ctx = trace.NewContext(ctx, trInfo.tr)
  170. defer func() {
  171. if err != nil {
  172. // Need to call tr.finish() if error is returned.
  173. // Because tr will not be returned to caller.
  174. trInfo.tr.LazyPrintf("RPC: [%v]", err)
  175. trInfo.tr.SetError()
  176. trInfo.tr.Finish()
  177. }
  178. }()
  179. }
  180. ctx = newContextWithRPCInfo(ctx, c.failFast)
  181. sh := cc.dopts.copts.StatsHandler
  182. if sh != nil {
  183. ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
  184. begin := &stats.Begin{
  185. Client: true,
  186. BeginTime: time.Now(),
  187. FailFast: c.failFast,
  188. }
  189. sh.HandleRPC(ctx, begin)
  190. defer func() {
  191. if err != nil {
  192. // Only handle end stats if err != nil.
  193. end := &stats.End{
  194. Client: true,
  195. Error: err,
  196. }
  197. sh.HandleRPC(ctx, end)
  198. }
  199. }()
  200. }
  201. for {
  202. // Check to make sure the context has expired. This will prevent us from
  203. // looping forever if an error occurs for wait-for-ready RPCs where no data
  204. // is sent on the wire.
  205. select {
  206. case <-ctx.Done():
  207. return nil, toRPCErr(ctx.Err())
  208. default:
  209. }
  210. t, done, err = cc.getTransport(ctx, c.failFast)
  211. if err != nil {
  212. return nil, err
  213. }
  214. s, err = t.NewStream(ctx, callHdr)
  215. if err != nil {
  216. if done != nil {
  217. doneInfo := balancer.DoneInfo{Err: err}
  218. if _, ok := err.(transport.ConnectionError); ok {
  219. // If error is connection error, transport was sending data on wire,
  220. // and we are not sure if anything has been sent on wire.
  221. // If error is not connection error, we are sure nothing has been sent.
  222. doneInfo.BytesSent = true
  223. }
  224. done(doneInfo)
  225. done = nil
  226. }
  227. // In the event of any error from NewStream, we never attempted to write
  228. // anything to the wire, so we can retry indefinitely for non-fail-fast
  229. // RPCs.
  230. if !c.failFast {
  231. continue
  232. }
  233. return nil, toRPCErr(err)
  234. }
  235. break
  236. }
  237. // Set callInfo.peer object from stream's context.
  238. if peer, ok := peer.FromContext(s.Context()); ok {
  239. c.peer = peer
  240. }
  241. cs := &clientStream{
  242. opts: opts,
  243. c: c,
  244. desc: desc,
  245. codec: cc.dopts.codec,
  246. cp: cp,
  247. dc: cc.dopts.dc,
  248. comp: comp,
  249. cancel: cancel,
  250. done: done,
  251. t: t,
  252. s: s,
  253. p: &parser{r: s},
  254. tracing: EnableTracing,
  255. trInfo: trInfo,
  256. statsCtx: ctx,
  257. statsHandler: cc.dopts.copts.StatsHandler,
  258. }
  259. // Listen on s.Context().Done() to detect cancellation and s.Done() to detect
  260. // normal termination when there is no pending I/O operations on this stream.
  261. go func() {
  262. select {
  263. case <-t.Error():
  264. // Incur transport error, simply exit.
  265. case <-cc.ctx.Done():
  266. cs.finish(ErrClientConnClosing)
  267. cs.closeTransportStream(ErrClientConnClosing)
  268. case <-s.Done():
  269. // TODO: The trace of the RPC is terminated here when there is no pending
  270. // I/O, which is probably not the optimal solution.
  271. cs.finish(s.Status().Err())
  272. cs.closeTransportStream(nil)
  273. case <-s.GoAway():
  274. cs.finish(errConnDrain)
  275. cs.closeTransportStream(errConnDrain)
  276. case <-s.Context().Done():
  277. err := s.Context().Err()
  278. cs.finish(err)
  279. cs.closeTransportStream(transport.ContextErr(err))
  280. }
  281. }()
  282. return cs, nil
  283. }
  284. // clientStream implements a client side Stream.
  285. type clientStream struct {
  286. opts []CallOption
  287. c *callInfo
  288. t transport.ClientTransport
  289. s *transport.Stream
  290. p *parser
  291. desc *StreamDesc
  292. codec Codec
  293. cp Compressor
  294. dc Decompressor
  295. comp encoding.Compressor
  296. decomp encoding.Compressor
  297. decompSet bool
  298. cancel context.CancelFunc
  299. tracing bool // set to EnableTracing when the clientStream is created.
  300. mu sync.Mutex
  301. done func(balancer.DoneInfo)
  302. closed bool
  303. finished bool
  304. // trInfo.tr is set when the clientStream is created (if EnableTracing is true),
  305. // and is set to nil when the clientStream's finish method is called.
  306. trInfo traceInfo
  307. // statsCtx keeps the user context for stats handling.
  308. // All stats collection should use the statsCtx (instead of the stream context)
  309. // so that all the generated stats for a particular RPC can be associated in the processing phase.
  310. statsCtx context.Context
  311. statsHandler stats.Handler
  312. }
  313. func (cs *clientStream) Context() context.Context {
  314. return cs.s.Context()
  315. }
  316. func (cs *clientStream) Header() (metadata.MD, error) {
  317. m, err := cs.s.Header()
  318. if err != nil {
  319. if _, ok := err.(transport.ConnectionError); !ok {
  320. cs.closeTransportStream(err)
  321. }
  322. }
  323. return m, err
  324. }
  325. func (cs *clientStream) Trailer() metadata.MD {
  326. return cs.s.Trailer()
  327. }
  328. func (cs *clientStream) SendMsg(m interface{}) (err error) {
  329. if cs.tracing {
  330. cs.mu.Lock()
  331. if cs.trInfo.tr != nil {
  332. cs.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
  333. }
  334. cs.mu.Unlock()
  335. }
  336. // TODO Investigate how to signal the stats handling party.
  337. // generate error stats if err != nil && err != io.EOF?
  338. defer func() {
  339. if err != nil {
  340. cs.finish(err)
  341. }
  342. if err == nil {
  343. return
  344. }
  345. if err == io.EOF {
  346. // Specialize the process for server streaming. SendMsg is only called
  347. // once when creating the stream object. io.EOF needs to be skipped when
  348. // the rpc is early finished (before the stream object is created.).
  349. // TODO: It is probably better to move this into the generated code.
  350. if !cs.desc.ClientStreams && cs.desc.ServerStreams {
  351. err = nil
  352. }
  353. return
  354. }
  355. if _, ok := err.(transport.ConnectionError); !ok {
  356. cs.closeTransportStream(err)
  357. }
  358. err = toRPCErr(err)
  359. }()
  360. var outPayload *stats.OutPayload
  361. if cs.statsHandler != nil {
  362. outPayload = &stats.OutPayload{
  363. Client: true,
  364. }
  365. }
  366. hdr, data, err := encode(cs.codec, m, cs.cp, outPayload, cs.comp)
  367. if err != nil {
  368. return err
  369. }
  370. if cs.c.maxSendMessageSize == nil {
  371. return status.Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)")
  372. }
  373. if len(data) > *cs.c.maxSendMessageSize {
  374. return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), *cs.c.maxSendMessageSize)
  375. }
  376. err = cs.t.Write(cs.s, hdr, data, &transport.Options{Last: false})
  377. if err == nil && outPayload != nil {
  378. outPayload.SentTime = time.Now()
  379. cs.statsHandler.HandleRPC(cs.statsCtx, outPayload)
  380. }
  381. return err
  382. }
  383. func (cs *clientStream) RecvMsg(m interface{}) (err error) {
  384. var inPayload *stats.InPayload
  385. if cs.statsHandler != nil {
  386. inPayload = &stats.InPayload{
  387. Client: true,
  388. }
  389. }
  390. if cs.c.maxReceiveMessageSize == nil {
  391. return status.Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)")
  392. }
  393. if !cs.decompSet {
  394. // Block until we receive headers containing received message encoding.
  395. if ct := cs.s.RecvCompress(); ct != "" && ct != encoding.Identity {
  396. if cs.dc == nil || cs.dc.Type() != ct {
  397. // No configured decompressor, or it does not match the incoming
  398. // message encoding; attempt to find a registered compressor that does.
  399. cs.dc = nil
  400. cs.decomp = encoding.GetCompressor(ct)
  401. }
  402. } else {
  403. // No compression is used; disable our decompressor.
  404. cs.dc = nil
  405. }
  406. // Only initialize this state once per stream.
  407. cs.decompSet = true
  408. }
  409. err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, inPayload, cs.decomp)
  410. defer func() {
  411. // err != nil indicates the termination of the stream.
  412. if err != nil {
  413. cs.finish(err)
  414. }
  415. }()
  416. if err == nil {
  417. if cs.tracing {
  418. cs.mu.Lock()
  419. if cs.trInfo.tr != nil {
  420. cs.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
  421. }
  422. cs.mu.Unlock()
  423. }
  424. if inPayload != nil {
  425. cs.statsHandler.HandleRPC(cs.statsCtx, inPayload)
  426. }
  427. if !cs.desc.ClientStreams || cs.desc.ServerStreams {
  428. return
  429. }
  430. // Special handling for client streaming rpc.
  431. // This recv expects EOF or errors, so we don't collect inPayload.
  432. if cs.c.maxReceiveMessageSize == nil {
  433. return status.Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)")
  434. }
  435. err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, nil, cs.decomp)
  436. cs.closeTransportStream(err)
  437. if err == nil {
  438. return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
  439. }
  440. if err == io.EOF {
  441. if se := cs.s.Status().Err(); se != nil {
  442. return se
  443. }
  444. cs.finish(err)
  445. return nil
  446. }
  447. return toRPCErr(err)
  448. }
  449. if _, ok := err.(transport.ConnectionError); !ok {
  450. cs.closeTransportStream(err)
  451. }
  452. if err == io.EOF {
  453. if statusErr := cs.s.Status().Err(); statusErr != nil {
  454. return statusErr
  455. }
  456. // Returns io.EOF to indicate the end of the stream.
  457. return
  458. }
  459. return toRPCErr(err)
  460. }
  461. func (cs *clientStream) CloseSend() (err error) {
  462. err = cs.t.Write(cs.s, nil, nil, &transport.Options{Last: true})
  463. defer func() {
  464. if err != nil {
  465. cs.finish(err)
  466. }
  467. }()
  468. if err == nil || err == io.EOF {
  469. return nil
  470. }
  471. if _, ok := err.(transport.ConnectionError); !ok {
  472. cs.closeTransportStream(err)
  473. }
  474. err = toRPCErr(err)
  475. return
  476. }
  477. func (cs *clientStream) closeTransportStream(err error) {
  478. cs.mu.Lock()
  479. if cs.closed {
  480. cs.mu.Unlock()
  481. return
  482. }
  483. cs.closed = true
  484. cs.mu.Unlock()
  485. cs.t.CloseStream(cs.s, err)
  486. }
  487. func (cs *clientStream) finish(err error) {
  488. cs.mu.Lock()
  489. defer cs.mu.Unlock()
  490. if cs.finished {
  491. return
  492. }
  493. cs.finished = true
  494. defer func() {
  495. if cs.cancel != nil {
  496. cs.cancel()
  497. }
  498. }()
  499. for _, o := range cs.opts {
  500. o.after(cs.c)
  501. }
  502. if cs.done != nil {
  503. cs.done(balancer.DoneInfo{
  504. Err: err,
  505. BytesSent: true,
  506. BytesReceived: cs.s.BytesReceived(),
  507. })
  508. cs.done = nil
  509. }
  510. if cs.statsHandler != nil {
  511. end := &stats.End{
  512. Client: true,
  513. EndTime: time.Now(),
  514. }
  515. if err != io.EOF {
  516. // end.Error is nil if the RPC finished successfully.
  517. end.Error = toRPCErr(err)
  518. }
  519. cs.statsHandler.HandleRPC(cs.statsCtx, end)
  520. }
  521. if !cs.tracing {
  522. return
  523. }
  524. if cs.trInfo.tr != nil {
  525. if err == nil || err == io.EOF {
  526. cs.trInfo.tr.LazyPrintf("RPC: [OK]")
  527. } else {
  528. cs.trInfo.tr.LazyPrintf("RPC: [%v]", err)
  529. cs.trInfo.tr.SetError()
  530. }
  531. cs.trInfo.tr.Finish()
  532. cs.trInfo.tr = nil
  533. }
  534. }
  535. // ServerStream defines the interface a server stream has to satisfy.
  536. type ServerStream interface {
  537. // SetHeader sets the header metadata. It may be called multiple times.
  538. // When call multiple times, all the provided metadata will be merged.
  539. // All the metadata will be sent out when one of the following happens:
  540. // - ServerStream.SendHeader() is called;
  541. // - The first response is sent out;
  542. // - An RPC status is sent out (error or success).
  543. SetHeader(metadata.MD) error
  544. // SendHeader sends the header metadata.
  545. // The provided md and headers set by SetHeader() will be sent.
  546. // It fails if called multiple times.
  547. SendHeader(metadata.MD) error
  548. // SetTrailer sets the trailer metadata which will be sent with the RPC status.
  549. // When called more than once, all the provided metadata will be merged.
  550. SetTrailer(metadata.MD)
  551. Stream
  552. }
  553. // serverStream implements a server side Stream.
  554. type serverStream struct {
  555. t transport.ServerTransport
  556. s *transport.Stream
  557. p *parser
  558. codec Codec
  559. cp Compressor
  560. dc Decompressor
  561. comp encoding.Compressor
  562. decomp encoding.Compressor
  563. maxReceiveMessageSize int
  564. maxSendMessageSize int
  565. trInfo *traceInfo
  566. statsHandler stats.Handler
  567. mu sync.Mutex // protects trInfo.tr after the service handler runs.
  568. }
  569. func (ss *serverStream) Context() context.Context {
  570. return ss.s.Context()
  571. }
  572. func (ss *serverStream) SetHeader(md metadata.MD) error {
  573. if md.Len() == 0 {
  574. return nil
  575. }
  576. return ss.s.SetHeader(md)
  577. }
  578. func (ss *serverStream) SendHeader(md metadata.MD) error {
  579. return ss.t.WriteHeader(ss.s, md)
  580. }
  581. func (ss *serverStream) SetTrailer(md metadata.MD) {
  582. if md.Len() == 0 {
  583. return
  584. }
  585. ss.s.SetTrailer(md)
  586. return
  587. }
  588. func (ss *serverStream) SendMsg(m interface{}) (err error) {
  589. defer func() {
  590. if ss.trInfo != nil {
  591. ss.mu.Lock()
  592. if ss.trInfo.tr != nil {
  593. if err == nil {
  594. ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
  595. } else {
  596. ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  597. ss.trInfo.tr.SetError()
  598. }
  599. }
  600. ss.mu.Unlock()
  601. }
  602. if err != nil && err != io.EOF {
  603. st, _ := status.FromError(toRPCErr(err))
  604. ss.t.WriteStatus(ss.s, st)
  605. }
  606. }()
  607. var outPayload *stats.OutPayload
  608. if ss.statsHandler != nil {
  609. outPayload = &stats.OutPayload{}
  610. }
  611. hdr, data, err := encode(ss.codec, m, ss.cp, outPayload, ss.comp)
  612. if err != nil {
  613. return err
  614. }
  615. if len(data) > ss.maxSendMessageSize {
  616. return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), ss.maxSendMessageSize)
  617. }
  618. if err := ss.t.Write(ss.s, hdr, data, &transport.Options{Last: false}); err != nil {
  619. return toRPCErr(err)
  620. }
  621. if outPayload != nil {
  622. outPayload.SentTime = time.Now()
  623. ss.statsHandler.HandleRPC(ss.s.Context(), outPayload)
  624. }
  625. return nil
  626. }
  627. func (ss *serverStream) RecvMsg(m interface{}) (err error) {
  628. defer func() {
  629. if ss.trInfo != nil {
  630. ss.mu.Lock()
  631. if ss.trInfo.tr != nil {
  632. if err == nil {
  633. ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
  634. } else if err != io.EOF {
  635. ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  636. ss.trInfo.tr.SetError()
  637. }
  638. }
  639. ss.mu.Unlock()
  640. }
  641. if err != nil && err != io.EOF {
  642. st, _ := status.FromError(toRPCErr(err))
  643. ss.t.WriteStatus(ss.s, st)
  644. }
  645. }()
  646. var inPayload *stats.InPayload
  647. if ss.statsHandler != nil {
  648. inPayload = &stats.InPayload{}
  649. }
  650. if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, inPayload, ss.decomp); err != nil {
  651. if err == io.EOF {
  652. return err
  653. }
  654. if err == io.ErrUnexpectedEOF {
  655. err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
  656. }
  657. return toRPCErr(err)
  658. }
  659. if inPayload != nil {
  660. ss.statsHandler.HandleRPC(ss.s.Context(), inPayload)
  661. }
  662. return nil
  663. }
  664. // MethodFromServerStream returns the method string for the input stream.
  665. // The returned string is in the format of "/service/method".
  666. func MethodFromServerStream(stream ServerStream) (string, bool) {
  667. s, ok := transport.StreamFromContext(stream.Context())
  668. if !ok {
  669. return "", ok
  670. }
  671. return s.Method(), ok
  672. }