1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2 
3 //! gRPC C Core binds a call to a completion queue, all the related readiness
4 //! will be forwarded to the completion queue. This module utilizes the mechanism
5 //! and using `Kicker` to wake up completion queue.
6 //!
7 //! Apparently, to minimize context switch, it's better to bind the future to the
8 //! same completion queue as its inner call. Hence method `Executor::spawn` is provided.
9 
10 use std::cell::UnsafeCell;
11 use std::pin::Pin;
12 use std::sync::atomic::{AtomicU8, Ordering};
13 use std::sync::Arc;
14 
15 use futures::future::Future;
16 use futures::task::{waker_ref, ArcWake, Context, Poll};
17 
18 use super::CallTag;
19 use crate::call::Call;
20 use crate::cq::{CompletionQueue, WorkQueue};
21 use crate::error::{Error, Result};
22 use crate::grpc_sys::{self, grpc_call_error};
23 
24 /// A handle to a `Spawn`.
25 /// Inner future is expected to be polled in the same thread as cq.
26 type SpawnHandle = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
27 
28 /// `Kicker` wakes up the completion queue that the inner call binds to.
29 pub(crate) struct Kicker {
30     call: Call,
31 }
32 
33 impl Kicker {
from_call(call: Call) -> Kicker34     pub fn from_call(call: Call) -> Kicker {
35         Kicker { call }
36     }
37 
38     /// Wakes up its completion queue.
39     ///
40     /// `tag` will be popped by `grpc_completion_queue_next` in the future.
kick(&self, tag: Box<CallTag>) -> Result<()>41     pub fn kick(&self, tag: Box<CallTag>) -> Result<()> {
42         let _ref = self.call.cq.borrow()?;
43         unsafe {
44             let ptr = Box::into_raw(tag);
45             let status = grpc_sys::grpcwrap_call_kick_completion_queue(self.call.call, ptr as _);
46             if status == grpc_call_error::GRPC_CALL_OK {
47                 Ok(())
48             } else {
49                 Err(Error::CallFailure(status))
50             }
51         }
52     }
53 }
54 
55 unsafe impl Sync for Kicker {}
56 
57 impl Clone for Kicker {
clone(&self) -> Kicker58     fn clone(&self) -> Kicker {
59         // Bump call's reference count.
60         let call = unsafe {
61             grpc_sys::grpc_call_ref(self.call.call);
62             self.call.call
63         };
64         let cq = self.call.cq.clone();
65         Kicker {
66             call: Call { call, cq },
67         }
68     }
69 }
70 
71 /// When a future is scheduled, it becomes IDLE. When it's ready to be polled,
72 /// it will be notified via task.wake(), and marked as NOTIFIED. When executor
73 /// begins to poll the future, it's marked as POLLING. When the executor finishes
74 /// polling, the future can either be ready or not ready. In the former case, it's
75 /// marked as COMPLETED. If it's latter, it's marked as IDLE again.
76 ///
77 /// Note it's possible the future is notified during polling, in which case, executor
78 /// should polling it when last polling is finished unless it returns ready.
79 const NOTIFIED: u8 = 1;
80 const IDLE: u8 = 2;
81 const POLLING: u8 = 3;
82 const COMPLETED: u8 = 4;
83 
84 /// Maintains the spawned future with state, so that it can be notified and polled efficiently.
85 pub struct SpawnTask {
86     handle: UnsafeCell<Option<SpawnHandle>>,
87     state: AtomicU8,
88     kicker: Kicker,
89     queue: Arc<WorkQueue>,
90 }
91 
92 /// `SpawnTask` access is guarded by `state` field, which guarantees Sync.
93 ///
94 /// Sync is required by `ArcWake`.
95 unsafe impl Sync for SpawnTask {}
96 
97 impl SpawnTask {
new(s: SpawnHandle, kicker: Kicker, queue: Arc<WorkQueue>) -> SpawnTask98     fn new(s: SpawnHandle, kicker: Kicker, queue: Arc<WorkQueue>) -> SpawnTask {
99         SpawnTask {
100             handle: UnsafeCell::new(Some(s)),
101             state: AtomicU8::new(IDLE),
102             kicker,
103             queue,
104         }
105     }
106 
107     /// Marks the state of this task to NOTIFIED.
108     ///
109     /// Returns true means the task was IDLE, needs to be scheduled.
mark_notified(&self) -> bool110     fn mark_notified(&self) -> bool {
111         loop {
112             match self.state.compare_exchange_weak(
113                 IDLE,
114                 NOTIFIED,
115                 Ordering::AcqRel,
116                 Ordering::Acquire,
117             ) {
118                 Ok(_) => return true,
119                 Err(POLLING) => match self.state.compare_exchange_weak(
120                     POLLING,
121                     NOTIFIED,
122                     Ordering::AcqRel,
123                     Ordering::Acquire,
124                 ) {
125                     Err(IDLE) | Err(POLLING) => continue,
126                     // If it succeeds, then executor will poll the future again;
127                     // if it fails, then the future should be resolved. In both
128                     // cases, no need to notify the future, hence return false.
129                     _ => return false,
130                 },
131                 Err(IDLE) => continue,
132                 _ => return false,
133             }
134         }
135     }
136 }
137 
resolve(task: Arc<SpawnTask>, success: bool)138 pub fn resolve(task: Arc<SpawnTask>, success: bool) {
139     // it should always be canceled for now.
140     assert!(success);
141     poll(task, true);
142 }
143 
144 /// A custom Waker.
145 ///
146 /// It will push the inner future to work_queue if it's notified on the
147 /// same thread as inner cq.
148 impl ArcWake for SpawnTask {
wake_by_ref(task: &Arc<Self>)149     fn wake_by_ref(task: &Arc<Self>) {
150         if !task.mark_notified() {
151             return;
152         }
153 
154         // It can lead to deadlock if poll the future immediately. So we need to
155         // defer the work instead.
156         if let Some(UnfinishedWork(w)) = task.queue.push_work(UnfinishedWork(task.clone())) {
157             match task.kicker.kick(Box::new(CallTag::Spawn(w))) {
158                 // If the queue is shutdown, then the tag will be notified
159                 // eventually. So just skip here.
160                 Err(Error::QueueShutdown) => (),
161                 Err(e) => panic!("unexpected error when canceling call: {:?}", e),
162                 _ => (),
163             }
164         }
165     }
166 }
167 
168 /// Work that should be deferred to be handled.
169 ///
170 /// Sometimes a work can't be done immediately as it might lead
171 /// to resource conflict, deadlock for example. So they will be
172 /// pushed into a queue and handled when current work is done.
173 pub struct UnfinishedWork(Arc<SpawnTask>);
174 
175 impl UnfinishedWork {
finish(self)176     pub fn finish(self) {
177         resolve(self.0, true);
178     }
179 }
180 
181 /// Poll the future.
182 ///
183 /// `woken` indicates that if the cq is waken up by itself.
poll(task: Arc<SpawnTask>, woken: bool)184 fn poll(task: Arc<SpawnTask>, woken: bool) {
185     let mut init_state = if woken { NOTIFIED } else { IDLE };
186     // TODO: maybe we need to break the loop to avoid hunger.
187     loop {
188         match task
189             .state
190             .compare_exchange(init_state, POLLING, Ordering::AcqRel, Ordering::Acquire)
191         {
192             Ok(_) => {}
193             Err(COMPLETED) => return,
194             Err(s) => panic!("unexpected state {}", s),
195         }
196 
197         let waker = waker_ref(&task);
198         let mut cx = Context::from_waker(&waker);
199 
200         // L208 "lock"s state, hence it's safe to get a mutable reference.
201         match unsafe { &mut *task.handle.get() }
202             .as_mut()
203             .unwrap()
204             .as_mut()
205             .poll(&mut cx)
206         {
207             Poll::Ready(()) => {
208                 task.state.store(COMPLETED, Ordering::Release);
209                 unsafe { &mut *task.handle.get() }.take();
210             }
211             _ => {
212                 match task.state.compare_exchange(
213                     POLLING,
214                     IDLE,
215                     Ordering::AcqRel,
216                     Ordering::Acquire,
217                 ) {
218                     Ok(_) => return,
219                     Err(NOTIFIED) => {
220                         init_state = NOTIFIED;
221                     }
222                     Err(s) => panic!("unexpected state {}", s),
223                 }
224             }
225         }
226     }
227 }
228 
229 /// An executor that drives a future in the gRPC poll thread, which
230 /// can reduce thread context switching.
231 pub(crate) struct Executor<'a> {
232     cq: &'a CompletionQueue,
233 }
234 
235 impl<'a> Executor<'a> {
new(cq: &CompletionQueue) -> Executor<'_>236     pub fn new(cq: &CompletionQueue) -> Executor<'_> {
237         Executor { cq }
238     }
239 
cq(&self) -> &CompletionQueue240     pub fn cq(&self) -> &CompletionQueue {
241         self.cq
242     }
243 
244     /// Spawn the future into inner poll loop.
245     ///
246     /// If you want to trace the future, you may need to create a sender/receiver
247     /// pair by yourself.
spawn<F>(&self, f: F, kicker: Kicker) where F: Future<Output = ()> + Send + 'static,248     pub fn spawn<F>(&self, f: F, kicker: Kicker)
249     where
250         F: Future<Output = ()> + Send + 'static,
251     {
252         let s = Box::pin(f);
253         let notify = Arc::new(SpawnTask::new(s, kicker, self.cq.worker.clone()));
254         poll(notify, false)
255     }
256 }
257