1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2 
3 pub mod client;
4 pub mod server;
5 
6 use std::fmt::{self, Debug, Display};
7 use std::pin::Pin;
8 use std::sync::Arc;
9 use std::{ptr, slice};
10 
11 use crate::cq::CompletionQueue;
12 use crate::grpc_sys::{self, grpc_call, grpc_call_error, grpcwrap_batch_context};
13 use futures::future::Future;
14 use futures::ready;
15 use futures::task::{Context, Poll};
16 use libc::c_void;
17 use parking_lot::Mutex;
18 
19 use crate::buf::{GrpcByteBuffer, GrpcByteBufferReader, GrpcSlice};
20 use crate::codec::{DeserializeFn, Marshaller, SerializeFn};
21 use crate::error::{Error, Result};
22 use crate::grpc_sys::grpc_status_code::*;
23 use crate::task::{self, BatchFuture, BatchType, CallTag};
24 
/// A gRPC status code.
///
/// Wraps the raw `i32` value; the well-known codes are available as
/// associated constants of this type.
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct RpcStatusCode(i32);
29 
30 impl From<i32> for RpcStatusCode {
from(code: i32) -> RpcStatusCode31     fn from(code: i32) -> RpcStatusCode {
32         RpcStatusCode(code)
33     }
34 }
35 
36 impl From<RpcStatusCode> for i32 {
from(code: RpcStatusCode) -> i3237     fn from(code: RpcStatusCode) -> i32 {
38         code.0
39     }
40 }
41 
42 impl Display for RpcStatusCode {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result43     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
44         Debug::fmt(self, f)
45     }
46 }
47 
// Generates, from a list of `(raw value path, NAME);` entries:
//   * one associated constant `RpcStatusCode::NAME` per entry, and
//   * the `Debug` impl for `RpcStatusCode`, printed as `<number>-<NAME>`.
// Values that match no entry are printed with the name `INVALID_STATUS_CODE`.
macro_rules! status_codes {
    ($(($raw:path, $name:ident);)+) => {
        impl RpcStatusCode {
            $(pub const $name: RpcStatusCode = RpcStatusCode($raw);)+
        }

        impl Debug for RpcStatusCode {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                // Resolve the symbolic name first, then emit both parts at once.
                let label = match *self {
                    $(RpcStatusCode($raw) => stringify!($name),)+
                    RpcStatusCode(_) => "INVALID_STATUS_CODE",
                };
                write!(f, "{}-{}", self.0, label)
            }
        }
    };
}
75 
76 status_codes! {
77     (GRPC_STATUS_OK, OK);
78     (GRPC_STATUS_CANCELLED, CANCELLED);
79     (GRPC_STATUS_UNKNOWN, UNKNOWN);
80     (GRPC_STATUS_INVALID_ARGUMENT, INVALID_ARGUMENT);
81     (GRPC_STATUS_DEADLINE_EXCEEDED, DEADLINE_EXCEEDED);
82     (GRPC_STATUS_NOT_FOUND, NOT_FOUND);
83     (GRPC_STATUS_ALREADY_EXISTS, ALREADY_EXISTS);
84     (GRPC_STATUS_PERMISSION_DENIED, PERMISSION_DENIED);
85     (GRPC_STATUS_RESOURCE_EXHAUSTED, RESOURCE_EXHAUSTED);
86     (GRPC_STATUS_FAILED_PRECONDITION, FAILED_PRECONDITION);
87     (GRPC_STATUS_ABORTED, ABORTED);
88     (GRPC_STATUS_OUT_OF_RANGE, OUT_OF_RANGE);
89     (GRPC_STATUS_UNIMPLEMENTED, UNIMPLEMENTED);
90     (GRPC_STATUS_INTERNAL, INTERNAL);
91     (GRPC_STATUS_UNAVAILABLE, UNAVAILABLE);
92     (GRPC_STATUS_DATA_LOSS, DATA_LOSS);
93     (GRPC_STATUS_UNAUTHENTICATED, UNAUTHENTICATED);
94     (GRPC_STATUS__DO_NOT_USE, DO_NOT_USE);
95 }
96 
/// The kinds of RPC methods gRPC supports.
#[derive(Clone, Copy)]
pub enum MethodType {
    /// The client sends one request and the server answers with one response.
    Unary,

