1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2 
3 use std::cell::UnsafeCell;
4 use std::collections::HashMap;
5 use std::fmt::{self, Debug, Formatter};
6 use std::net::{IpAddr, SocketAddr};
7 use std::pin::Pin;
8 use std::ptr;
9 use std::sync::atomic::{AtomicBool, Ordering};
10 use std::sync::Arc;
11 
12 use crate::grpc_sys::{self, grpc_call_error, grpc_server};
13 use futures::future::Future;
14 use futures::ready;
15 use futures::task::{Context, Poll};
16 
17 use crate::call::server::*;
18 use crate::call::{MessageReader, Method, MethodType};
19 use crate::channel::ChannelArgs;
20 use crate::cq::CompletionQueue;
21 use crate::env::Environment;
22 use crate::error::{Error, Result};
23 use crate::task::{CallTag, CqFuture};
24 use crate::RpcContext;
25 use crate::RpcStatus;
26 
27 const DEFAULT_REQUEST_SLOTS_PER_CQ: usize = 1024;
28 
29 /// An RPC call holder.
30 #[derive(Clone)]
31 pub struct Handler<F> {
32     method_type: MethodType,
33     cb: F,
34 }
35 
36 impl<F> Handler<F> {
new(method_type: MethodType, cb: F) -> Handler<F>37     pub fn new(method_type: MethodType, cb: F) -> Handler<F> {
38         Handler { method_type, cb }
39     }
40 }
41 
42 pub trait CloneableHandler: Send {
handle(&mut self, ctx: RpcContext<'_>, reqs: Option<MessageReader>)43     fn handle(&mut self, ctx: RpcContext<'_>, reqs: Option<MessageReader>);
box_clone(&self) -> Box<dyn CloneableHandler>44     fn box_clone(&self) -> Box<dyn CloneableHandler>;
method_type(&self) -> MethodType45     fn method_type(&self) -> MethodType;
46 }
47 
48 impl<F: 'static> CloneableHandler for Handler<F>
49 where
50     F: FnMut(RpcContext<'_>, Option<MessageReader>) + Send + Clone,
51 {
52     #[inline]
handle(&mut self, ctx: RpcContext<'_>, reqs: Option<MessageReader>)53     fn handle(&mut self, ctx: RpcContext<'_>, reqs: Option<MessageReader>) {
54         (self.cb)(ctx, reqs)
55     }
56 
57     #[inline]
box_clone(&self) -> Box<dyn CloneableHandler>58     fn box_clone(&self) -> Box<dyn CloneableHandler> {
59         Box::new(self.clone())
60     }
61 
62     #[inline]
method_type(&self) -> MethodType63     fn method_type(&self) -> MethodType {
64         self.method_type
65     }
66 }
67 
68 /// Given a host and port, creates a string of the form "host:port" or
69 /// "[host]:port", depending on whether the host is an IPv6 literal.
join_host_port(host: &str, port: u16) -> String70 fn join_host_port(host: &str, port: u16) -> String {
71     if host.starts_with("unix:") {
72         format!("{}\0", host)
73     } else if let Ok(ip) = host.parse::<IpAddr>() {
74         format!("{}\0", SocketAddr::new(ip, port))
75     } else {
76         format!("{}:{}\0", host, port)
77     }
78 }
79 
80 #[cfg(feature = "secure")]
81 mod imp {
82     use super::join_host_port;
83     use crate::grpc_sys::{self, grpc_server};
84     use crate::security::ServerCredentialsFetcher;
85     use crate::ServerCredentials;
86 
87     pub struct Binder {
88         pub host: String,
89         pub port: u16,
90         cred: Option<ServerCredentials>,
91         _fetcher: Option<Box<Box<dyn ServerCredentialsFetcher + Send + Sync>>>,
92     }
93 
94     impl Binder {
new(host: String, port: u16) -> Binder95         pub fn new(host: String, port: u16) -> Binder {
96             let cred = None;
97             Binder {
98                 host,
99                 port,
100                 cred,
101                 _fetcher: None,
102             }
103         }
104 
with_cred( host: String, port: u16, cred: ServerCredentials, _fetcher: Option<Box<Box<dyn ServerCredentialsFetcher + Send + Sync>>>, ) -> Binder105         pub fn with_cred(
106             host: String,
107             port: u16,
108             cred: ServerCredentials,
109             _fetcher: Option<Box<Box<dyn ServerCredentialsFetcher + Send + Sync>>>,
110         ) -> Binder {
111             let cred = Some(cred);
112             Binder {
113                 host,
114                 port,
115                 cred,
116                 _fetcher,
117             }
118         }
119 
bind(&mut self, server: *mut grpc_server) -> u16120         pub unsafe fn bind(&mut self, server: *mut grpc_server) -> u16 {
121             let addr = join_host_port(&self.host, self.port);
122             let port = match self.cred.take() {
123                 None => grpc_sys::grpc_server_add_insecure_http2_port(server, addr.as_ptr() as _),
124                 Some(mut cert) => grpc_sys::grpc_server_add_secure_http2_port(
125                     server,
126                     addr.as_ptr() as _,
127                     cert.as_mut_ptr(),
128                 ),
129             };
130             port as u16
131         }
132     }
133 }
134 
135 #[cfg(not(feature = "secure"))]
136 mod imp {
137     use super::join_host_port;
138     use crate::grpc_sys::{self, grpc_server};
139 
140     pub struct Binder {
141         pub host: String,
142         pub port: u16,
143     }
144 
145     impl Binder {
new(host: String, port: u16) -> Binder146         pub fn new(host: String, port: u16) -> Binder {
147             Binder { host, port }
148         }
149 
bind(&mut self, server: *mut grpc_server) -> u16150         pub unsafe fn bind(&mut self, server: *mut grpc_server) -> u16 {
151             let addr = join_host_port(&self.host, self.port);
152             grpc_sys::grpc_server_add_insecure_http2_port(server, addr.as_ptr() as _) as u16
153         }
154     }
155 }
156 
157 use self::imp::Binder;
158 
159 impl Debug for Binder {
fmt(&self, f: &mut Formatter<'_>) -> fmt::Result160     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
161         write!(f, "Binder {{ host: {}, port: {} }}", self.host, self.port)
162     }
163 }
164 
165 /// [`Service`] factory in order to configure the properties.
166 ///
167 /// Use it to build a service which can be registered to a server.
168 pub struct ServiceBuilder {
169     handlers: HashMap<&'static [u8], BoxHandler>,
170 }
171 
172 impl ServiceBuilder {
173     /// Initialize a new [`ServiceBuilder`].
new() -> ServiceBuilder174     pub fn new() -> ServiceBuilder {
175         ServiceBuilder {
176             handlers: HashMap::new(),
177         }
178     }
179 
180     /// Add a unary RPC call handler.
add_unary_handler<Req, Resp, F>( mut self, method: &Method<Req, Resp>, mut handler: F, ) -> ServiceBuilder where Req: 'static, Resp: 'static, F: FnMut(RpcContext<'_>, Req, UnarySink<Resp>) + Send + Clone + 'static,181     pub fn add_unary_handler<Req, Resp, F>(
182         mut self,
183         method: &Method<Req, Resp>,
184         mut handler: F,
185     ) -> ServiceBuilder
186     where
187         Req: 'static,
188         Resp: 'static,
189         F: FnMut(RpcContext<'_>, Req, UnarySink<Resp>) + Send + Clone + 'static,
190     {
191         let (ser, de) = (method.resp_ser(), method.req_de());
192         let h = move |ctx: RpcContext<'_>, payload: Option<MessageReader>| {
193             execute_unary(ctx, ser, de, payload.unwrap(), &mut handler)
194         };
195         let ch = Box::new(Handler::new(MethodType::Unary, h));
196         self.handlers.insert(method.name.as_bytes(), ch);
197         self
198     }
199 
200     /// Add a client streaming RPC call handler.
add_client_streaming_handler<Req, Resp, F>( mut self, method: &Method<Req, Resp>, mut handler: F, ) -> ServiceBuilder where Req: 'static, Resp: 'static, F: FnMut(RpcContext<'_>, RequestStream<Req>, ClientStreamingSink<Resp>) + Send + Clone + 'static,201     pub fn add_client_streaming_handler<Req, Resp, F>(
202         mut self,
203         method: &Method<Req, Resp>,
204         mut handler: F,
205     ) -> ServiceBuilder
206     where
207         Req: 'static,
208         Resp: 'static,
209         F: FnMut(RpcContext<'_>, RequestStream<Req>, ClientStreamingSink<Resp>)
210             + Send
211             + Clone
212             + 'static,
213     {
214         let (ser, de) = (method.resp_ser(), method.req_de());
215         let h = move |ctx: RpcContext<'_>, _: Option<MessageReader>| {
216             execute_client_streaming(ctx, ser, de, &mut handler)
217         };
218         let ch = Box::new(Handler::new(MethodType::ClientStreaming, h));
219         self.handlers.insert(method.name.as_bytes(), ch);
220         self
221     }
222 
223     /// Add a server streaming RPC call handler.
add_server_streaming_handler<Req, Resp, F>( mut self, method: &Method<Req, Resp>, mut handler: F, ) -> ServiceBuilder where Req: 'static, Resp: 'static, F: FnMut(RpcContext<'_>, Req, ServerStreamingSink<Resp>) + Send + Clone + 'static,224     pub fn add_server_streaming_handler<Req, Resp, F>(
225         mut self,
226         method: &Method<Req, Resp>,
227         mut handler: F,
228     ) -> ServiceBuilder
229     where
230         Req: 'static,
231         Resp: 'static,
232         F: FnMut(RpcContext<'_>, Req, ServerStreamingSink<Resp>) + Send + Clone + 'static,
233     {
234         let (ser, de) = (method.resp_ser(), method.req_de());
235         let h = move |ctx: RpcContext<'_>, payload: Option<MessageReader>| {
236             execute_server_streaming(ctx, ser, de, payload.unwrap(), &mut handler)
237         };
238         let ch = Box::new(Handler::new(MethodType::ServerStreaming, h));
239         self.handlers.insert(method.name.as_bytes(), ch);
240         self
241     }
242 
243     /// Add a duplex streaming RPC call handler.
add_duplex_streaming_handler<Req, Resp, F>( mut self, method: &Method<Req, Resp>, mut handler: F, ) -> ServiceBuilder where Req: 'static, Resp: 'static, F: FnMut(RpcContext<'_>, RequestStream<Req>, DuplexSink<Resp>) + Send + Clone + 'static,244     pub fn add_duplex_streaming_handler<Req, Resp, F>(
245         mut self,
246         method: &Method<Req, Resp>,
247         mut handler: F,
248     ) -> ServiceBuilder
249     where
250         Req: 'static,
251         Resp: 'static,
252         F: FnMut(RpcContext<'_>, RequestStream<Req>, DuplexSink<Resp>) + Send + Clone + 'static,
253     {
254         let (ser, de) = (method.resp_ser(), method.req_de());
255         let h = move |ctx: RpcContext<'_>, _: Option<MessageReader>| {
256             execute_duplex_streaming(ctx, ser, de, &mut handler)
257         };
258         let ch = Box::new(Handler::new(MethodType::Duplex, h));
259         self.handlers.insert(method.name.as_bytes(), ch);
260         self
261     }
262 
263     /// Finalize the [`ServiceBuilder`] and build the [`Service`].
build(self) -> Service264     pub fn build(self) -> Service {
265         Service {
266             handlers: self.handlers,
267         }
268     }
269 }
270 
271 /// Used to indicate the result of the check. If it returns `Abort`,
272 /// skip the subsequent checkers and abort the grpc call.
273 pub enum CheckResult {
274     Continue,
275     Abort(RpcStatus),
276 }
277 
278 pub trait ServerChecker: Send {
check(&mut self, ctx: &RpcContext) -> CheckResult279     fn check(&mut self, ctx: &RpcContext) -> CheckResult;
box_clone(&self) -> Box<dyn ServerChecker>280     fn box_clone(&self) -> Box<dyn ServerChecker>;
281 }
282 
283 impl Clone for Box<dyn ServerChecker> {
clone(&self) -> Self284     fn clone(&self) -> Self {
285         self.box_clone()
286     }
287 }
288 
289 /// A gRPC service.
290 ///
291 /// Use [`ServiceBuilder`] to build a [`Service`].
292 pub struct Service {
293     handlers: HashMap<&'static [u8], BoxHandler>,
294 }
295 
296 /// [`Server`] factory in order to configure the properties.
297 pub struct ServerBuilder {
298     env: Arc<Environment>,
299     binders: Vec<Binder>,
300     args: Option<ChannelArgs>,
301     slots_per_cq: usize,
302     handlers: HashMap<&'static [u8], BoxHandler>,
303     checkers: Vec<Box<dyn ServerChecker>>,
304 }
305 
306 impl ServerBuilder {
307     /// Initialize a new [`ServerBuilder`].
new(env: Arc<Environment>) -> ServerBuilder308     pub fn new(env: Arc<Environment>) -> ServerBuilder {
309         ServerBuilder {
310             env,
311             binders: Vec::new(),
312             args: None,
313             slots_per_cq: DEFAULT_REQUEST_SLOTS_PER_CQ,
314             handlers: HashMap::new(),
315             checkers: Vec::new(),
316         }
317     }
318 
319     /// Bind to an address.
320     ///
321     /// This function can be called multiple times to bind to multiple ports.
bind<S: Into<String>>(mut self, host: S, port: u16) -> ServerBuilder322     pub fn bind<S: Into<String>>(mut self, host: S, port: u16) -> ServerBuilder {
323         self.binders.push(Binder::new(host.into(), port));
324         self
325     }
326 
327     /// Add additional configuration for each incoming channel.
channel_args(mut self, args: ChannelArgs) -> ServerBuilder328     pub fn channel_args(mut self, args: ChannelArgs) -> ServerBuilder {
329         self.args = Some(args);
330         self
331     }
332 
333     /// Set how many requests a completion queue can handle.
requests_slot_per_cq(mut self, slots: usize) -> ServerBuilder334     pub fn requests_slot_per_cq(mut self, slots: usize) -> ServerBuilder {
335         self.slots_per_cq = slots;
336         self
337     }
338 
339     /// Register a service.
register_service(mut self, service: Service) -> ServerBuilder340     pub fn register_service(mut self, service: Service) -> ServerBuilder {
341         self.handlers.extend(service.handlers);
342         self
343     }
344 
345     /// Add a custom checker to handle some tasks before the grpc call handler starts.
346     /// This allows users to operate grpc call based on the context. Users can add
347     /// multiple checkers and they will be executed in the order added.
348     ///
349     /// TODO: Extend this interface to intercepte each payload like grpc-c++.
add_checker<C: ServerChecker + 'static>(mut self, checker: C) -> ServerBuilder350     pub fn add_checker<C: ServerChecker + 'static>(mut self, checker: C) -> ServerBuilder {
351         self.checkers.push(Box::new(checker));
352         self
353     }
354 
355     /// Finalize the [`ServerBuilder`] and build the [`Server`].
build(mut self) -> Result<Server>356     pub fn build(mut self) -> Result<Server> {
357         let args = self
358             .args
359             .as_ref()
360             .map_or_else(ptr::null, ChannelArgs::as_ptr);
361         unsafe {
362             let server = grpc_sys::grpc_server_create(args, ptr::null_mut());
363             for binder in self.binders.iter_mut() {
364                 let bind_port = binder.bind(server);
365                 if bind_port == 0 {
366                     grpc_sys::grpc_server_destroy(server);
367                     return Err(Error::BindFail(binder.host.clone(), binder.port));
368                 }
369                 binder.port = bind_port;
370             }
371 
372             for cq in self.env.completion_queues() {
373                 let cq_ref = cq.borrow()?;
374                 grpc_sys::grpc_server_register_completion_queue(
375                     server,
376                     cq_ref.as_ptr(),
377                     ptr::null_mut(),
378                 );
379             }
380 
381             Ok(Server {
382                 env: self.env,
383                 core: Arc::new(ServerCore {
384                     server,
385                     shutdown: AtomicBool::new(false),
386                     binders: self.binders,
387                     slots_per_cq: self.slots_per_cq,
388                 }),
389                 handlers: self.handlers,
390                 checkers: self.checkers,
391             })
392         }
393     }
394 }
395 
396 #[cfg(feature = "secure")]
397 mod secure_server {
398     use super::{Binder, ServerBuilder};
399     use crate::grpc_sys;
400     use crate::security::{
401         server_cert_fetcher_wrapper, CertificateRequestType, ServerCredentials,
402         ServerCredentialsFetcher,
403     };
404 
405     impl ServerBuilder {
406         /// Bind to an address with credentials for secure connection.
407         ///
408         /// This function can be called multiple times to bind to multiple ports.
bind_with_cred<S: Into<String>>( mut self, host: S, port: u16, c: ServerCredentials, ) -> ServerBuilder409         pub fn bind_with_cred<S: Into<String>>(
410             mut self,
411             host: S,
412             port: u16,
413             c: ServerCredentials,
414         ) -> ServerBuilder {
415             self.binders
416                 .push(Binder::with_cred(host.into(), port, c, None));
417             self
418         }
419 
420         /// Bind to an address for secure connection.
421         ///
422         /// The required credentials will be fetched using provided `fetcher`. This
423         /// function can be called multiple times to bind to multiple ports.
bind_with_fetcher<S: Into<String>>( mut self, host: S, port: u16, fetcher: Box<dyn ServerCredentialsFetcher + Send + Sync>, cer_request_type: CertificateRequestType, ) -> ServerBuilder424         pub fn bind_with_fetcher<S: Into<String>>(
425             mut self,
426             host: S,
427             port: u16,
428             fetcher: Box<dyn ServerCredentialsFetcher + Send + Sync>,
429             cer_request_type: CertificateRequestType,
430         ) -> ServerBuilder {
431             let fetcher_wrap = Box::new(fetcher);
432             let fetcher_wrap_ptr = Box::into_raw(fetcher_wrap);
433             let (sc, fb) = unsafe {
434                 let opt = grpc_sys::grpc_ssl_server_credentials_create_options_using_config_fetcher(
435                     cer_request_type.to_native(),
436                     Some(server_cert_fetcher_wrapper),
437                     fetcher_wrap_ptr as _,
438                 );
439                 (
440                     ServerCredentials::frow_raw(
441                         grpcio_sys::grpc_ssl_server_credentials_create_with_options(opt),
442                     ),
443                     Box::from_raw(fetcher_wrap_ptr),
444                 )
445             };
446             self.binders
447                 .push(Binder::with_cred(host.into(), port, sc, Some(fb)));
448             self
449         }
450     }
451 }
452 
453 struct ServerCore {
454     server: *mut grpc_server,
455     binders: Vec<Binder>,
456     slots_per_cq: usize,
457     shutdown: AtomicBool,
458 }
459 
460 impl Drop for ServerCore {
drop(&mut self)461     fn drop(&mut self) {
462         unsafe { grpc_sys::grpc_server_destroy(self.server) }
463     }
464 }
465 
466 unsafe impl Send for ServerCore {}
467 unsafe impl Sync for ServerCore {}
468 
469 pub type BoxHandler = Box<dyn CloneableHandler>;
470 
471 #[derive(Clone)]
472 pub struct RequestCallContext {
473     server: Arc<ServerCore>,
474     registry: Arc<UnsafeCell<HashMap<&'static [u8], BoxHandler>>>,
475     checkers: Vec<Box<dyn ServerChecker>>,
476 }
477 
478 impl RequestCallContext {
479     /// Users should guarantee the method is always called from the same thread.
480     /// TODO: Is there a better way?
481     #[inline]
get_handler(&mut self, path: &[u8]) -> Option<&mut BoxHandler>482     pub unsafe fn get_handler(&mut self, path: &[u8]) -> Option<&mut BoxHandler> {
483         let registry = &mut *self.registry.get();
484         registry.get_mut(path)
485     }
486 
get_checker(&self) -> Vec<Box<dyn ServerChecker>>487     pub(crate) fn get_checker(&self) -> Vec<Box<dyn ServerChecker>> {
488         self.checkers.clone()
489     }
490 }
491 
492 // Apparently, its life time is guaranteed by the ref count, hence is safe to be sent
493 // to other thread. However it's not `Sync`, as `BoxHandler` is unnecessarily `Sync`.
494 unsafe impl Send for RequestCallContext {}
495 
496 /// Request notification of a new call.
request_call(ctx: RequestCallContext, cq: &CompletionQueue)497 pub fn request_call(ctx: RequestCallContext, cq: &CompletionQueue) {
498     if ctx.server.shutdown.load(Ordering::Relaxed) {
499         return;
500     }
501     let cq_ref = match cq.borrow() {
502         // Shutting down, skip.
503         Err(_) => return,
504         Ok(c) => c,
505     };
506     let server_ptr = ctx.server.server;
507     let prom = CallTag::request(ctx);
508     let request_ptr = prom.request_ctx().unwrap().as_ptr();
509     let prom_box = Box::new(prom);
510     let tag = Box::into_raw(prom_box);
511     let code = unsafe {
512         grpc_sys::grpcwrap_server_request_call(
513             server_ptr,
514             cq_ref.as_ptr(),
515             request_ptr,
516             tag as *mut _,
517         )
518     };
519     if code != grpc_call_error::GRPC_CALL_OK {
520         Box::from(tag);
521         panic!("failed to request call: {:?}", code);
522     }
523 }
524 
525 /// A `Future` that will resolve when shutdown completes.
526 pub struct ShutdownFuture {
527     /// `true` means the future finishes successfully.
528     cq_f: CqFuture<bool>,
529 }
530 
531 impl Future for ShutdownFuture {
532     type Output = Result<()>;
533 
poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>534     fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
535         match ready!(Pin::new(&mut self.cq_f).poll(cx)) {
536             Ok(true) => Poll::Ready(Ok(())),
537             Ok(false) => Poll::Ready(Err(Error::ShutdownFailed)),
538             Err(e) => unreachable!("action future should never resolve to error: {}", e),
539         }
540     }
541 }
542 
543 /// A gRPC server.
544 ///
545 /// A single server can serve arbitrary number of services and can listen on more than one port.
546 ///
547 /// Use [`ServerBuilder`] to build a [`Server`].
548 pub struct Server {
549     env: Arc<Environment>,
550     core: Arc<ServerCore>,
551     handlers: HashMap<&'static [u8], BoxHandler>,
552     checkers: Vec<Box<dyn ServerChecker>>,
553 }
554 
555 impl Server {
556     /// Shutdown the server asynchronously.
shutdown(&mut self) -> ShutdownFuture557     pub fn shutdown(&mut self) -> ShutdownFuture {
558         let (cq_f, prom) = CallTag::action_pair();
559         let prom_box = Box::new(prom);
560         let tag = Box::into_raw(prom_box);
561         unsafe {
562             // Since env still exists, no way can cq been shutdown.
563             let cq_ref = self.env.completion_queues()[0].borrow().unwrap();
564             grpc_sys::grpc_server_shutdown_and_notify(
565                 self.core.server,
566                 cq_ref.as_ptr(),
567                 tag as *mut _,
568             )
569         }
570         self.core.shutdown.store(true, Ordering::SeqCst);
571         ShutdownFuture { cq_f }
572     }
573 
574     /// Cancel all in-progress calls.
575     ///
576     /// Only usable after shutdown.
cancel_all_calls(&mut self)577     pub fn cancel_all_calls(&mut self) {
578         unsafe { grpc_sys::grpc_server_cancel_all_calls(self.core.server) }
579     }
580 
581     /// Start the server.
start(&mut self)582     pub fn start(&mut self) {
583         unsafe {
584             grpc_sys::grpc_server_start(self.core.server);
585             for cq in self.env.completion_queues() {
586                 // Handlers are Send and Clone, but not Sync. So we need to
587                 // provide a replica for each completion queue.
588                 let registry = self
589                     .handlers
590                     .iter()
591                     .map(|(k, v)| (k.to_owned(), v.box_clone()))
592                     .collect();
593                 let rc = RequestCallContext {
594                     server: self.core.clone(),
595                     registry: Arc::new(UnsafeCell::new(registry)),
596                     checkers: self.checkers.clone(),
597                 };
598                 for _ in 0..self.core.slots_per_cq {
599                     request_call(rc.clone(), cq);
600                 }
601             }
602         }
603     }
604 
605     /// Get binded addresses pairs.
bind_addrs(&self) -> impl ExactSizeIterator<Item = (&String, u16)>606     pub fn bind_addrs(&self) -> impl ExactSizeIterator<Item = (&String, u16)> {
607         self.core.binders.iter().map(|b| (&b.host, b.port))
608     }
609 
610     /// Add an rpc channel for an established connection represented as a file
611     /// descriptor. Takes ownership of the file descriptor, closing it when
612     /// channel is closed.
613     ///
614     /// # Safety
615     ///
616     /// The file descriptor must correspond to a connected stream socket. After
617     /// this call, the socket must not be accessed (read / written / closed)
618     /// by other code.
619     #[cfg(unix)]
add_insecure_channel_from_fd(&self, fd: ::std::os::raw::c_int)620     pub unsafe fn add_insecure_channel_from_fd(&self, fd: ::std::os::raw::c_int) {
621         grpc_sys::grpc_server_add_insecure_channel_from_fd(self.core.server, ptr::null_mut(), fd)
622     }
623 }
624 
625 impl Drop for Server {
drop(&mut self)626     fn drop(&mut self) {
627         // if the server is not shutdown completely, destroy a server will core.
628         // TODO: don't wait here
629         let f = if !self.core.shutdown.load(Ordering::SeqCst) {
630             Some(self.shutdown())
631         } else {
632             None
633         };
634         self.cancel_all_calls();
635         let _ = f.map(futures::executor::block_on);
636     }
637 }
638 
639 impl Debug for Server {
fmt(&self, f: &mut Formatter<'_>) -> fmt::Result640     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
641         write!(f, "Server {:?}", self.core.binders)
642     }
643 }
644 
645 #[cfg(test)]
646 mod tests {
647     use super::join_host_port;
648 
649     #[test]
test_join_host_port()650     fn test_join_host_port() {
651         let tbl = vec![
652             ("localhost", 0u16, "localhost:0\0"),
653             ("127.0.0.1", 100u16, "127.0.0.1:100\0"),
654             ("::1", 0u16, "[::1]:0\0"),
655             (
656                 "fe80::7376:45d5:fb08:61e3",
657                 10028u16,
658                 "[fe80::7376:45d5:fb08:61e3]:10028\0",
659             ),
660         ];
661 
662         for (h, p, e) in &tbl {
663             assert_eq!(join_host_port(h, *p), e.to_owned());
664         }
665     }
666 }
667