1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2 
3 use std::ffi::CStr;
4 use std::pin::Pin;
5 use std::sync::Arc;
6 use std::time::Duration;
7 use std::{result, slice};
8 
9 use crate::grpc_sys::{
10     self, gpr_clock_type, gpr_timespec, grpc_call_error, grpcwrap_request_call_context,
11 };
12 use futures::future::Future;
13 use futures::ready;
14 use futures::sink::Sink;
15 use futures::stream::Stream;
16 use futures::task::{Context, Poll};
17 use parking_lot::Mutex;
18 
19 use super::{RpcStatus, ShareCall, ShareCallHolder, WriteFlags};
20 use crate::auth_context::AuthContext;
21 use crate::buf::GrpcSlice;
22 use crate::call::{
23     BatchContext, Call, MessageReader, MethodType, RpcStatusCode, SinkBase, StreamingBase,
24 };
25 use crate::codec::{DeserializeFn, SerializeFn};
26 use crate::cq::CompletionQueue;
27 use crate::error::{Error, Result};
28 use crate::metadata::Metadata;
29 use crate::server::ServerChecker;
30 use crate::server::{BoxHandler, RequestCallContext};
31 use crate::task::{BatchFuture, CallTag, Executor, Kicker};
32 use crate::CheckResult;
33 
/// A point in time before which an RPC or operation should finish.
#[derive(Clone, Copy)]
pub struct Deadline {
    // Raw gRPC timespec backing this deadline; `Deadline::new` stores it
    // after converting it to the realtime clock.
    pub(crate) spec: gpr_timespec,
}
39 
40 impl Deadline {
new(spec: gpr_timespec) -> Deadline41     fn new(spec: gpr_timespec) -> Deadline {
42         let realtime_spec =
43             unsafe { grpc_sys::gpr_convert_clock_type(spec, gpr_clock_type::GPR_CLOCK_REALTIME) };
44 
45         Deadline {
46             spec: realtime_spec,
47         }
48     }
49 
50     /// Checks if the deadline is exceeded.
exceeded(self) -> bool51     pub fn exceeded(self) -> bool {
52         unsafe {
53             let now = grpc_sys::gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME);
54             grpc_sys::gpr_time_cmp(now, self.spec) >= 0
55         }
56     }
57 
spec(self) -> gpr_timespec58     pub(crate) fn spec(self) -> gpr_timespec {
59         self.spec
60     }
61 }
62 
63 impl From<Duration> for Deadline {
64     /// Build a deadline from given duration.
65     ///
66     /// The deadline will be `now + duration`.
67     #[inline]
from(dur: Duration) -> Deadline68     fn from(dur: Duration) -> Deadline {
69         Deadline::new(dur.into())
70     }
71 }
72 
/// Context for accepting a request.
pub struct RequestContext {
    // Native request-call context; created in `new` and destroyed when this
    // value is dropped.
    ctx: *mut grpcwrap_request_call_context,
    // Server-side call context handed in at construction; it can be moved
    // out once via `take_request_call_context`.
    request_call: Option<RequestCallContext>,
}
78 
79 impl RequestContext {
new(rc: RequestCallContext) -> RequestContext80     pub fn new(rc: RequestCallContext) -> RequestContext {
81         let ctx = unsafe { grpc_sys::grpcwrap_request_call_context_create() };
82 
83         RequestContext {
84             ctx,
85             request_call: Some(rc),
86         }
87     }
88 
89     /// Try to accept a client side streaming request.
90     ///
91     /// Return error if the request is a client side unary request.
handle_stream_req( self, cq: &CompletionQueue, rc: &mut RequestCallContext, ) -> result::Result<(), Self>92     pub fn handle_stream_req(
93         self,
94         cq: &CompletionQueue,
95         rc: &mut RequestCallContext,
96     ) -> result::Result<(), Self> {
97         let checker = rc.get_checker();
98         let handler = unsafe { rc.get_handler(self.method()) };
99         match handler {
100             Some(handler) => match handler.method_type() {
101                 MethodType::Unary | MethodType::ServerStreaming => Err(self),
102                 _ => {
103                     execute(self, cq, None, handler, checker);
104                     Ok(())
105                 }
106             },
107             None => {
108                 execute_unimplemented(self, cq.clone());
109                 Ok(())
110             }
111         }
112     }
113 
114     /// Accept a client side unary request.
115     ///
116     /// This method should be called after `handle_stream_req`. When handling
117     /// client side unary request, handler will only be called after the unary
118     /// request is received.
handle_unary_req(self, rc: RequestCallContext, _: &CompletionQueue)119     pub fn handle_unary_req(self, rc: RequestCallContext, _: &CompletionQueue) {
120         // fetch message before calling callback.
121         let tag = Box::new(CallTag::unary_request(self, rc));
122         let batch_ctx = tag.batch_ctx().unwrap().as_ptr();
123         let request_ctx = tag.request_ctx().unwrap().as_ptr();
124         let tag_ptr = Box::into_raw(tag);
125         unsafe {
126             let call = grpc_sys::grpcwrap_request_call_context_get_call(request_ctx);
127             let code = grpc_sys::grpcwrap_call_recv_message(call, batch_ctx, tag_ptr as _);
128             if code != grpc_call_error::GRPC_CALL_OK {
129                 Box::from_raw(tag_ptr);
130                 // it should not failed.
131                 panic!("try to receive message fail: {:?}", code);
132             }
133         }
134     }
135 
take_request_call_context(&mut self) -> Option<RequestCallContext>136     pub fn take_request_call_context(&mut self) -> Option<RequestCallContext> {
137         self.request_call.take()
138     }
139 
as_ptr(&self) -> *mut grpcwrap_request_call_context140     pub fn as_ptr(&self) -> *mut grpcwrap_request_call_context {
141         self.ctx
142     }
143 
call(&self, cq: CompletionQueue) -> Call144     fn call(&self, cq: CompletionQueue) -> Call {
145         unsafe {
146             // It is okay to use a mutable pointer on a immutable reference, `self`,
147             // because grpcwrap_request_call_context_ref_call is thread-safe.
148             let call = grpc_sys::grpcwrap_request_call_context_ref_call(self.ctx);
149             assert!(!call.is_null());
150             Call::from_raw(call, cq)
151         }
152     }
153 
method(&self) -> &[u8]154     pub fn method(&self) -> &[u8] {
155         let mut len = 0;
156         let method = unsafe { grpc_sys::grpcwrap_request_call_context_method(self.ctx, &mut len) };
157 
158         unsafe { slice::from_raw_parts(method as _, len) }
159     }
160 
host(&self) -> &[u8]161     fn host(&self) -> &[u8] {
162         let mut len = 0;
163         let host = unsafe { grpc_sys::grpcwrap_request_call_context_host(self.ctx, &mut len) };
164 
165         unsafe { slice::from_raw_parts(host as _, len) }
166     }
167 
deadline(&self) -> Deadline168     fn deadline(&self) -> Deadline {
169         let t = unsafe { grpc_sys::grpcwrap_request_call_context_deadline(self.ctx) };
170 
171         Deadline::new(t)
172     }
173 
metadata(&self) -> &Metadata174     fn metadata(&self) -> &Metadata {
175         unsafe {
176             let ptr = grpc_sys::grpcwrap_request_call_context_metadata_array(self.ctx);
177             let arr_ptr: *const Metadata = ptr as _;
178             &*arr_ptr
179         }
180     }
181 
peer(&self) -> String182     fn peer(&self) -> String {
183         unsafe {
184             // RequestContext always holds a reference of the call.
185             let call = grpc_sys::grpcwrap_request_call_context_get_call(self.ctx);
186             let p = grpc_sys::grpc_call_get_peer(call);
187             let peer = CStr::from_ptr(p)
188                 .to_str()
189                 .expect("valid UTF-8 data")
190                 .to_owned();
191             grpc_sys::gpr_free(p as _);
192             peer
193         }
194     }
195 
196     /// If the server binds in non-secure mode, this will return None
auth_context(&self) -> Option<AuthContext>197     fn auth_context(&self) -> Option<AuthContext> {
198         unsafe {
199             let call = grpc_sys::grpcwrap_request_call_context_get_call(self.ctx);
200             AuthContext::from_call_ptr(call)
201         }
202     }
203 }
204 
205 impl Drop for RequestContext {
drop(&mut self)206     fn drop(&mut self) {
207         unsafe { grpc_sys::grpcwrap_request_call_context_destroy(self.ctx) }
208     }
209 }
210 
/// A context for handling client side unary request.
pub struct UnaryRequestContext {
    // The accepted request this unary context wraps.
    request: RequestContext,
    // Server-side call context; it can be moved out once via
    // `take_request_call_context`.
    request_call: Option<RequestCallContext>,
    // Batch context created together with this value and exposed through
    // `batch_ctx` / `batch_ctx_mut`.
    batch: BatchContext,
}
217 
218 impl UnaryRequestContext {
new(ctx: RequestContext, rc: RequestCallContext) -> UnaryRequestContext219     pub fn new(ctx: RequestContext, rc: RequestCallContext) -> UnaryRequestContext {
220         UnaryRequestContext {
221             request: ctx,
222             request_call: Some(rc),
223             batch: BatchContext::new(),
224         }
225     }
226 
batch_ctx(&self) -> &BatchContext227     pub fn batch_ctx(&self) -> &BatchContext {
228         &self.batch
229     }
230 
batch_ctx_mut(&mut self) -> &mut BatchContext231     pub fn batch_ctx_mut(&mut self) -> &mut BatchContext {
232         &mut self.batch
233     }
234 
request_ctx(&self) -> &RequestContext235     pub fn request_ctx(&self) -> &RequestContext {
236         &self.request
237     }
238 
take_request_call_context(&mut self) -> Option<RequestCallContext>239     pub fn take_request_call_context(&mut self) -> Option<RequestCallContext> {
240         self.request_call.take()
241     }
242 
handle( self, rc: &mut RequestCallContext, cq: &CompletionQueue, reader: Option<MessageReader>, )243     pub fn handle(
244         self,
245         rc: &mut RequestCallContext,
246         cq: &CompletionQueue,
247         reader: Option<MessageReader>,
248     ) {
249         let checker = rc.get_checker();
250         let handler = unsafe { rc.get_handler(self.request.method()).unwrap() };
251         if reader.is_some() {
252             return execute(self.request, cq, reader, handler, checker);
253         }
254 
255         let status = RpcStatus::new(RpcStatusCode::INTERNAL, Some("No payload".to_owned()));
256         self.request.call(cq.clone()).abort(&status)
257     }
258 }
259 
/// A stream for a client streaming call and a duplex streaming call.
///
/// The corresponding RPC will be canceled if the stream did not
/// finish before dropping.
#[must_use = "if unused the RequestStream may immediately cancel the RPC"]
pub struct RequestStream<T> {
    // Shared, lock-protected call; the streaming helpers in this file hand
    // a clone of the same `Arc` to the matching sink.
    call: Arc<Mutex<ShareCall>>,
    // Streaming state used to poll for incoming messages.
    base: StreamingBase,
    // Deserializer applied to each received message.
    de: DeserializeFn<T>,
}
270 
271 impl<T> RequestStream<T> {
new(call: Arc<Mutex<ShareCall>>, de: DeserializeFn<T>) -> RequestStream<T>272     fn new(call: Arc<Mutex<ShareCall>>, de: DeserializeFn<T>) -> RequestStream<T> {
273         RequestStream {
274             call,
275             base: StreamingBase::new(None),
276             de,
277         }
278     }
279 }
280 
281 impl<T> Stream for RequestStream<T> {
282     type Item = Result<T>;
283 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<T>>>284     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<T>>> {
285         {
286             let mut call = self.call.lock();
287             call.check_alive()?;
288         }
289 
290         let t = &mut *self;
291         match ready!(t.base.poll(cx, &mut t.call, false)?) {
292             None => Poll::Ready(None),
293             Some(data) => Poll::Ready(Some((t.de)(data))),
294         }
295     }
296 }
297 
298 impl<T> Drop for RequestStream<T> {
299     /// The corresponding RPC will be canceled if the stream did not
300     /// finish before dropping.
drop(&mut self)301     fn drop(&mut self) {
302         self.base.on_drop(&mut self.call);
303     }
304 }
305 
/// A helper macro used to implement server side unary sink.
/// Not using generic here because we don't need to expose
/// `CallHolder` or `Call` to caller.
// TODO: Use type alias to be friendly for documentation.
macro_rules! impl_unary_sink {
    ($(#[$attr:meta])* $t:ident, $rt:ident, $holder:ty) => {
        // Future returned by `success` / `fail`; resolves once the status
        // batch has completed and the call has finished.
        pub struct $rt {
            call: $holder,
            cq_f: Option<BatchFuture>,
            err: Option<Error>,
        }

        impl Future for $rt {
            type Output = Result<()>;

            fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                // An error hit while starting the batch is reported first.
                if let Some(e) = self.err.take() {
                    return Poll::Ready(Err(e));
                }

                // Drive the send-status batch to completion, then forget it
                // so later polls go straight to the finish check.
                if let Some(batch) = self.cq_f.as_mut() {
                    ready!(Pin::new(batch).poll(cx)?);
                    self.cq_f = None;
                }

                ready!(self.call.call(|c| c.poll_finish(cx))?);
                Poll::Ready(Ok(()))
            }
        }

        $(#[$attr])*
        pub struct $t<T> {
            call: Option<$holder>,
            write_flags: u32,
            ser: SerializeFn<T>,
        }

        impl<T> $t<T> {
            fn new(call: $holder, ser: SerializeFn<T>) -> $t<T> {
                $t {
                    call: Some(call),
                    write_flags: 0,
                    ser,
                }
            }

            // Replies with an OK status carrying `t` as the response.
            pub fn success(self, t: T) -> $rt {
                self.complete(RpcStatus::ok(), Some(t))
            }

            // Replies with `status` and no response message.
            pub fn fail(self, status: RpcStatus) -> $rt {
                self.complete(status, None)
            }

            // Serializes the optional response, starts the send-status batch
            // and moves the call into the returned future, so that dropping
            // the sink afterwards does not cancel the RPC.
            fn complete(mut self, status: RpcStatus, t: Option<T>) -> $rt {
                let mut payload = t.as_ref().map(|msg| {
                    let mut buf = GrpcSlice::default();
                    (self.ser)(msg, &mut buf);
                    buf
                });

                let flags = self.write_flags;
                let started = self.call.as_mut().unwrap().call(|c| {
                    c.call
                        .start_send_status_from_server(&status, true, &mut payload, flags)
                });

                let (cq_f, err) = match started {
                    Err(e) => (None, Some(e)),
                    Ok(f) => (Some(f), None),
                };

                $rt {
                    call: self.call.take().unwrap(),
                    cq_f,
                    err,
                }
            }
        }

        impl<T> Drop for $t<T> {
            /// The corresponding RPC will be canceled if the sink did not
            /// send a response before dropping.
            fn drop(&mut self) {
                // `complete` takes the call out, so a call that is still
                // present means no response was ever started.
                if let Some(call) = self.call.as_mut() {
                    call.call(|c| c.call.cancel());
                }
            }
        }
    };
}
397 
// Generates `UnarySink<T>` and its completion future `UnarySinkResult`;
// the sink holds a `ShareCall` directly.
impl_unary_sink!(
    /// A sink for unary call.
    ///
    /// To close the sink properly, you should call [`success`] or [`fail`] before dropping.
    ///
    /// [`success`]: #method.success
    /// [`fail`]: #method.fail
    #[must_use = "if unused the sink may immediately cancel the RPC"]
    UnarySink,
    UnarySinkResult,
    ShareCall
);
// Generates `ClientStreamingSink<T>` and its completion future
// `ClientStreamingSinkResult`; the sink holds the call as
// `Arc<Mutex<ShareCall>>`.
impl_unary_sink!(
    /// A sink for client streaming call.
    ///
    /// To close the sink properly, you should call [`success`] or [`fail`] before dropping.
    ///
    /// [`success`]: #method.success
    /// [`fail`]: #method.fail
    #[must_use = "if unused the sink may immediately cancel the RPC"]
    ClientStreamingSink,
    ClientStreamingSinkResult,
    Arc<Mutex<ShareCall>>
);
422 
// A macro helper to implement server side streaming sink.
//
// It generates the sink type `$t<T>` (a `Sink<(T, WriteFlags)>`) and the
// future `$ft` returned by `fail`, both backed by the call holder `$holder`.
macro_rules! impl_stream_sink {
    ($(#[$attr:meta])* $t:ident, $ft:ident, $holder:ty) => {
        $(#[$attr])*
        pub struct $t<T> {
            call: Option<$holder>,
            base: SinkBase,
            flush_f: Option<BatchFuture>,
            status: RpcStatus,
            flushed: bool,
            closed: bool,
            ser: SerializeFn<T>,
        }

        impl<T> $t<T> {
            fn new(call: $holder, ser: SerializeFn<T>) -> $t<T> {
                $t {
                    call: Some(call),
                    base: SinkBase::new(true),
                    flush_f: None,
                    status: RpcStatus::ok(),
                    flushed: false,
                    closed: false,
                    ser,
                }
            }

            /// By default it always sends messages with their configured buffer hint. But when the
            /// `enhance_batch` is enabled, messages will be batched together as many as possible.
            /// The rules are listed as below:
            /// - All messages except the last one will be sent with `buffer_hint` set to true.
            /// - The last message will also be sent with `buffer_hint` set to true unless any message is
            ///    offered with buffer hint set to false.
            ///
            /// No matter `enhance_batch` is true or false, it's recommended to follow the contract of
            /// Sink and call `poll_flush` to ensure messages are handled by gRPC C Core.
            pub fn enhance_batch(&mut self, flag: bool) {
                self.base.enhance_buffer_strategy = flag;
            }

            // Sets the status that `poll_close` will send. Must be called
            // before closing has started (asserted via `flush_f`).
            pub fn set_status(&mut self, status: RpcStatus) {
                assert!(self.flush_f.is_none());
                self.status = status;
            }

            // Starts sending `status` and moves the call into the returned
            // future, so dropping the sink afterwards does not cancel the RPC.
            pub fn fail(mut self, status: RpcStatus) -> $ft {
                assert!(self.flush_f.is_none());
                let send_metadata = self.base.send_metadata;
                let started = self.call.as_mut().unwrap().call(|c| {
                    c.call
                        .start_send_status_from_server(&status, send_metadata, &mut None, 0)
                });

                let (fail_f, err) = match started {
                    Err(e) => (None, Some(e)),
                    Ok(f) => (Some(f), None),
                };

                $ft {
                    call: self.call.take().unwrap(),
                    fail_f,
                    err,
                }
            }
        }

        impl<T> Drop for $t<T> {
            /// The corresponding RPC will be canceled if the sink did not call
            /// [`close`] or [`fail`] before dropping.
            ///
            /// [`close`]: #method.close
            /// [`fail`]: #method.fail
            fn drop(&mut self) {
                // Closed explicitly: nothing to cancel.
                if self.closed {
                    return;
                }
                // The call is still present only when `fail` did not take it.
                if let Some(mut call) = self.call.take() {
                    call.call(|c| c.call.cancel());
                }
            }
        }

        impl<T> Sink<(T, WriteFlags)> for $t<T> {
            type Error = Error;

            #[inline]
            fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                // A call that has already finished can no longer accept messages.
                if self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))?.is_ready() {
                    return Poll::Ready(Err(Error::RemoteStopped));
                }
                Pin::new(&mut self.base).poll_ready(cx)
            }

            #[inline]
            fn start_send(mut self: Pin<&mut Self>, (msg, flags): (T, WriteFlags)) -> Result<()> {
                let this = &mut *self;
                let call = this.call.as_mut().unwrap();
                this.base.start_send(call, &msg, flags, this.ser)
            }

            #[inline]
            fn poll_flush(mut self: Pin<&mut Self>,  cx: &mut Context) -> Poll<Result<()>> {
                if self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))?.is_ready() {
                    return Poll::Ready(Err(Error::RemoteStopped));
                }
                let this = &mut *self;
                Pin::new(&mut this.base).poll_flush(cx, this.call.as_mut().unwrap())
            }

            fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                // Step 1: once the base is ready, start sending the final status.
                if self.flush_f.is_none() {
                    ready!(Pin::new(&mut self.base).poll_ready(cx)?);

                    let this = &mut *self;
                    let send_metadata = this.base.send_metadata;
                    let status = &this.status;
                    let started = this.call.as_mut().unwrap().call(|c| {
                        c.call
                            .start_send_status_from_server(status, send_metadata, &mut None, 0)
                    })?;
                    this.flush_f = Some(started);
                }

                // Step 2: wait for the status batch to complete.
                if !self.flushed {
                    ready!(Pin::new(self.flush_f.as_mut().unwrap()).poll(cx)?);
                    self.flushed = true;
                }

                // Step 3: wait for the call itself to finish.
                ready!(self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))?);
                self.closed = true;
                Poll::Ready(Ok(()))
            }
        }

        #[must_use = "if unused the sink failure may immediately cancel the RPC"]
        pub struct $ft {
            call: $holder,
            fail_f: Option<BatchFuture>,
            err: Option<Error>,
        }

        impl Future for $ft {
            type Output = Result<()>;

            fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                // An error hit while starting the batch is reported first.
                if let Some(e) = self.err.take() {
                    return Poll::Ready(Err(e));
                }

                // Poll the call's finish state first; its readiness is the
                // value returned after the status batch has completed.
                let finished = self.call.call(|c| {
                    if c.finished {
                        Poll::Ready(Ok(()))
                    } else {
                        c.poll_finish(cx).map(|r| r.map(|_| ()))
                    }
                })?;

                if let Some(batch) = self.fail_f.as_mut() {
                    ready!(Pin::new(batch).poll(cx)?);
                }

                self.fail_f = None;
                finished.map(Ok)
            }
        }
    };
}
588 
// Generates `ServerStreamingSink<T>` and the future
// `ServerStreamingSinkFailure` returned by its `fail`; the sink holds a
// `ShareCall` directly.
impl_stream_sink!(
    /// A sink for server streaming call.
    ///
    /// To close the sink properly, you should call [`close`] or [`fail`] before dropping.
    ///
    /// [`close`]: #method.close
    /// [`fail`]: #method.fail
    #[must_use = "if unused the sink may immediately cancel the RPC"]
    ServerStreamingSink,
    ServerStreamingSinkFailure,
    ShareCall
);
// Generates `DuplexSink<T>` and the future `DuplexSinkFailure` returned by
// its `fail`; the sink holds the call as `Arc<Mutex<ShareCall>>`.
impl_stream_sink!(
    /// A sink for duplex streaming call.
    ///
    /// To close the sink properly, you should call [`close`] or [`fail`] before dropping.
    ///
    /// [`close`]: #method.close
    /// [`fail`]: #method.fail
    #[must_use = "if unused the sink may immediately cancel the RPC"]
    DuplexSink,
    DuplexSinkFailure,
    Arc<Mutex<ShareCall>>
);
613 
/// A context for rpc handling.
pub struct RpcContext<'a> {
    // Request-level context that answers method/host/metadata/peer queries
    // and produces `Call` handles.
    ctx: RequestContext,
    // Executor built from the completion queue; used by `spawn`.
    executor: Executor<'a>,
    // Deadline read from the request context when this value is created.
    deadline: Deadline,
}
620 
621 impl<'a> RpcContext<'a> {
new(ctx: RequestContext, cq: &CompletionQueue) -> RpcContext<'_>622     fn new(ctx: RequestContext, cq: &CompletionQueue) -> RpcContext<'_> {
623         RpcContext {
624             deadline: ctx.deadline(),
625             ctx,
626             executor: Executor::new(cq),
627         }
628     }
629 
kicker(&self) -> Kicker630     fn kicker(&self) -> Kicker {
631         let call = self.call();
632         Kicker::from_call(call)
633     }
634 
call(&self) -> Call635     pub(crate) fn call(&self) -> Call {
636         self.ctx.call(self.executor.cq().clone())
637     }
638 
method(&self) -> &[u8]639     pub fn method(&self) -> &[u8] {
640         self.ctx.method()
641     }
642 
host(&self) -> &[u8]643     pub fn host(&self) -> &[u8] {
644         self.ctx.host()
645     }
646 
deadline(&self) -> Deadline647     pub fn deadline(&self) -> Deadline {
648         self.deadline
649     }
650 
651     /// Get the initial metadata sent by client.
request_headers(&self) -> &Metadata652     pub fn request_headers(&self) -> &Metadata {
653         self.ctx.metadata()
654     }
655 
peer(&self) -> String656     pub fn peer(&self) -> String {
657         self.ctx.peer()
658     }
659 
660     /// Wrapper around the gRPC Core AuthContext
661     ///
662     /// If the server binds in non-secure mode, this will return None
auth_context(&self) -> Option<AuthContext>663     pub fn auth_context(&self) -> Option<AuthContext> {
664         self.ctx.auth_context()
665     }
666 
667     /// Spawn the future into current gRPC poll thread.
668     ///
669     /// This can reduce a lot of context switching, but please make
670     /// sure there is no heavy work in the future.
spawn<F>(&self, f: F) where F: Future<Output = ()> + Send + 'static,671     pub fn spawn<F>(&self, f: F)
672     where
673         F: Future<Output = ()> + Send + 'static,
674     {
675         self.executor.spawn(f, self.kicker())
676     }
677 }
678 
// The following four helper functions are used to create a callback closure.
680 
// Starts the server side of `$call` and evaluates to the value it yields on
// success. A `QueueShutdown` error makes the *enclosing function* return;
// any other error panics.
macro_rules! accept_call {
    ($call:expr) => {
        match $call.start_server_side() {
            Ok(close_f) => close_f,
            Err(Error::QueueShutdown) => return,
            Err(e) => panic!("unexpected error when trying to accept request: {:?}", e),
        }
    };
}
690 
691 // Helper function to call a unary handler.
execute_unary<P, Q, F>( ctx: RpcContext<'_>, ser: SerializeFn<Q>, de: DeserializeFn<P>, payload: MessageReader, f: &mut F, ) where F: FnMut(RpcContext<'_>, P, UnarySink<Q>),692 pub fn execute_unary<P, Q, F>(
693     ctx: RpcContext<'_>,
694     ser: SerializeFn<Q>,
695     de: DeserializeFn<P>,
696     payload: MessageReader,
697     f: &mut F,
698 ) where
699     F: FnMut(RpcContext<'_>, P, UnarySink<Q>),
700 {
701     let mut call = ctx.call();
702     let close_f = accept_call!(call);
703     let request = match de(payload) {
704         Ok(f) => f,
705         Err(e) => {
706             let status = RpcStatus::new(
707                 RpcStatusCode::INTERNAL,
708                 Some(format!("Failed to deserialize response message: {:?}", e)),
709             );
710             call.abort(&status);
711             return;
712         }
713     };
714     let sink = UnarySink::new(ShareCall::new(call, close_f), ser);
715     f(ctx, request, sink)
716 }
717 
718 // Helper function to call client streaming handler.
execute_client_streaming<P, Q, F>( ctx: RpcContext<'_>, ser: SerializeFn<Q>, de: DeserializeFn<P>, f: &mut F, ) where F: FnMut(RpcContext<'_>, RequestStream<P>, ClientStreamingSink<Q>),719 pub fn execute_client_streaming<P, Q, F>(
720     ctx: RpcContext<'_>,
721     ser: SerializeFn<Q>,
722     de: DeserializeFn<P>,
723     f: &mut F,
724 ) where
725     F: FnMut(RpcContext<'_>, RequestStream<P>, ClientStreamingSink<Q>),
726 {
727     let mut call = ctx.call();
728     let close_f = accept_call!(call);
729     let call = Arc::new(Mutex::new(ShareCall::new(call, close_f)));
730 
731     let req_s = RequestStream::new(call.clone(), de);
732     let sink = ClientStreamingSink::new(call, ser);
733     f(ctx, req_s, sink)
734 }
735 
736 // Helper function to call server streaming handler.
execute_server_streaming<P, Q, F>( ctx: RpcContext<'_>, ser: SerializeFn<Q>, de: DeserializeFn<P>, payload: MessageReader, f: &mut F, ) where F: FnMut(RpcContext<'_>, P, ServerStreamingSink<Q>),737 pub fn execute_server_streaming<P, Q, F>(
738     ctx: RpcContext<'_>,
739     ser: SerializeFn<Q>,
740     de: DeserializeFn<P>,
741     payload: MessageReader,
742     f: &mut F,
743 ) where
744     F: FnMut(RpcContext<'_>, P, ServerStreamingSink<Q>),
745 {
746     let mut call = ctx.call();
747     let close_f = accept_call!(call);
748 
749     let request = match de(payload) {
750         Ok(t) => t,
751         Err(e) => {
752             let status = RpcStatus::new(
753                 RpcStatusCode::INTERNAL,
754                 Some(format!("Failed to deserialize response message: {:?}", e)),
755             );
756             call.abort(&status);
757             return;
758         }
759     };
760 
761     let sink = ServerStreamingSink::new(ShareCall::new(call, close_f), ser);
762     f(ctx, request, sink)
763 }
764 
765 // Helper function to call duplex streaming handler.
execute_duplex_streaming<P, Q, F>( ctx: RpcContext<'_>, ser: SerializeFn<Q>, de: DeserializeFn<P>, f: &mut F, ) where F: FnMut(RpcContext<'_>, RequestStream<P>, DuplexSink<Q>),766 pub fn execute_duplex_streaming<P, Q, F>(
767     ctx: RpcContext<'_>,
768     ser: SerializeFn<Q>,
769     de: DeserializeFn<P>,
770     f: &mut F,
771 ) where
772     F: FnMut(RpcContext<'_>, RequestStream<P>, DuplexSink<Q>),
773 {
774     let mut call = ctx.call();
775     let close_f = accept_call!(call);
776     let call = Arc::new(Mutex::new(ShareCall::new(call, close_f)));
777 
778     let req_s = RequestStream::new(call.clone(), de);
779     let sink = DuplexSink::new(call, ser);
780     f(ctx, req_s, sink)
781 }
782 
783 // A helper function used to handle all undefined rpc calls.
execute_unimplemented(ctx: RequestContext, cq: CompletionQueue)784 pub fn execute_unimplemented(ctx: RequestContext, cq: CompletionQueue) {
785     // Suppress needless-pass-by-value.
786     let ctx = ctx;
787     let mut call = ctx.call(cq);
788     accept_call!(call);
789     call.abort(&RpcStatus::new(RpcStatusCode::UNIMPLEMENTED, None))
790 }
791 
792 // Helper function to call handler.
793 //
794 // Invoked after a request is ready to be handled.
execute( ctx: RequestContext, cq: &CompletionQueue, payload: Option<MessageReader>, f: &mut BoxHandler, mut checkers: Vec<Box<dyn ServerChecker>>, )795 fn execute(
796     ctx: RequestContext,
797     cq: &CompletionQueue,
798     payload: Option<MessageReader>,
799     f: &mut BoxHandler,
800     mut checkers: Vec<Box<dyn ServerChecker>>,
801 ) {
802     let rpc_ctx = RpcContext::new(ctx, cq);
803 
804     for handler in checkers.iter_mut() {
805         match handler.check(&rpc_ctx) {
806             CheckResult::Continue => {}
807             CheckResult::Abort(status) => {
808                 rpc_ctx.call().abort(&status);
809                 return;
810             }
811         }
812     }
813 
814     f.handle(rpc_ctx, payload)
815 }
816