    /// The client sends a stream of requests and the server answers with one response.
    ClientStreaming,

    /// The client sends one request and the server answers with a stream of responses.
    ServerStreaming,

    /// Client and server may each stream any number of messages at the same time.
    Duplex,
}
112 
113 /// A description of a remote method.
114 // TODO: add serializer and deserializer.
115 pub struct Method<Req, Resp> {
116     /// Type of method.
117     pub ty: MethodType,
118 
119     /// Full qualified name of the method.
120     pub name: &'static str,
121 
122     /// The marshaller used for request messages.
123     pub req_mar: Marshaller<Req>,
124 
125     /// The marshaller used for response messages.
126     pub resp_mar: Marshaller<Resp>,
127 }
128 
129 impl<Req, Resp> Method<Req, Resp> {
130     /// Get the request serializer.
131     #[inline]
req_ser(&self) -> SerializeFn<Req>132     pub fn req_ser(&self) -> SerializeFn<Req> {
133         self.req_mar.ser
134     }
135 
136     /// Get the request deserializer.
137     #[inline]
req_de(&self) -> DeserializeFn<Req>138     pub fn req_de(&self) -> DeserializeFn<Req> {
139         self.req_mar.de
140     }
141 
142     /// Get the response serializer.
143     #[inline]
resp_ser(&self) -> SerializeFn<Resp>144     pub fn resp_ser(&self) -> SerializeFn<Resp> {
145         self.resp_mar.ser
146     }
147 
148     /// Get the response deserializer.
149     #[inline]
resp_de(&self) -> DeserializeFn<Resp>150     pub fn resp_de(&self) -> DeserializeFn<Resp> {
151         self.resp_mar.de
152     }
153 }
154 
155 /// RPC result returned from the server.
156 #[derive(Debug, Clone)]
157 pub struct RpcStatus {
158     /// gRPC status code. `Ok` indicates success, all other values indicate an error.
159     pub status: RpcStatusCode,
160 
161     /// Optional detail string.
162     pub details: Option<String>,
163 }
164 
165 impl Display for RpcStatus {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result166     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
167         Debug::fmt(self, fmt)
168     }
169 }
170 
171 impl RpcStatus {
172     /// Create a new [`RpcStatus`].
new<T: Into<RpcStatusCode>>(code: T, details: Option<String>) -> RpcStatus173     pub fn new<T: Into<RpcStatusCode>>(code: T, details: Option<String>) -> RpcStatus {
174         RpcStatus {
175             status: code.into(),
176             details,
177         }
178     }
179 
180     /// Create a new [`RpcStatus`] that status code is Ok.
ok() -> RpcStatus181     pub fn ok() -> RpcStatus {
182         RpcStatus::new(RpcStatusCode::OK, None)
183     }
184 }
185 
186 pub type MessageReader = GrpcByteBufferReader;
187 
188 /// Context for batch request.
189 pub struct BatchContext {
190     ctx: *mut grpcwrap_batch_context,
191 }
192 
193 impl BatchContext {
new() -> BatchContext194     pub fn new() -> BatchContext {
195         BatchContext {
196             ctx: unsafe { grpc_sys::grpcwrap_batch_context_create() },
197         }
198     }
199 
as_ptr(&self) -> *mut grpcwrap_batch_context200     pub fn as_ptr(&self) -> *mut grpcwrap_batch_context {
201         self.ctx
202     }
203 
take_recv_message(&self) -> Option<GrpcByteBuffer>204     pub fn take_recv_message(&self) -> Option<GrpcByteBuffer> {
205         let ptr = unsafe { grpc_sys::grpcwrap_batch_context_take_recv_message(self.ctx) };
206         if ptr.is_null() {
207             None
208         } else {
209             Some(unsafe { GrpcByteBuffer::from_raw(ptr) })
210         }
211     }
212 
213     /// Get the status of the rpc call.
rpc_status(&self) -> RpcStatus214     pub fn rpc_status(&self) -> RpcStatus {
215         let status = RpcStatusCode(unsafe {
216             grpc_sys::grpcwrap_batch_context_recv_status_on_client_status(self.ctx)
217         });
218 
219         let details = if status == RpcStatusCode::OK {
220             None
221         } else {
222             unsafe {
223                 let mut details_len = 0;
224                 let details_ptr = grpc_sys::grpcwrap_batch_context_recv_status_on_client_details(
225                     self.ctx,
226                     &mut details_len,
227                 );
228                 let details_slice = slice::from_raw_parts(details_ptr as *const _, details_len);
229                 Some(String::from_utf8_lossy(details_slice).into_owned())
230             }
231         };
232 
233         RpcStatus::new(status, details)
234     }
235 
236     /// Fetch the response bytes of the rpc call.
recv_message(&mut self) -> Option<MessageReader>237     pub fn recv_message(&mut self) -> Option<MessageReader> {
238         let buf = self.take_recv_message()?;
239         Some(GrpcByteBufferReader::new(buf))
240     }
241 }
242 
243 impl Drop for BatchContext {
drop(&mut self)244     fn drop(&mut self) {
245         unsafe { grpc_sys::grpcwrap_batch_context_destroy(self.ctx) }
246     }
247 }
248 
249 #[inline]
box_batch_tag(tag: CallTag) -> (*mut grpcwrap_batch_context, *mut c_void)250 fn box_batch_tag(tag: CallTag) -> (*mut grpcwrap_batch_context, *mut c_void) {
251     let tag_box = Box::new(tag);
252     (
253         tag_box.batch_ctx().unwrap().as_ptr(),
254         Box::into_raw(tag_box) as _,
255     )
256 }
257 
258 /// A helper function that runs the batch call and checks the result.
check_run<F>(bt: BatchType, f: F) -> BatchFuture where F: FnOnce(*mut grpcwrap_batch_context, *mut c_void) -> grpc_call_error,259 fn check_run<F>(bt: BatchType, f: F) -> BatchFuture
260 where
261     F: FnOnce(*mut grpcwrap_batch_context, *mut c_void) -> grpc_call_error,
262 {
263     let (cq_f, tag) = CallTag::batch_pair(bt);
264     let (batch_ptr, tag_ptr) = box_batch_tag(tag);
265     let code = f(batch_ptr, tag_ptr);
266     if code != grpc_call_error::GRPC_CALL_OK {
267         unsafe {
268             Box::from_raw(tag_ptr);
269         }
270         panic!("create call fail: {:?}", code);
271     }
272     cq_f
273 }
274 
275 /// A Call represents an RPC.
276 ///
277 /// When created, it is in a configuration state allowing properties to be
278 /// set until it is invoked. After invoke, the Call can have messages
279 /// written to it and read from it.
280 pub struct Call {
281     pub call: *mut grpc_call,
282     pub cq: CompletionQueue,
283 }
284 
285 unsafe impl Send for Call {}
286 
287 impl Call {
from_raw(call: *mut grpc_sys::grpc_call, cq: CompletionQueue) -> Call288     pub unsafe fn from_raw(call: *mut grpc_sys::grpc_call, cq: CompletionQueue) -> Call {
289         assert!(!call.is_null());
290         Call { call, cq }
291     }
292 
293     /// Send a message asynchronously.
start_send_message( &mut self, msg: &mut GrpcSlice, write_flags: u32, initial_meta: bool, ) -> Result<BatchFuture>294     pub fn start_send_message(
295         &mut self,
296         msg: &mut GrpcSlice,
297         write_flags: u32,
298         initial_meta: bool,
299     ) -> Result<BatchFuture> {
300         let _cq_ref = self.cq.borrow()?;
301         let i = if initial_meta { 1 } else { 0 };
302         let f = check_run(BatchType::Finish, |ctx, tag| unsafe {
303             grpc_sys::grpcwrap_call_send_message(
304                 self.call,
305                 ctx,
306                 msg.as_mut_ptr(),
307                 write_flags,
308                 i,
309                 tag,
310             )
311         });
312         Ok(f)
313     }
314 
315     /// Finish the rpc call from client.
start_send_close_client(&mut self) -> Result<BatchFuture>316     pub fn start_send_close_client(&mut self) -> Result<BatchFuture> {
317         let _cq_ref = self.cq.borrow()?;
318         let f = check_run(BatchType::Finish, |_, tag| unsafe {
319             grpc_sys::grpcwrap_call_send_close_from_client(self.call, tag)
320         });
321         Ok(f)
322     }
323 
324     /// Receive a message asynchronously.
start_recv_message(&mut self) -> Result<BatchFuture>325     pub fn start_recv_message(&mut self) -> Result<BatchFuture> {
326         let _cq_ref = self.cq.borrow()?;
327         let f = check_run(BatchType::Read, |ctx, tag| unsafe {
328             grpc_sys::grpcwrap_call_recv_message(self.call, ctx, tag)
329         });
330         Ok(f)
331     }
332 
333     /// Start handling from server side.
334     ///
335     /// Future will finish once close is received by the server.
start_server_side(&mut self) -> Result<BatchFuture>336     pub fn start_server_side(&mut self) -> Result<BatchFuture> {
337         let _cq_ref = self.cq.borrow()?;
338         let f = check_run(BatchType::Finish, |ctx, tag| unsafe {
339             grpc_sys::grpcwrap_call_start_serverside(self.call, ctx, tag)
340         });
341         Ok(f)
342     }
343 
344     /// Send a status from server.
start_send_status_from_server( &mut self, status: &RpcStatus, send_empty_metadata: bool, payload: &mut Option<GrpcSlice>, write_flags: u32, ) -> Result<BatchFuture>345     pub fn start_send_status_from_server(
346         &mut self,
347         status: &RpcStatus,
348         send_empty_metadata: bool,
349         payload: &mut Option<GrpcSlice>,
350         write_flags: u32,
351     ) -> Result<BatchFuture> {
352         let _cq_ref = self.cq.borrow()?;
353         let send_empty_metadata = if send_empty_metadata { 1 } else { 0 };
354         let f = check_run(BatchType::Finish, |ctx, tag| unsafe {
355             let details_ptr = status
356                 .details
357                 .as_ref()
358                 .map_or_else(ptr::null, |s| s.as_ptr() as _);
359             let details_len = status.details.as_ref().map_or(0, String::len);
360             let payload_p = match payload {
361                 Some(p) => p.as_mut_ptr(),
362                 None => ptr::null_mut(),
363             };
364             grpc_sys::grpcwrap_call_send_status_from_server(
365                 self.call,
366                 ctx,
367                 status.status.into(),
368                 details_ptr,
369                 details_len,
370                 ptr::null_mut(),
371                 send_empty_metadata,
372                 payload_p,
373                 write_flags,
374                 tag,
375             )
376         });
377         Ok(f)
378     }
379 
380     /// Abort an rpc call before handler is called.
abort(self, status: &RpcStatus)381     pub fn abort(self, status: &RpcStatus) {
382         match self.cq.borrow() {
383             // Queue is shutdown, ignore.
384             Err(Error::QueueShutdown) => return,
385             Err(e) => panic!("unexpected error when aborting call: {:?}", e),
386             _ => {}
387         }
388         let call_ptr = self.call;
389         let tag = CallTag::abort(self);
390         let (batch_ptr, tag_ptr) = box_batch_tag(tag);
391 
392         let code = unsafe {
393             let details_ptr = status
394                 .details
395                 .as_ref()
396                 .map_or_else(ptr::null, |s| s.as_ptr() as _);
397             let details_len = status.details.as_ref().map_or(0, String::len);
398             grpc_sys::grpcwrap_call_send_status_from_server(
399                 call_ptr,
400                 batch_ptr,
401                 status.status.into(),
402                 details_ptr,
403                 details_len,
404                 ptr::null_mut(),
405                 1,
406                 ptr::null_mut(),
407                 0,
408                 tag_ptr as *mut c_void,
409             )
410         };
411         if code != grpc_call_error::GRPC_CALL_OK {
412             unsafe {
413                 Box::from_raw(tag_ptr);
414             }
415             panic!("create call fail: {:?}", code);
416         }
417     }
418 
419     /// Cancel the rpc call by client.
cancel(&self)420     fn cancel(&self) {
421         match self.cq.borrow() {
422             // Queue is shutdown, ignore.
423             Err(Error::QueueShutdown) => return,
424             Err(e) => panic!("unexpected error when canceling call: {:?}", e),
425             _ => {}
426         }
427         unsafe {
428             grpc_sys::grpc_call_cancel(self.call, ptr::null_mut());
429         }
430     }
431 }
432 
433 impl Drop for Call {
drop(&mut self)434     fn drop(&mut self) {
435         unsafe { grpc_sys::grpc_call_unref(self.call) }
436     }
437 }
438 
439 /// A share object for client streaming and duplex streaming call.
440 ///
441 /// In both cases, receiver and sender can be polled in the same time,
442 /// hence we need to share the call in the both sides and abort the sink
443 /// once the call is canceled or finished early.
444 struct ShareCall {
445     call: Call,
446     close_f: BatchFuture,
447     finished: bool,
448     status: Option<RpcStatus>,
449 }
450 
451 impl ShareCall {
new(call: Call, close_f: BatchFuture) -> ShareCall452     fn new(call: Call, close_f: BatchFuture) -> ShareCall {
453         ShareCall {
454             call,
455             close_f,
456             finished: false,
457             status: None,
458         }
459     }
460 
461     /// Poll if the call is still alive.
462     ///
463     /// If the call is still running, will register a notification for its completion.
poll_finish(&mut self, cx: &mut Context) -> Poll<Result<Option<MessageReader>>>464     fn poll_finish(&mut self, cx: &mut Context) -> Poll<Result<Option<MessageReader>>> {
465         let res = match Pin::new(&mut self.close_f).poll(cx) {
466             Poll::Ready(Ok(reader)) => {
467                 self.status = Some(RpcStatus::ok());
468                 Poll::Ready(Ok(reader))
469             }
470             Poll::Pending => return Poll::Pending,
471             Poll::Ready(Err(Error::RpcFailure(status))) => {
472                 self.status = Some(status.clone());
473                 Poll::Ready(Err(Error::RpcFailure(status)))
474             }
475             res => res,
476         };
477 
478         self.finished = true;
479         res
480     }
481 
482     /// Check if the call is finished.
check_alive(&mut self) -> Result<()>483     fn check_alive(&mut self) -> Result<()> {
484         if self.finished {
485             // maybe can just take here.
486             return Err(Error::RpcFinished(self.status.clone()));
487         }
488 
489         task::check_alive(&self.close_f)
490     }
491 }
492 
493 /// A helper trait that allows executing function on the internal `ShareCall` struct.
494 trait ShareCallHolder {
call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R495     fn call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R;
496 }
497 
498 impl ShareCallHolder for ShareCall {
call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R499     fn call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R {
500         f(self)
501     }
502 }
503 
504 impl ShareCallHolder for Arc<Mutex<ShareCall>> {
call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R505     fn call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R {
506         let mut call = self.lock();
507         f(&mut call)
508     }
509 }
510 
511 /// A helper struct for constructing Stream object for batch requests.
512 struct StreamingBase {
513     close_f: Option<BatchFuture>,
514     msg_f: Option<BatchFuture>,
515     read_done: bool,
516 }
517 
518 impl StreamingBase {
new(close_f: Option<BatchFuture>) -> StreamingBase519     fn new(close_f: Option<BatchFuture>) -> StreamingBase {
520         StreamingBase {
521             close_f,
522             msg_f: None,
523             read_done: false,
524         }
525     }
526 
poll<C: ShareCallHolder>( &mut self, cx: &mut Context, call: &mut C, skip_finish_check: bool, ) -> Poll<Option<Result<MessageReader>>>527     fn poll<C: ShareCallHolder>(
528         &mut self,
529         cx: &mut Context,
530         call: &mut C,
531         skip_finish_check: bool,
532     ) -> Poll<Option<Result<MessageReader>>> {
533         if !skip_finish_check {
534             let mut finished = false;
535             if let Some(close_f) = &mut self.close_f {
536                 if Pin::new(close_f).poll(cx)?.is_ready() {
537                     // Don't return immediately, there may be pending data.
538                     finished = true;
539                 }
540             }
541             if finished {
542                 self.close_f.take();
543             }
544         }
545 
546         let mut bytes = None;
547         if !self.read_done {
548             if let Some(msg_f) = &mut self.msg_f {
549                 bytes = ready!(Pin::new(msg_f).poll(cx)?);
550                 if bytes.is_none() {
551                     self.read_done = true;
552                 }
553             }
554         }
555 
556         if self.read_done {
557             if self.close_f.is_none() {
558                 return Poll::Ready(None);
559             }
560             return Poll::Pending;
561         }
562 
563         // so msg_f must be either stale or not initialized yet.
564         self.msg_f.take();
565         let msg_f = call.call(|c| c.call.start_recv_message())?;
566         self.msg_f = Some(msg_f);
567         if bytes.is_none() {
568             self.poll(cx, call, true)
569         } else {
570             Poll::Ready(bytes.map(Ok))
571         }
572     }
573 
574     // Cancel the call if we still have some messages or did not
575     // receive status code.
on_drop<C: ShareCallHolder>(&self, call: &mut C)576     fn on_drop<C: ShareCallHolder>(&self, call: &mut C) {
577         if !self.read_done || self.close_f.is_some() {
578             call.call(|c| c.call.cancel());
579         }
580     }
581 }
582 
/// Flags that apply to a write operation.
///
/// The default value has no flag set.
#[derive(Clone, Copy, Default)]
pub struct WriteFlags {
    // Raw bit set of `GRPC_WRITE_*` flags.
    flags: u32,
}
588 
589 impl WriteFlags {
590     /// Hint that the write may be buffered and need not go out on the wire immediately.
591     ///
592     /// gRPC is free to buffer the message until the next non-buffered write, or until write stream
593     /// completion, but it need not buffer completely or at all.
buffer_hint(mut self, need_buffered: bool) -> WriteFlags594     pub fn buffer_hint(mut self, need_buffered: bool) -> WriteFlags {
595         client::change_flag(
596             &mut self.flags,
597             grpc_sys::GRPC_WRITE_BUFFER_HINT,
598             need_buffered,
599         );
600         self
601     }
602 
603     /// Force compression to be disabled.
force_no_compress(mut self, no_compress: bool) -> WriteFlags604     pub fn force_no_compress(mut self, no_compress: bool) -> WriteFlags {
605         client::change_flag(
606             &mut self.flags,
607             grpc_sys::GRPC_WRITE_NO_COMPRESS,
608             no_compress,
609         );
610         self
611     }
612 
613     /// Get whether buffer hint is enabled.
get_buffer_hint(self) -> bool614     pub fn get_buffer_hint(self) -> bool {
615         (self.flags & grpc_sys::GRPC_WRITE_BUFFER_HINT) != 0
616     }
617 
618     /// Get whether compression is disabled.
get_force_no_compress(self) -> bool619     pub fn get_force_no_compress(self) -> bool {
620         (self.flags & grpc_sys::GRPC_WRITE_NO_COMPRESS) != 0
621     }
622 }
623 
624 /// A helper struct for constructing Sink object for batch requests.
625 struct SinkBase {
626     // Batch job to be executed in `poll_ready`.
627     batch_f: Option<BatchFuture>,
628     send_metadata: bool,
629     // Flag to indicate if enhance batch strategy. This behavior will modify the `buffer_hint` to batch
630     // messages as much as possible.
631     enhance_buffer_strategy: bool,
632     // Buffer used to store the data to be sent, send out the last data in this round of `start_send`.
633     buffer: GrpcSlice,
634     // Write flags used to control the data to be sent in `buffer`.
635     buf_flags: Option<WriteFlags>,
636     // Used to records whether a message in which `buffer_hint` is false exists.
637     // Note: only used in enhanced buffer strategy.
638     last_buf_hint: bool,
639 }
640 
641 impl SinkBase {
new(send_metadata: bool) -> SinkBase642     fn new(send_metadata: bool) -> SinkBase {
643         SinkBase {
644             batch_f: None,
645             buffer: GrpcSlice::default(),
646             buf_flags: None,
647             last_buf_hint: true,
648             send_metadata,
649             enhance_buffer_strategy: false,
650         }
651     }
652 
start_send<T, C: ShareCallHolder>( &mut self, call: &mut C, t: &T, flags: WriteFlags, ser: SerializeFn<T>, ) -> Result<()>653     fn start_send<T, C: ShareCallHolder>(
654         &mut self,
655         call: &mut C,
656         t: &T,
657         flags: WriteFlags,
658         ser: SerializeFn<T>,
659     ) -> Result<()> {
660         // temporary fix: buffer hint with send meta will not send out any metadata.
661         // note: only the first message can enter this code block.
662         if self.send_metadata {
663             ser(t, &mut self.buffer);
664             self.buf_flags = Some(flags);
665             self.start_send_buffer_message(false, call)?;
666             self.send_metadata = false;
667             return Ok(());
668         }
669 
670         // If there is already a buffered message waiting to be sent, set `buffer_hint` to true to indicate
671         // that this is not the last message.
672         if self.buf_flags.is_some() {
673             self.start_send_buffer_message(true, call)?;
674         }
675 
676         ser(t, &mut self.buffer);
677         let hint = flags.get_buffer_hint();
678         self.last_buf_hint &= hint;
679         self.buf_flags = Some(flags);
680 
681         // If sink disable batch, start sending the message in buffer immediately.
682         if !self.enhance_buffer_strategy {
683             self.start_send_buffer_message(hint, call)?;
684         }
685 
686         Ok(())
687     }
688 
689     #[inline]
poll_ready(&mut self, cx: &mut Context) -> Poll<Result<()>>690     fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<()>> {
691         match &mut self.batch_f {
692             None => return Poll::Ready(Ok(())),
693             Some(f) => {
694                 ready!(Pin::new(f).poll(cx)?);
695             }
696         }
697         self.batch_f.take();
698         Poll::Ready(Ok(()))
699     }
700 
701     #[inline]
poll_flush<C: ShareCallHolder>( &mut self, cx: &mut Context, call: &mut C, ) -> Poll<Result<()>>702     fn poll_flush<C: ShareCallHolder>(
703         &mut self,
704         cx: &mut Context,
705         call: &mut C,
706     ) -> Poll<Result<()>> {
707         if self.batch_f.is_some() {
708             ready!(self.poll_ready(cx)?);
709         }
710         if self.buf_flags.is_some() {
711             self.start_send_buffer_message(self.last_buf_hint, call)?;
712             ready!(self.poll_ready(cx)?);
713         }
714         self.last_buf_hint = true;
715         Poll::Ready(Ok(()))
716     }
717 
718     #[inline]
start_send_buffer_message<C: ShareCallHolder>( &mut self, buffer_hint: bool, call: &mut C, ) -> Result<()>719     fn start_send_buffer_message<C: ShareCallHolder>(
720         &mut self,
721         buffer_hint: bool,
722         call: &mut C,
723     ) -> Result<()> {
724         // `start_send` is supposed to be called after `poll_ready` returns ready.
725         assert!(self.batch_f.is_none());
726 
727         let mut flags = self.buf_flags.clone().unwrap();
728         flags = flags.buffer_hint(buffer_hint);
729         let write_f = call.call(|c| {
730             c.call
731                 .start_send_message(&mut self.buffer, flags.flags, self.send_metadata)
732         })?;
733         self.batch_f = Some(write_f);
734         if !self.buffer.is_inline() {
735             self.buffer = GrpcSlice::default();
736         }
737         self.buf_flags.take();
738         Ok(())
739     }
740 }
741