1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "errors" 23 "io" 24 "sync" 25 "time" 26 27 "golang.org/x/net/context" 28 "golang.org/x/net/trace" 29 "google.golang.org/grpc/balancer" 30 "google.golang.org/grpc/codes" 31 "google.golang.org/grpc/encoding" 32 "google.golang.org/grpc/internal/channelz" 33 "google.golang.org/grpc/metadata" 34 "google.golang.org/grpc/stats" 35 "google.golang.org/grpc/status" 36 "google.golang.org/grpc/transport" 37) 38 39// StreamHandler defines the handler called by gRPC server to complete the 40// execution of a streaming RPC. If a StreamHandler returns an error, it 41// should be produced by the status package, or else gRPC will use 42// codes.Unknown as the status code and err.Error() as the status message 43// of the RPC. 44type StreamHandler func(srv interface{}, stream ServerStream) error 45 46// StreamDesc represents a streaming RPC service's method specification. 47type StreamDesc struct { 48 StreamName string 49 Handler StreamHandler 50 51 // At least one of these is true. 52 ServerStreams bool 53 ClientStreams bool 54} 55 56// Stream defines the common interface a client or server stream has to satisfy. 57// 58// All errors returned from Stream are compatible with the status package. 59type Stream interface { 60 // Context returns the context for this stream. 61 Context() context.Context 62 // SendMsg blocks until it sends m, the stream is done or the stream 63 // breaks. 64 // On error, it aborts the stream and returns an RPC status on client 65 // side. On server side, it simply returns the error to the caller. 66 // SendMsg is called by generated code. Also Users can call SendMsg 67 // directly when it is really needed in their use cases. 68 // It's safe to have a goroutine calling SendMsg and another goroutine calling 69 // recvMsg on the same stream at the same time. 70 // But it is not safe to call SendMsg on the same stream in different goroutines. 71 SendMsg(m interface{}) error 72 // RecvMsg blocks until it receives a message or the stream is 73 // done. On client side, it returns io.EOF when the stream is done. On 74 // any other error, it aborts the stream and returns an RPC status. On 75 // server side, it simply returns the error to the caller. 76 // It's safe to have a goroutine calling SendMsg and another goroutine calling 77 // recvMsg on the same stream at the same time. 78 // But it is not safe to call RecvMsg on the same stream in different goroutines. 79 RecvMsg(m interface{}) error 80} 81 82// ClientStream defines the interface a client stream has to satisfy. 83type ClientStream interface { 84 // Header returns the header metadata received from the server if there 85 // is any. It blocks if the metadata is not ready to read. 86 Header() (metadata.MD, error) 87 // Trailer returns the trailer metadata from the server, if there is any. 88 // It must only be called after stream.CloseAndRecv has returned, or 89 // stream.Recv has returned a non-nil error (including io.EOF). 90 Trailer() metadata.MD 91 // CloseSend closes the send direction of the stream. It closes the stream 92 // when non-nil error is met. 93 CloseSend() error 94 // Stream.SendMsg() may return a non-nil error when something wrong happens sending 95 // the request. The returned error indicates the status of this sending, not the final 96 // status of the RPC. 97 // 98 // Always call Stream.RecvMsg() to drain the stream and get the final 99 // status, otherwise there could be leaked resources. 100 Stream 101} 102 103// NewStream creates a new Stream for the client side. This is typically 104// called by generated code. ctx is used for the lifetime of the stream. 105// 106// To ensure resources are not leaked due to the stream returned, one of the following 107// actions must be performed: 108// 109// 1. Call Close on the ClientConn. 110// 2. Cancel the context provided. 111// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated 112// client-streaming RPC, for instance, might use the helper function 113// CloseAndRecv (note that CloseSend does not Recv, therefore is not 114// guaranteed to release all resources). 115// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg. 116// 117// If none of the above happen, a goroutine and a context will be leaked, and grpc 118// will not call the optionally-configured stats handler with a stats.End message. 119func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) { 120 // allow interceptor to see all applicable call options, which means those 121 // configured as defaults from dial option as well as per-call options 122 opts = combine(cc.dopts.callOptions, opts) 123 124 if cc.dopts.streamInt != nil { 125 return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...) 126 } 127 return newClientStream(ctx, desc, cc, method, opts...) 128} 129 130// NewClientStream is a wrapper for ClientConn.NewStream. 131// 132// DEPRECATED: Use ClientConn.NewStream instead. 133func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { 134 return cc.NewStream(ctx, desc, method, opts...) 135} 136 137func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { 138 if channelz.IsOn() { 139 cc.incrCallsStarted() 140 defer func() { 141 if err != nil { 142 cc.incrCallsFailed() 143 } 144 }() 145 } 146 c := defaultCallInfo() 147 mc := cc.GetMethodConfig(method) 148 if mc.WaitForReady != nil { 149 c.failFast = !*mc.WaitForReady 150 } 151 152 // Possible context leak: 153 // The cancel function for the child context we create will only be called 154 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if 155 // an error is generated by SendMsg. 156 // https://github.com/grpc/grpc-go/issues/1818. 157 var cancel context.CancelFunc 158 if mc.Timeout != nil && *mc.Timeout >= 0 { 159 ctx, cancel = context.WithTimeout(ctx, *mc.Timeout) 160 } else { 161 ctx, cancel = context.WithCancel(ctx) 162 } 163 defer func() { 164 if err != nil { 165 cancel() 166 } 167 }() 168 169 for _, o := range opts { 170 if err := o.before(c); err != nil { 171 return nil, toRPCErr(err) 172 } 173 } 174 c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize) 175 c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize) 176 if err := setCallInfoCodec(c); err != nil { 177 return nil, err 178 } 179 180 callHdr := &transport.CallHdr{ 181 Host: cc.authority, 182 Method: method, 183 // If it's not client streaming, we should already have the request to be sent, 184 // so we don't flush the header. 185 // If it's client streaming, the user may never send a request or send it any 186 // time soon, so we ask the transport to flush the header. 187 Flush: desc.ClientStreams, 188 ContentSubtype: c.contentSubtype, 189 } 190 191 // Set our outgoing compression according to the UseCompressor CallOption, if 192 // set. In that case, also find the compressor from the encoding package. 193 // Otherwise, use the compressor configured by the WithCompressor DialOption, 194 // if set. 195 var cp Compressor 196 var comp encoding.Compressor 197 if ct := c.compressorType; ct != "" { 198 callHdr.SendCompress = ct 199 if ct != encoding.Identity { 200 comp = encoding.GetCompressor(ct) 201 if comp == nil { 202 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct) 203 } 204 } 205 } else if cc.dopts.cp != nil { 206 callHdr.SendCompress = cc.dopts.cp.Type() 207 cp = cc.dopts.cp 208 } 209 if c.creds != nil { 210 callHdr.Creds = c.creds 211 } 212 var trInfo traceInfo 213 if EnableTracing { 214 trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method) 215 trInfo.firstLine.client = true 216 if deadline, ok := ctx.Deadline(); ok { 217 trInfo.firstLine.deadline = deadline.Sub(time.Now()) 218 } 219 trInfo.tr.LazyLog(&trInfo.firstLine, false) 220 ctx = trace.NewContext(ctx, trInfo.tr) 221 defer func() { 222 if err != nil { 223 // Need to call tr.finish() if error is returned. 224 // Because tr will not be returned to caller. 225 trInfo.tr.LazyPrintf("RPC: [%v]", err) 226 trInfo.tr.SetError() 227 trInfo.tr.Finish() 228 } 229 }() 230 } 231 ctx = newContextWithRPCInfo(ctx, c.failFast) 232 sh := cc.dopts.copts.StatsHandler 233 var beginTime time.Time 234 if sh != nil { 235 ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast}) 236 beginTime = time.Now() 237 begin := &stats.Begin{ 238 Client: true, 239 BeginTime: beginTime, 240 FailFast: c.failFast, 241 } 242 sh.HandleRPC(ctx, begin) 243 defer func() { 244 if err != nil { 245 // Only handle end stats if err != nil. 246 end := &stats.End{ 247 Client: true, 248 Error: err, 249 BeginTime: beginTime, 250 EndTime: time.Now(), 251 } 252 sh.HandleRPC(ctx, end) 253 } 254 }() 255 } 256 257 var ( 258 t transport.ClientTransport 259 s *transport.Stream 260 done func(balancer.DoneInfo) 261 ) 262 for { 263 // Check to make sure the context has expired. This will prevent us from 264 // looping forever if an error occurs for wait-for-ready RPCs where no data 265 // is sent on the wire. 266 select { 267 case <-ctx.Done(): 268 return nil, toRPCErr(ctx.Err()) 269 default: 270 } 271 272 t, done, err = cc.getTransport(ctx, c.failFast) 273 if err != nil { 274 return nil, err 275 } 276 277 s, err = t.NewStream(ctx, callHdr) 278 if err != nil { 279 if done != nil { 280 done(balancer.DoneInfo{Err: err}) 281 done = nil 282 } 283 // In the event of any error from NewStream, we never attempted to write 284 // anything to the wire, so we can retry indefinitely for non-fail-fast 285 // RPCs. 286 if !c.failFast { 287 continue 288 } 289 return nil, toRPCErr(err) 290 } 291 break 292 } 293 294 cs := &clientStream{ 295 opts: opts, 296 c: c, 297 cc: cc, 298 desc: desc, 299 codec: c.codec, 300 cp: cp, 301 comp: comp, 302 cancel: cancel, 303 attempt: &csAttempt{ 304 t: t, 305 s: s, 306 p: &parser{r: s}, 307 done: done, 308 dc: cc.dopts.dc, 309 ctx: ctx, 310 trInfo: trInfo, 311 statsHandler: sh, 312 beginTime: beginTime, 313 }, 314 } 315 cs.c.stream = cs 316 cs.attempt.cs = cs 317 if desc != unaryStreamDesc { 318 // Listen on cc and stream contexts to cleanup when the user closes the 319 // ClientConn or cancels the stream context. In all other cases, an error 320 // should already be injected into the recv buffer by the transport, which 321 // the client will eventually receive, and then we will cancel the stream's 322 // context in clientStream.finish. 323 go func() { 324 select { 325 case <-cc.ctx.Done(): 326 cs.finish(ErrClientConnClosing) 327 case <-ctx.Done(): 328 cs.finish(toRPCErr(ctx.Err())) 329 } 330 }() 331 } 332 return cs, nil 333} 334 335// clientStream implements a client side Stream. 336type clientStream struct { 337 opts []CallOption 338 c *callInfo 339 cc *ClientConn 340 desc *StreamDesc 341 342 codec baseCodec 343 cp Compressor 344 comp encoding.Compressor 345 346 cancel context.CancelFunc // cancels all attempts 347 348 sentLast bool // sent an end stream 349 350 mu sync.Mutex // guards finished 351 finished bool // TODO: replace with atomic cmpxchg or sync.Once? 352 353 attempt *csAttempt // the active client stream attempt 354 // TODO(hedging): hedging will have multiple attempts simultaneously. 355} 356 357// csAttempt implements a single transport stream attempt within a 358// clientStream. 359type csAttempt struct { 360 cs *clientStream 361 t transport.ClientTransport 362 s *transport.Stream 363 p *parser 364 done func(balancer.DoneInfo) 365 366 dc Decompressor 367 decomp encoding.Compressor 368 decompSet bool 369 370 ctx context.Context // the application's context, wrapped by stats/tracing 371 372 mu sync.Mutex // guards trInfo.tr 373 // trInfo.tr is set when created (if EnableTracing is true), 374 // and cleared when the finish method is called. 375 trInfo traceInfo 376 377 statsHandler stats.Handler 378 beginTime time.Time 379} 380 381func (cs *clientStream) Context() context.Context { 382 // TODO(retry): commit the current attempt (the context has peer-aware data). 383 return cs.attempt.context() 384} 385 386func (cs *clientStream) Header() (metadata.MD, error) { 387 m, err := cs.attempt.header() 388 if err != nil { 389 // TODO(retry): maybe retry on error or commit attempt on success. 390 err = toRPCErr(err) 391 cs.finish(err) 392 } 393 return m, err 394} 395 396func (cs *clientStream) Trailer() metadata.MD { 397 // TODO(retry): on error, maybe retry (trailers-only). 398 return cs.attempt.trailer() 399} 400 401func (cs *clientStream) SendMsg(m interface{}) (err error) { 402 // TODO(retry): buffer message for replaying if not committed. 403 return cs.attempt.sendMsg(m) 404} 405 406func (cs *clientStream) RecvMsg(m interface{}) (err error) { 407 // TODO(retry): maybe retry on error or commit attempt on success. 408 return cs.attempt.recvMsg(m) 409} 410 411func (cs *clientStream) CloseSend() error { 412 cs.attempt.closeSend() 413 return nil 414} 415 416func (cs *clientStream) finish(err error) { 417 if err == io.EOF { 418 // Ending a stream with EOF indicates a success. 419 err = nil 420 } 421 cs.mu.Lock() 422 if cs.finished { 423 cs.mu.Unlock() 424 return 425 } 426 cs.finished = true 427 cs.mu.Unlock() 428 if channelz.IsOn() { 429 if err != nil { 430 cs.cc.incrCallsFailed() 431 } else { 432 cs.cc.incrCallsSucceeded() 433 } 434 } 435 // TODO(retry): commit current attempt if necessary. 436 cs.attempt.finish(err) 437 for _, o := range cs.opts { 438 o.after(cs.c) 439 } 440 cs.cancel() 441} 442 443func (a *csAttempt) context() context.Context { 444 return a.s.Context() 445} 446 447func (a *csAttempt) header() (metadata.MD, error) { 448 return a.s.Header() 449} 450 451func (a *csAttempt) trailer() metadata.MD { 452 return a.s.Trailer() 453} 454 455func (a *csAttempt) sendMsg(m interface{}) (err error) { 456 // TODO Investigate how to signal the stats handling party. 457 // generate error stats if err != nil && err != io.EOF? 458 cs := a.cs 459 defer func() { 460 // For non-client-streaming RPCs, we return nil instead of EOF on success 461 // because the generated code requires it. finish is not called; RecvMsg() 462 // will call it with the stream's status independently. 463 if err == io.EOF && !cs.desc.ClientStreams { 464 err = nil 465 } 466 if err != nil && err != io.EOF { 467 // Call finish on the client stream for errors generated by this SendMsg 468 // call, as these indicate problems created by this client. (Transport 469 // errors are converted to an io.EOF error below; the real error will be 470 // returned from RecvMsg eventually in that case, or be retried.) 471 cs.finish(err) 472 } 473 }() 474 // TODO: Check cs.sentLast and error if we already ended the stream. 475 if EnableTracing { 476 a.mu.Lock() 477 if a.trInfo.tr != nil { 478 a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true) 479 } 480 a.mu.Unlock() 481 } 482 data, err := encode(cs.codec, m) 483 if err != nil { 484 return err 485 } 486 compData, err := compress(data, cs.cp, cs.comp) 487 if err != nil { 488 return err 489 } 490 hdr, payload := msgHeader(data, compData) 491 // TODO(dfawley): should we be checking len(data) instead? 492 if len(payload) > *cs.c.maxSendMessageSize { 493 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.c.maxSendMessageSize) 494 } 495 496 if !cs.desc.ClientStreams { 497 cs.sentLast = true 498 } 499 err = a.t.Write(a.s, hdr, payload, &transport.Options{Last: !cs.desc.ClientStreams}) 500 if err == nil { 501 if a.statsHandler != nil { 502 a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, data, payload, time.Now())) 503 } 504 if channelz.IsOn() { 505 a.t.IncrMsgSent() 506 } 507 return nil 508 } 509 return io.EOF 510} 511 512func (a *csAttempt) recvMsg(m interface{}) (err error) { 513 cs := a.cs 514 defer func() { 515 if err != nil || !cs.desc.ServerStreams { 516 // err != nil or non-server-streaming indicates end of stream. 517 cs.finish(err) 518 } 519 }() 520 var inPayload *stats.InPayload 521 if a.statsHandler != nil { 522 inPayload = &stats.InPayload{ 523 Client: true, 524 } 525 } 526 if !a.decompSet { 527 // Block until we receive headers containing received message encoding. 528 if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity { 529 if a.dc == nil || a.dc.Type() != ct { 530 // No configured decompressor, or it does not match the incoming 531 // message encoding; attempt to find a registered compressor that does. 532 a.dc = nil 533 a.decomp = encoding.GetCompressor(ct) 534 } 535 } else { 536 // No compression is used; disable our decompressor. 537 a.dc = nil 538 } 539 // Only initialize this state once per stream. 540 a.decompSet = true 541 } 542 err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.c.maxReceiveMessageSize, inPayload, a.decomp) 543 if err != nil { 544 if err == io.EOF { 545 if statusErr := a.s.Status().Err(); statusErr != nil { 546 return statusErr 547 } 548 return io.EOF // indicates successful end of stream. 549 } 550 return toRPCErr(err) 551 } 552 if EnableTracing { 553 a.mu.Lock() 554 if a.trInfo.tr != nil { 555 a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) 556 } 557 a.mu.Unlock() 558 } 559 if inPayload != nil { 560 a.statsHandler.HandleRPC(a.ctx, inPayload) 561 } 562 if channelz.IsOn() { 563 a.t.IncrMsgRecv() 564 } 565 if cs.desc.ServerStreams { 566 // Subsequent messages should be received by subsequent RecvMsg calls. 567 return nil 568 } 569 570 // Special handling for non-server-stream rpcs. 571 // This recv expects EOF or errors, so we don't collect inPayload. 572 err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.c.maxReceiveMessageSize, nil, a.decomp) 573 if err == nil { 574 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>")) 575 } 576 if err == io.EOF { 577 return a.s.Status().Err() // non-server streaming Recv returns nil on success 578 } 579 return toRPCErr(err) 580} 581 582func (a *csAttempt) closeSend() { 583 cs := a.cs 584 if cs.sentLast { 585 return 586 } 587 cs.sentLast = true 588 cs.attempt.t.Write(cs.attempt.s, nil, nil, &transport.Options{Last: true}) 589 // We ignore errors from Write. Any error it would return would also be 590 // returned by a subsequent RecvMsg call, and the user is supposed to always 591 // finish the stream by calling RecvMsg until it returns err != nil. 592} 593 594func (a *csAttempt) finish(err error) { 595 a.mu.Lock() 596 a.t.CloseStream(a.s, err) 597 598 if a.done != nil { 599 a.done(balancer.DoneInfo{ 600 Err: err, 601 BytesSent: true, 602 BytesReceived: a.s.BytesReceived(), 603 }) 604 } 605 if a.statsHandler != nil { 606 end := &stats.End{ 607 Client: true, 608 BeginTime: a.beginTime, 609 EndTime: time.Now(), 610 Error: err, 611 } 612 a.statsHandler.HandleRPC(a.ctx, end) 613 } 614 if a.trInfo.tr != nil { 615 if err == nil { 616 a.trInfo.tr.LazyPrintf("RPC: [OK]") 617 } else { 618 a.trInfo.tr.LazyPrintf("RPC: [%v]", err) 619 a.trInfo.tr.SetError() 620 } 621 a.trInfo.tr.Finish() 622 a.trInfo.tr = nil 623 } 624 a.mu.Unlock() 625} 626 627// ServerStream defines the interface a server stream has to satisfy. 628type ServerStream interface { 629 // SetHeader sets the header metadata. It may be called multiple times. 630 // When call multiple times, all the provided metadata will be merged. 631 // All the metadata will be sent out when one of the following happens: 632 // - ServerStream.SendHeader() is called; 633 // - The first response is sent out; 634 // - An RPC status is sent out (error or success). 635 SetHeader(metadata.MD) error 636 // SendHeader sends the header metadata. 637 // The provided md and headers set by SetHeader() will be sent. 638 // It fails if called multiple times. 639 SendHeader(metadata.MD) error 640 // SetTrailer sets the trailer metadata which will be sent with the RPC status. 641 // When called more than once, all the provided metadata will be merged. 642 SetTrailer(metadata.MD) 643 Stream 644} 645 646// serverStream implements a server side Stream. 647type serverStream struct { 648 ctx context.Context 649 t transport.ServerTransport 650 s *transport.Stream 651 p *parser 652 codec baseCodec 653 654 cp Compressor 655 dc Decompressor 656 comp encoding.Compressor 657 decomp encoding.Compressor 658 659 maxReceiveMessageSize int 660 maxSendMessageSize int 661 trInfo *traceInfo 662 663 statsHandler stats.Handler 664 665 mu sync.Mutex // protects trInfo.tr after the service handler runs. 666} 667 668func (ss *serverStream) Context() context.Context { 669 return ss.ctx 670} 671 672func (ss *serverStream) SetHeader(md metadata.MD) error { 673 if md.Len() == 0 { 674 return nil 675 } 676 return ss.s.SetHeader(md) 677} 678 679func (ss *serverStream) SendHeader(md metadata.MD) error { 680 return ss.t.WriteHeader(ss.s, md) 681} 682 683func (ss *serverStream) SetTrailer(md metadata.MD) { 684 if md.Len() == 0 { 685 return 686 } 687 ss.s.SetTrailer(md) 688} 689 690func (ss *serverStream) SendMsg(m interface{}) (err error) { 691 defer func() { 692 if ss.trInfo != nil { 693 ss.mu.Lock() 694 if ss.trInfo.tr != nil { 695 if err == nil { 696 ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true) 697 } else { 698 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 699 ss.trInfo.tr.SetError() 700 } 701 } 702 ss.mu.Unlock() 703 } 704 if err != nil && err != io.EOF { 705 st, _ := status.FromError(toRPCErr(err)) 706 ss.t.WriteStatus(ss.s, st) 707 } 708 if channelz.IsOn() && err == nil { 709 ss.t.IncrMsgSent() 710 } 711 }() 712 data, err := encode(ss.codec, m) 713 if err != nil { 714 return err 715 } 716 compData, err := compress(data, ss.cp, ss.comp) 717 if err != nil { 718 return err 719 } 720 hdr, payload := msgHeader(data, compData) 721 // TODO(dfawley): should we be checking len(data) instead? 722 if len(payload) > ss.maxSendMessageSize { 723 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize) 724 } 725 if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil { 726 return toRPCErr(err) 727 } 728 if ss.statsHandler != nil { 729 ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now())) 730 } 731 return nil 732} 733 734func (ss *serverStream) RecvMsg(m interface{}) (err error) { 735 defer func() { 736 if ss.trInfo != nil { 737 ss.mu.Lock() 738 if ss.trInfo.tr != nil { 739 if err == nil { 740 ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) 741 } else if err != io.EOF { 742 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 743 ss.trInfo.tr.SetError() 744 } 745 } 746 ss.mu.Unlock() 747 } 748 if err != nil && err != io.EOF { 749 st, _ := status.FromError(toRPCErr(err)) 750 ss.t.WriteStatus(ss.s, st) 751 } 752 if channelz.IsOn() && err == nil { 753 ss.t.IncrMsgRecv() 754 } 755 }() 756 var inPayload *stats.InPayload 757 if ss.statsHandler != nil { 758 inPayload = &stats.InPayload{} 759 } 760 if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, inPayload, ss.decomp); err != nil { 761 if err == io.EOF { 762 return err 763 } 764 if err == io.ErrUnexpectedEOF { 765 err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error()) 766 } 767 return toRPCErr(err) 768 } 769 if inPayload != nil { 770 ss.statsHandler.HandleRPC(ss.s.Context(), inPayload) 771 } 772 return nil 773} 774 775// MethodFromServerStream returns the method string for the input stream. 776// The returned string is in the format of "/service/method". 777func MethodFromServerStream(stream ServerStream) (string, bool) { 778 return Method(stream.Context()) 779} 780