// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! `URingExecutor`
//!
//! The executor runs all given futures to completion. Futures register wakers associated with
//! io_uring operations. A waker is called when the set of uring ops the waker is waiting on
//! completes.
//!
//! `URingExecutor` is meant to be used with the `futures-rs` crate, which provides combinators and
//! utility functions to combine futures. In general, the interface provided by `URingExecutor`
//! shouldn't be used directly. Instead, use it by interacting with implementors of `IoSource` and
//! the high-level future functions.
//!
//! ## Read/Write buffer management.
//!
//! There are two key issues when managing asynchronous IO buffers in Rust:
//! 1) The kernel has a mutable reference to the memory until the completion is returned. Rust must
//! not have any references to it during that time.
//! 2) The memory must remain valid as long as the kernel has a reference to it.
//!
//! ### The kernel's mutable borrow of the buffer
//!
//! Because the buffers used for read and write must be passed to the kernel for an unknown
//! duration, the functions must maintain ownership of the memory.  The core of this problem is that
//! the lifetime of the future isn't tied to the scope in which the kernel can modify the buffer the
//! future has a reference to.  The buffer can be modified at any point from submission until the
//! operation completes. The operation can't be synchronously canceled when the future is dropped,
//! and `Drop` can't be used for safety guarantees. To ensure Rust never holds a reference to memory
//! the kernel is modifying, only memory that implements `BackingMemory` is accepted.  For
//! implementors of `BackingMemory` the mutable borrow isn't an issue because they are already OK
//! with external modifications to the memory (like a `VolatileSlice`).
//!
//! ### Buffer lifetime
//!
//! What if the kernel's reference to the buffer outlives the buffer itself?  This could happen if a
//! read operation was submitted and then the memory was dropped.  To solve this, the executor takes
//! an `Arc` to the backing memory. `Vec`s being read to are also wrapped in an `Arc` before being
//! passed to the executor.  The executor holds the `Arc` and ensures all operations are complete
//! before dropping it, which guarantees the memory is valid for the duration of the operation.
//!
//! The buffers _have_ to be on the heap. Because we don't have a way to cancel a future if it is
//! dropped (we can't rely on `Drop` running), there is no way to ensure the kernel's buffer remains
//! valid until the operation completes unless the executor holds an `Arc` to the memory on the
//! heap.
//!
//! ## Using `Vec` for reads/writes.
//!
//! There is a convenience wrapper `VecIoWrapper` provided for fully owned vectors. This type
//! ensures that only the kernel is allowed to access the `Vec` and wraps the `Vec` in an `Arc` to
//! ensure it lives long enough.
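//!
//! A hedged sketch of the overall flow (the names mirror this module's unit tests; real callers
//! normally go through `IoSource` rather than these crate-internal types, and `file` below is an
//! assumed stand-in for any type implementing `AsRawFd`):
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! // Wrap a fully owned `Vec` so the executor can hold an `Arc` to it while the kernel
//! // accesses it.
//! let buf = Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;
//!
//! let ex = URingExecutor::new()?;
//! let src = ex.register_source(&file)?;
//!
//! // `start_read_to_mem` returns a `PendingOperation`; awaiting it yields the number of bytes
//! // the kernel read into `buf`.
//! let op = src.start_read_to_mem(0, Arc::clone(&buf), &[MemRegion { offset: 0, len: 4096 }])?;
//! let bytes_read = ex.run_until(op)??;
//! ```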

use std::convert::TryInto;
use std::fs::File;
use std::future::Future;
use std::io;
use std::mem;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::pin::Pin;
use std::sync::atomic::{AtomicI32, AtomicU32, Ordering};
use std::sync::{Arc, Weak};
use std::task::Waker;
use std::task::{Context, Poll};
use std::thread::{self, ThreadId};

use async_task::Task;
use futures::task::noop_waker;
use io_uring::URingContext;
use pin_utils::pin_mut;
use slab::Slab;
use sync::Mutex;
use sys_util::{warn, WatchingEvents};
use thiserror::Error as ThisError;

use crate::mem::{BackingMemory, MemRegion};
use crate::queue::RunnableQueue;
use crate::waker::{new_waker, WakerToken, WeakWake};

#[derive(Debug, ThisError)]
pub enum Error {
    /// Failed to copy the FD for the polling context.
    #[error("Failed to copy the FD for the polling context: {0}")]
    DuplicatingFd(sys_util::Error),
    /// The Executor is gone.
    #[error("The URingExecutor is gone")]
    ExecutorGone,
    /// Invalid offset or length given for an iovec in backing memory.
    #[error("Invalid offset/len for getting an iovec")]
    InvalidOffset,
    /// Invalid FD source specified.
    #[error("Invalid source, FD not registered for use")]
    InvalidSource,
    /// Error doing the IO.
    #[error("Error during IO: {0}")]
    Io(io::Error),
    /// Creating a context to wait on FDs failed.
    #[error("Error creating the fd waiting context: {0}")]
    CreatingContext(io_uring::Error),
    /// Failed to remove the waker from the polling context.
    #[error("Error removing from the URing context: {0}")]
    RemovingWaker(io_uring::Error),
    /// Failed to submit the operation to the polling context.
    #[error("Error adding to the URing context: {0}")]
    SubmittingOp(io_uring::Error),
    /// URingContext failure.
    #[error("URingContext failure: {0}")]
    URingContextError(io_uring::Error),
    /// Failed to submit or wait for io_uring events.
    #[error("URing::enter: {0}")]
    URingEnter(io_uring::Error),
}
pub type Result<T> = std::result::Result<T, Error>;

// Checks if the uring executor is available.
// Caches the result so that the check is only run once.
// Useful for falling back to the FD executor on pre-uring kernels.
pub(crate) fn use_uring() -> bool {
    const UNKNOWN: u32 = 0;
    const URING: u32 = 1;
    const FD: u32 = 2;
    static USE_URING: AtomicU32 = AtomicU32::new(UNKNOWN);
    match USE_URING.load(Ordering::Relaxed) {
        UNKNOWN => {
            // Create a dummy uring context to check that the kernel understands the syscalls.
            if URingContext::new(8).is_ok() {
                USE_URING.store(URING, Ordering::Relaxed);
                true
            } else {
                USE_URING.store(FD, Ordering::Relaxed);
                false
            }
        }
        URING => true,
        FD => false,
        _ => unreachable!("invalid use uring state"),
    }
}
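
// A hedged sketch of the intended fallback pattern (the FD-based alternative is assumed to live
// elsewhere in this crate and is not defined in this file):
//
//     if use_uring() {
//         let ex = URingExecutor::new()?;
//         // ... register sources and run futures on the uring executor.
//     } else {
//         // Fall back to an FD-based (epoll-style) executor on pre-uring kernels.
//     }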

pub struct RegisteredSource {
    tag: usize,
    ex: Weak<RawExecutor>,
}

impl RegisteredSource {
    pub fn start_read_to_mem(
        &self,
        file_offset: u64,
        mem: Arc<dyn BackingMemory + Send + Sync>,
        addrs: &[MemRegion],
    ) -> Result<PendingOperation> {
        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
        let token = ex.submit_read_to_vectored(self, mem, file_offset, addrs)?;

        Ok(PendingOperation {
            waker_token: Some(token),
            ex: self.ex.clone(),
            submitted: false,
        })
    }

    pub fn start_write_from_mem(
        &self,
        file_offset: u64,
        mem: Arc<dyn BackingMemory + Send + Sync>,
        addrs: &[MemRegion],
    ) -> Result<PendingOperation> {
        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
        let token = ex.submit_write_from_vectored(self, mem, file_offset, addrs)?;

        Ok(PendingOperation {
            waker_token: Some(token),
            ex: self.ex.clone(),
            submitted: false,
        })
    }

    pub fn start_fallocate(&self, offset: u64, len: u64, mode: u32) -> Result<PendingOperation> {
        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
        let token = ex.submit_fallocate(self, offset, len, mode)?;

        Ok(PendingOperation {
            waker_token: Some(token),
            ex: self.ex.clone(),
            submitted: false,
        })
    }

    pub fn start_fsync(&self) -> Result<PendingOperation> {
        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
        let token = ex.submit_fsync(self)?;

        Ok(PendingOperation {
            waker_token: Some(token),
            ex: self.ex.clone(),
            submitted: false,
        })
    }

    pub fn poll_fd_readable(&self) -> Result<PendingOperation> {
        let events = WatchingEvents::empty().set_read();

        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
        let token = ex.submit_poll(self, &events)?;

        Ok(PendingOperation {
            waker_token: Some(token),
            ex: self.ex.clone(),
            submitted: false,
        })
    }
}
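
// Each `start_*` method above returns a `PendingOperation` future. A hedged usage sketch,
// mirroring this module's tests (`ex` is a `URingExecutor`; `file` is an assumed stand-in for
// any `AsRawFd` type):
//
//     let src = ex.register_source(&file)?;
//     let op = src.start_fsync()?;
//     // Awaiting the operation yields the kernel's completion result for the submitted op.
//     let result = ex.run_until(op)??;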

impl Drop for RegisteredSource {
    fn drop(&mut self) {
        if let Some(ex) = self.ex.upgrade() {
            let _ = ex.deregister_source(self);
        }
    }
}

// Indicates that the executor is either within or about to make an io_uring_enter syscall. When a
// waker sees this value, it will add and submit a NOP to the uring, which will wake up the thread
// blocked on the io_uring_enter syscall.
const WAITING: i32 = 0xb80d_21b5u32 as i32;

// Indicates that the executor is processing any futures that are ready to run.
const PROCESSING: i32 = 0xdb31_83a3u32 as i32;

// Indicates that one or more futures may be ready to make progress.
const WOKEN: i32 = 0x0fc7_8f7eu32 as i32;

// Number of entries in the ring.
const NUM_ENTRIES: usize = 256;
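
// A sketch of how these states flow between `RawExecutor::wake` and `RawExecutor::run` below:
//
//     PROCESSING --(run: queue drained, successful CAS)--> WAITING
//     WAITING    --(wake: swap to WOKEN, submit a NOP)---> WOKEN
//     WOKEN      --(run: CAS fails, loop again)----------> PROCESSING
//
// Because `wake` only submits a NOP when it observes WAITING, at most one wake-up NOP is
// submitted each time the executor blocks in io_uring_enter.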

// An operation that has been submitted to the uring and is potentially being waited on.
struct OpData {
    _file: Arc<File>,
    _mem: Option<Arc<dyn BackingMemory + Send + Sync>>,
    waker: Option<Waker>,
    canceled: bool,
}

// The current status of an operation that's been submitted to the uring.
enum OpStatus {
    Nop,
    Pending(OpData),
    Completed(Option<::std::io::Result<u32>>),
}

struct Ring {
    ops: Slab<OpStatus>,
    registered_sources: Slab<Arc<File>>,
}

struct RawExecutor {
    // The URingContext needs to be first so that it is dropped first, closing the uring fd, and
    // releasing the resources borrowed by the kernel before we free them.
    ctx: URingContext,
    queue: RunnableQueue,
    ring: Mutex<Ring>,
    thread_id: Mutex<Option<ThreadId>>,
    state: AtomicI32,
}

impl RawExecutor {
    fn new() -> Result<RawExecutor> {
        Ok(RawExecutor {
            ctx: URingContext::new(NUM_ENTRIES).map_err(Error::CreatingContext)?,
            queue: RunnableQueue::new(),
            ring: Mutex::new(Ring {
                ops: Slab::with_capacity(NUM_ENTRIES),
                registered_sources: Slab::with_capacity(NUM_ENTRIES),
            }),
            thread_id: Mutex::new(None),
            state: AtomicI32::new(PROCESSING),
        })
    }

    fn wake(&self) {
        let oldstate = self.state.swap(WOKEN, Ordering::Release);
        if oldstate == WAITING {
            let mut ring = self.ring.lock();
            let entry = ring.ops.vacant_entry();
            let next_op_token = entry.key();
            if let Err(e) = self.ctx.add_nop(usize_to_u64(next_op_token)) {
                warn!("Failed to add NOP for waking up executor: {}", e);
            }
            entry.insert(OpStatus::Nop);
            mem::drop(ring);

            match self.ctx.submit() {
                Ok(()) => {}
                // If the kernel's submit ring is full then we know we won't block when calling
                // io_uring_enter, which is all we really care about.
                Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}
                Err(e) => warn!("Failed to submit NOP for waking up executor: {}", e),
            }
        }
    }

    fn spawn<F>(self: &Arc<Self>, f: F) -> Task<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let raw = Arc::downgrade(self);
        let schedule = move |runnable| {
            if let Some(r) = raw.upgrade() {
                r.queue.push_back(runnable);
                r.wake();
            }
        };
        let (runnable, task) = async_task::spawn(f, schedule);
        runnable.schedule();
        task
    }
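
    // A hedged usage sketch at the public `URingExecutor` level (mirroring this module's tests):
    //
    //     let task = ex.spawn(async { 5usize });
    //     // Either `task.detach()` to run it in the background, or await its output:
    //     let output = ex.run_until(task)?;
    //     assert_eq!(output, 5);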

    fn spawn_local<F>(self: &Arc<Self>, f: F) -> Task<F::Output>
    where
        F: Future + 'static,
        F::Output: 'static,
    {
        let raw = Arc::downgrade(self);
        let schedule = move |runnable| {
            if let Some(r) = raw.upgrade() {
                r.queue.push_back(runnable);
                r.wake();
            }
        };
        let (runnable, task) = async_task::spawn_local(f, schedule);
        runnable.schedule();
        task
    }

    fn runs_tasks_on_current_thread(&self) -> bool {
        let executor_thread = self.thread_id.lock();
        executor_thread
            .map(|id| id == thread::current().id())
            .unwrap_or(false)
    }

    fn run<F: Future>(&self, cx: &mut Context, done: F) -> Result<F::Output> {
        let current_thread = thread::current().id();
        let mut thread_id = self.thread_id.lock();
        assert_eq!(
            *thread_id.get_or_insert(current_thread),
            current_thread,
            "`URingExecutor::run` cannot be called from more than one thread"
        );
        mem::drop(thread_id);

        pin_mut!(done);
        loop {
            self.state.store(PROCESSING, Ordering::Release);
            for runnable in self.queue.iter() {
                runnable.run();
            }

            if let Poll::Ready(val) = done.as_mut().poll(cx) {
                return Ok(val);
            }

            let oldstate = self.state.compare_exchange(
                PROCESSING,
                WAITING,
                Ordering::Acquire,
                Ordering::Acquire,
            );
            if let Err(oldstate) = oldstate {
                debug_assert_eq!(oldstate, WOKEN);
                // One or more futures have become runnable.
                continue;
            }

            let events = self.ctx.wait().map_err(Error::URingEnter)?;

            // Set the state back to PROCESSING to prevent any tasks woken up by the loop below
            // from submitting a wake-up NOP to the uring.
            self.state.store(PROCESSING, Ordering::Release);

            let mut ring = self.ring.lock();
            for (raw_token, result) in events {
                // While the `expect()` might fail on arbitrary `u64`s, the `raw_token` was
                // something that we originally gave to the kernel and that was created from a
                // `usize` so we should always be able to convert it back into a `usize`.
                let token = raw_token
                    .try_into()
                    .expect("`u64` doesn't fit inside a `usize`");

                let op = ring
                    .ops
                    .get_mut(token)
                    .expect("Received completion token for unexpected operation");
                match mem::replace(op, OpStatus::Completed(Some(result))) {
                    // No one is waiting on a Nop.
                    OpStatus::Nop => mem::drop(ring.ops.remove(token)),
                    OpStatus::Pending(data) => {
                        if data.canceled {
                            // No one is waiting for this operation and the uring is done with
                            // it so it's safe to remove.
                            ring.ops.remove(token);
                        }
                        if let Some(waker) = data.waker {
                            waker.wake();
                        }
                    }
                    OpStatus::Completed(_) => panic!("uring operation completed more than once"),
                }
            }
        }
    }

    fn get_result(&self, token: &WakerToken, cx: &mut Context) -> Option<io::Result<u32>> {
        let mut ring = self.ring.lock();

        let op = ring
            .ops
            .get_mut(token.0)
            .expect("`get_result` called on unknown operation");
        match op {
            OpStatus::Nop => panic!("`get_result` called on nop"),
            OpStatus::Pending(data) => {
                if data.canceled {
                    panic!("`get_result` called on canceled operation");
                }
                data.waker = Some(cx.waker().clone());
                None
            }
            OpStatus::Completed(res) => {
                let out = res.take();
                ring.ops.remove(token.0);
                Some(out.expect("Missing result in completed operation"))
            }
        }
    }

    // Remove the waker for the given token if it hasn't fired yet.
    fn cancel_operation(&self, token: WakerToken) {
        let mut ring = self.ring.lock();
        if let Some(op) = ring.ops.get_mut(token.0) {
            match op {
                OpStatus::Nop => panic!("`cancel_operation` called on nop"),
                OpStatus::Pending(data) => {
                    if data.canceled {
                        panic!("uring operation canceled more than once");
                    }

                    // Clear the waker as it is no longer needed.
                    data.waker = None;
                    data.canceled = true;

                    // Keep the rest of the op data as the uring might still be accessing either
                    // the source or the backing memory, so it needs to live until the kernel
                    // completes the operation.  TODO: cancel the operation in the uring.
                }
                OpStatus::Completed(_) => {
                    ring.ops.remove(token.0);
                }
            }
        }
    }

    fn register_source(&self, f: Arc<File>) -> usize {
        self.ring.lock().registered_sources.insert(f)
    }

    fn deregister_source(&self, source: &RegisteredSource) {
        // There isn't any need to pull the pending ops out; they all have `Arc`s to the file and
        // memory they need, so just let them complete. Deregistering a source with pending ops is
        // not a common path, so there is no need to optimize that case yet.
        self.ring.lock().registered_sources.remove(source.tag);
    }

    fn submit_poll(
        &self,
        source: &RegisteredSource,
        events: &sys_util::WatchingEvents,
    ) -> Result<WakerToken> {
        let mut ring = self.ring.lock();
        let src = ring
            .registered_sources
            .get(source.tag)
            .map(Arc::clone)
            .ok_or(Error::InvalidSource)?;
        let entry = ring.ops.vacant_entry();
        let next_op_token = entry.key();
        self.ctx
            .add_poll_fd(src.as_raw_fd(), events, usize_to_u64(next_op_token))
            .map_err(Error::SubmittingOp)?;

        entry.insert(OpStatus::Pending(OpData {
            _file: src,
            _mem: None,
            waker: None,
            canceled: false,
        }));

        Ok(WakerToken(next_op_token))
    }

    fn submit_fallocate(
        &self,
        source: &RegisteredSource,
        offset: u64,
        len: u64,
        mode: u32,
    ) -> Result<WakerToken> {
        let mut ring = self.ring.lock();
        let src = ring
            .registered_sources
            .get(source.tag)
            .map(Arc::clone)
            .ok_or(Error::InvalidSource)?;
        let entry = ring.ops.vacant_entry();
        let next_op_token = entry.key();
        self.ctx
            .add_fallocate(
                src.as_raw_fd(),
                offset,
                len,
                mode,
                usize_to_u64(next_op_token),
            )
            .map_err(Error::SubmittingOp)?;

        entry.insert(OpStatus::Pending(OpData {
            _file: src,
            _mem: None,
            waker: None,
            canceled: false,
        }));

        Ok(WakerToken(next_op_token))
    }

    fn submit_fsync(&self, source: &RegisteredSource) -> Result<WakerToken> {
        let mut ring = self.ring.lock();
        let src = ring
            .registered_sources
            .get(source.tag)
            .map(Arc::clone)
            .ok_or(Error::InvalidSource)?;
        let entry = ring.ops.vacant_entry();
        let next_op_token = entry.key();
        self.ctx
            .add_fsync(src.as_raw_fd(), usize_to_u64(next_op_token))
            .map_err(Error::SubmittingOp)?;

        entry.insert(OpStatus::Pending(OpData {
            _file: src,
            _mem: None,
            waker: None,
            canceled: false,
        }));

        Ok(WakerToken(next_op_token))
    }

    fn submit_read_to_vectored(
        &self,
        source: &RegisteredSource,
        mem: Arc<dyn BackingMemory + Send + Sync>,
        offset: u64,
        addrs: &[MemRegion],
    ) -> Result<WakerToken> {
        if addrs
            .iter()
            .any(|&mem_range| mem.get_volatile_slice(mem_range).is_err())
        {
            return Err(Error::InvalidOffset);
        }

        let mut ring = self.ring.lock();
        let src = ring
            .registered_sources
            .get(source.tag)
            .map(Arc::clone)
            .ok_or(Error::InvalidSource)?;

        // We can't insert the OpData into the slab yet because `iovecs` borrows `mem` below.
        let entry = ring.ops.vacant_entry();
        let next_op_token = entry.key();

        // The addresses have already been validated above, so unwrapping them will succeed.
        let iovecs = addrs
            .iter()
            .map(|&mem_range| *mem.get_volatile_slice(mem_range).unwrap().as_iobuf());

        unsafe {
            // Safe because all the addresses are within the backing memory, for which an Arc is
            // kept for the duration of the operation to ensure the memory is valid while the
            // kernel accesses it.
            // Tested by the `dont_drop_backing_mem_read` unit test.
            self.ctx
                .add_readv_iter(iovecs, src.as_raw_fd(), offset, usize_to_u64(next_op_token))
                .map_err(Error::SubmittingOp)?;
        }

        entry.insert(OpStatus::Pending(OpData {
            _file: src,
            _mem: Some(mem),
            waker: None,
            canceled: false,
        }));

        Ok(WakerToken(next_op_token))
    }

    fn submit_write_from_vectored(
        &self,
        source: &RegisteredSource,
        mem: Arc<dyn BackingMemory + Send + Sync>,
        offset: u64,
        addrs: &[MemRegion],
    ) -> Result<WakerToken> {
        if addrs
            .iter()
            .any(|&mem_range| mem.get_volatile_slice(mem_range).is_err())
        {
            return Err(Error::InvalidOffset);
        }

        let mut ring = self.ring.lock();
        let src = ring
            .registered_sources
            .get(source.tag)
            .map(Arc::clone)
            .ok_or(Error::InvalidSource)?;

        // We can't insert the OpData into the slab yet because `iovecs` borrows `mem` below.
        let entry = ring.ops.vacant_entry();
        let next_op_token = entry.key();

        // The addresses have already been validated above, so unwrapping them will succeed.
        let iovecs = addrs
            .iter()
            .map(|&mem_range| *mem.get_volatile_slice(mem_range).unwrap().as_iobuf());

        unsafe {
            // Safe because all the addresses are within the backing memory, for which an Arc is
            // kept for the duration of the operation to ensure the memory is valid while the
            // kernel accesses it.
            // Tested by the `dont_drop_backing_mem_write` unit test.
            self.ctx
                .add_writev_iter(iovecs, src.as_raw_fd(), offset, usize_to_u64(next_op_token))
                .map_err(Error::SubmittingOp)?;
        }

        entry.insert(OpStatus::Pending(OpData {
            _file: src,
            _mem: Some(mem),
            waker: None,
            canceled: false,
        }));

        Ok(WakerToken(next_op_token))
    }
}

impl WeakWake for RawExecutor {
    fn wake_by_ref(weak_self: &Weak<Self>) {
        if let Some(arc_self) = weak_self.upgrade() {
            RawExecutor::wake(&arc_self);
        }
    }
}

impl Drop for RawExecutor {
    fn drop(&mut self) {
        // Wake up any futures still waiting on uring operations.
        let ring = self.ring.get_mut();
        for (_, op) in ring.ops.iter_mut() {
            match op {
                OpStatus::Nop => {}
                OpStatus::Pending(data) => {
                    // If the operation wasn't already canceled then wake up the future waiting on
                    // it. When polled that future will get an ExecutorGone error anyway so there's
                    // no point in waiting until the operation completes to wake it up.
                    if !data.canceled {
                        if let Some(waker) = data.waker.take() {
                            waker.wake();
                        }
                    }

                    data.canceled = true;
                }
                OpStatus::Completed(_) => {}
            }
        }

        // Since the RawExecutor is wrapped in an Arc it may end up being dropped from a different
        // thread than the one that called `run` or `run_until`. Since we know there are no other
        // references, just clear the thread id so that we don't panic.
        *self.thread_id.lock() = None;

        // Now run the executor loop once more to poll any futures we just woke up.
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        let res = self.run(&mut cx, async {});

        if let Err(e) = res {
            warn!("Failed to drive uring to completion: {}", e);
        }
    }
}

/// An executor that uses io_uring for its asynchronous I/O operations. See the documentation of
/// `Executor` for more details about the methods.
#[derive(Clone)]
pub struct URingExecutor {
    raw: Arc<RawExecutor>,
}

impl URingExecutor {
    pub fn new() -> Result<URingExecutor> {
        let raw = RawExecutor::new().map(Arc::new)?;

        Ok(URingExecutor { raw })
    }

    pub fn spawn<F>(&self, f: F) -> Task<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.raw.spawn(f)
    }

    pub fn spawn_local<F>(&self, f: F) -> Task<F::Output>
    where
        F: Future + 'static,
        F::Output: 'static,
    {
        self.raw.spawn_local(f)
    }

    pub fn run(&self) -> Result<()> {
        let waker = new_waker(Arc::downgrade(&self.raw));
        let mut cx = Context::from_waker(&waker);

        self.raw.run(&mut cx, crate::empty::<()>())
    }

    pub fn run_until<F: Future>(&self, f: F) -> Result<F::Output> {
        let waker = new_waker(Arc::downgrade(&self.raw));
        let mut ctx = Context::from_waker(&waker);
        self.raw.run(&mut ctx, f)
    }
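
    // A hedged sketch of driving an arbitrary future to completion with `run_until`:
    //
    //     let ex = URingExecutor::new()?;
    //     assert_eq!(ex.run_until(async { 2 + 2 })?, 4);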

    /// Register a file and memory pair for buffered asynchronous operation.
    pub(crate) fn register_source<F: AsRawFd>(&self, fd: &F) -> Result<RegisteredSource> {
        let duped_fd = unsafe {
            // Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD
            // will only be added to the poll loop.
            File::from_raw_fd(dup_fd(fd.as_raw_fd())?)
        };

        Ok(RegisteredSource {
            tag: self.raw.register_source(Arc::new(duped_fd)),
            ex: Arc::downgrade(&self.raw),
        })
    }
}

// Used to dup the FDs passed to the executor so there is a guarantee they aren't closed while
// waiting in TLS to be added to the main polling context.
unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
    let ret = libc::fcntl(fd, libc::F_DUPFD_CLOEXEC, 0);
    if ret < 0 {
        Err(Error::DuplicatingFd(sys_util::Error::last()))
    } else {
        Ok(ret)
    }
}

// Converts a `usize` into a `u64` and panics if the conversion fails.
#[inline]
fn usize_to_u64(val: usize) -> u64 {
    val.try_into().expect("`usize` doesn't fit inside a `u64`")
}

pub struct PendingOperation {
    waker_token: Option<WakerToken>,
    ex: Weak<RawExecutor>,
    submitted: bool,
}

impl Future for PendingOperation {
    type Output = Result<u32>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let token = self
            .waker_token
            .as_ref()
            .expect("PendingOperation polled after returning Poll::Ready");
        if let Some(ex) = self.ex.upgrade() {
            if let Some(result) = ex.get_result(token, cx) {
                self.waker_token = None;
                Poll::Ready(result.map_err(Error::Io))
            } else {
                // If we haven't submitted the operation yet and the executor runs on a different
                // thread, then submit it now. Otherwise the executor will submit it automatically
                // the next time it calls UringContext::wait.
                if !self.submitted && !ex.runs_tasks_on_current_thread() {
                    match ex.ctx.submit() {
                        Ok(()) => self.submitted = true,
                        // If the kernel ring is full then wait until some ops are removed from the
                        // completion queue. This op should get submitted the next time the executor
                        // calls UringContext::wait.
                        Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}
                        Err(e) => return Poll::Ready(Err(Error::URingEnter(e))),
                    }
                }
                Poll::Pending
            }
        } else {
            Poll::Ready(Err(Error::ExecutorGone))
        }
    }
}

impl Drop for PendingOperation {
    fn drop(&mut self) {
        if let Some(waker_token) = self.waker_token.take() {
            if let Some(ex) = self.ex.upgrade() {
                ex.cancel_operation(waker_token);
            }
        }
    }
}
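
// Note that dropping a `PendingOperation` does not synchronously cancel the kernel operation; it
// only marks the op as canceled so the executor can drop the `OpData` (and the `Arc`s it holds)
// once the kernel reports completion. A sketch of the behavior exercised by the
// `canceled_before_completion` test below:
//
//     let op = src.start_read_to_mem(0, Arc::clone(&mem), &regions)?;
//     drop(op); // The kernel may still write into `mem`; the executor keeps it alive.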

#[cfg(test)]
mod tests {
    use std::future::Future;
    use std::io::{Read, Write};
    use std::mem;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use futures::executor::block_on;

    use super::*;
    use crate::mem::{BackingMemory, MemRegion, VecIoWrapper};

    // A future that returns ready when the uring queue is empty.
    struct UringQueueEmpty<'a> {
        ex: &'a URingExecutor,
    }

    impl<'a> Future for UringQueueEmpty<'a> {
        type Output = ();

        fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
            if self.ex.raw.ring.lock().ops.is_empty() {
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        }
    }

    #[test]
    fn dont_drop_backing_mem_read() {
        // Create a backing memory wrapped in an Arc and check that the drop isn't called while the
        // op is pending.
        let bm =
            Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;

        // Use pipes to create a future that will block forever.
        let (rx, mut tx) = sys_util::pipe(true).unwrap();

        // Set up the TLS for the uring_executor by creating one.
        let ex = URingExecutor::new().unwrap();

        // Register the receive side of the pipe with the executor.
        let registered_source = ex.register_source(&rx).expect("register source failed");

        // Submit the op to the kernel. Next, test that the source keeps its Arc open for the
        // duration of the op.
        let pending_op = registered_source
            .start_read_to_mem(0, Arc::clone(&bm), &[MemRegion { offset: 0, len: 8 }])
            .expect("failed to start read to mem");

        // Here the Arc count must be two, one for `bm` and one to signify that the kernel has a
        // reference while the op is active.
        assert_eq!(Arc::strong_count(&bm), 2);

        // Dropping the operation shouldn't reduce the Arc count, as the kernel could still be
        // using it.
        drop(pending_op);
        assert_eq!(Arc::strong_count(&bm), 2);

        // Finishing the operation should put the Arc count back to 1.
        // Write to the pipe to wake the read pipe and then wait for the uring result in the
        // executor.
        tx.write_all(&[0u8; 8]).expect("write failed");
        ex.run_until(UringQueueEmpty { ex: &ex })
            .expect("Failed to wait for read pipe ready");
        assert_eq!(Arc::strong_count(&bm), 1);
    }

    #[test]
    fn dont_drop_backing_mem_write() {
        // Create a backing memory wrapped in an Arc and check that the drop isn't called while the
        // op is pending.
        let bm =
            Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;

        // Use pipes to create a future that will block forever.
        let (mut rx, tx) = sys_util::new_pipe_full().expect("Pipe failed");

        // Set up the TLS for the uring_executor by creating one.
        let ex = URingExecutor::new().unwrap();

        // Register the transmit side of the pipe with the executor.
        let registered_source = ex.register_source(&tx).expect("register source failed");

        // Submit the op to the kernel. Next, test that the source keeps its Arc open for the
        // duration of the op.
        let pending_op = registered_source
            .start_write_from_mem(0, Arc::clone(&bm), &[MemRegion { offset: 0, len: 8 }])
            .expect("failed to start write to mem");

        // Here the Arc count must be two, one for `bm` and one to signify that the kernel has a
        // reference while the op is active.
        assert_eq!(Arc::strong_count(&bm), 2);

        // Dropping the operation shouldn't reduce the Arc count, as the kernel could still be
        // using it.
        drop(pending_op);
        assert_eq!(Arc::strong_count(&bm), 2);

        // Finishing the operation should put the Arc count back to 1.
        // Read from the pipe to free space so the blocked write completes, then wait for the
        // uring result in the executor.
        let mut buf = vec![0u8; sys_util::round_up_to_page_size(1)];
        rx.read_exact(&mut buf).expect("read to empty failed");
        ex.run_until(UringQueueEmpty { ex: &ex })
            .expect("Failed to wait for write pipe ready");
        assert_eq!(Arc::strong_count(&bm), 1);
    }

    #[test]
    fn canceled_before_completion() {
        async fn cancel_io(op: PendingOperation) {
            mem::drop(op);
        }

        async fn check_result(op: PendingOperation, expected: u32) {
            let actual = op.await.expect("operation failed to complete");
            assert_eq!(expected, actual);
        }

        let bm =
            Arc::new(VecIoWrapper::from(vec![0u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;

        let (rx, tx) = sys_util::pipe(true).expect("Pipe failed");

        let ex = URingExecutor::new().unwrap();

        let rx_source = ex.register_source(&rx).expect("register source failed");
        let tx_source = ex.register_source(&tx).expect("register source failed");

        let read_task = rx_source
            .start_read_to_mem(0, Arc::clone(&bm), &[MemRegion { offset: 0, len: 8 }])
            .expect("failed to start read to mem");

        ex.spawn_local(cancel_io(read_task)).detach();

        // Write to the pipe so that the kernel operation will complete.
        let buf =
            Arc::new(VecIoWrapper::from(vec![0xc2u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;
        let write_task = tx_source
            .start_write_from_mem(0, Arc::clone(&buf), &[MemRegion { offset: 0, len: 8 }])
            .expect("failed to start write from mem");

        ex.run_until(check_result(write_task, 8))
            .expect("Failed to run executor");
    }

    #[test]
    fn drop_before_completion() {
        const VALUE: u64 = 0xef6c_a8df_b842_eb9c;

        async fn check_op(op: PendingOperation) {
            let err = op.await.expect_err("Op completed successfully");
            match err {
                Error::ExecutorGone => {}
                e => panic!("Unexpected error from op: {}", e),
            }
        }

        let (mut rx, mut tx) = sys_util::pipe(true).expect("Pipe failed");

        let ex = URingExecutor::new().unwrap();

        let tx_source = ex.register_source(&tx).expect("Failed to register source");
        let bm = Arc::new(VecIoWrapper::from(VALUE.to_ne_bytes().to_vec()));
        let op = tx_source
            .start_write_from_mem(
                0,
                bm,
                &[MemRegion {
                    offset: 0,
                    len: mem::size_of::<u64>(),
                }],
            )
            .expect("Failed to start write from mem");

        ex.spawn_local(check_op(op)).detach();

        // Now drop the executor. It shouldn't run the write operation.
        mem::drop(ex);

        // Make sure the executor did not complete the uring operation.
        let new_val = [0x2e; 8];
        tx.write_all(&new_val).unwrap();

        let mut buf = 0u64.to_ne_bytes();
        rx.read_exact(&mut buf[..])
            .expect("Failed to read from pipe");

        assert_eq!(buf, new_val);
    }

    #[test]
    fn drop_on_different_thread() {
        let ex = URingExecutor::new().unwrap();

        let ex2 = ex.clone();
        let t = thread::spawn(move || ex2.run_until(async {}));

        t.join().unwrap().unwrap();

        // Leave an uncompleted operation in the queue so that the drop impl will try to drive it
        // to completion.
        let (_rx, tx) = sys_util::pipe(true).expect("Pipe failed");
        let tx = ex.register_source(&tx).expect("Failed to register source");
        let bm = Arc::new(VecIoWrapper::from(0xf2e96u64.to_ne_bytes().to_vec()));
        let op = tx
            .start_write_from_mem(
                0,
                bm,
                &[MemRegion {
                    offset: 0,
                    len: mem::size_of::<u64>(),
                }],
            )
            .expect("Failed to start write from mem");

        mem::drop(ex);

        match block_on(op).expect_err("Pending operation completed after executor was dropped") {
            Error::ExecutorGone => {}
            e => panic!("Unexpected error after dropping executor: {}", e),
        }
    }
}