1 // Copyright 2020 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 // This file makes several casts from u8 pointers into more-aligned pointer types.
6 // We assume that the kernel will give us suitably aligned memory.
7 #![allow(clippy::cast_ptr_alignment)]
8 
9 use std::collections::BTreeMap;
10 use std::fmt;
11 use std::fs::File;
12 use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
13 use std::pin::Pin;
14 use std::sync::atomic::{AtomicPtr, AtomicU32, AtomicU64, AtomicUsize, Ordering};
15 
16 use data_model::IoBufMut;
17 use sync::Mutex;
18 use sys_util::{MappedRegion, MemoryMapping, Protection, WatchingEvents};
19 
20 use crate::bindings::*;
21 use crate::syscalls::*;
22 
23 /// Holds per-operation, user-specified data. The usage is up to the caller. The most common use is
24 /// for callers to identify each request.
25 pub type UserData = u64;
26 
27 #[derive(Debug)]
28 pub enum Error {
29     /// The call to `io_uring_enter` failed with the given errno.
30     RingEnter(libc::c_int),
31     /// The call to `io_uring_setup` failed with the given errno.
32     Setup(libc::c_int),
33     /// Failed to map the completion ring.
34     MappingCompleteRing(sys_util::MmapError),
35     /// Failed to map the submit ring.
36     MappingSubmitRing(sys_util::MmapError),
37     /// Failed to map submit entries.
38     MappingSubmitEntries(sys_util::MmapError),
39     /// Too many ops are already queued.
40     NoSpace,
41 }
42 pub type Result<T> = std::result::Result<T, Error>;
43 
44 impl fmt::Display for Error {
45     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
46         use self::Error::*;
47 
48         match self {
49             RingEnter(e) => write!(f, "Failed to enter io uring {}", e),
50             Setup(e) => write!(f, "Failed to setup io uring {}", e),
51             MappingCompleteRing(e) => write!(f, "Failed to mmap completion ring {}", e),
52             MappingSubmitRing(e) => write!(f, "Failed to mmap submit ring {}", e),
53             MappingSubmitEntries(e) => write!(f, "Failed to mmap submit entries {}", e),
54             NoSpace => write!(
55                 f,
56                 "No space for more ring entries, try increasing the size passed to `new`",
57             ),
58         }
59     }
60 }
61 
62 /// Basic statistics about the operations that have been submitted to the uring.
63 #[derive(Default)]
64 pub struct URingStats {
65     total_enter_calls: AtomicU64, // Number of times the uring has been entered.
66     total_ops: AtomicU64,         // Total ops submitted to io_uring.
67     total_complete: AtomicU64,    // Total ops completed by io_uring.
68 }
69 
70 struct SubmitQueue {
71     submit_ring: SubmitQueueState,
72     submit_queue_entries: SubmitQueueEntries,
73     io_vecs: Pin<Box<[IoBufMut<'static>]>>,
74     submitting: usize, // The number of ops in the process of being submitted.
75     added: usize,      // The number of ops added since the last call to `io_uring_enter`.
76     num_sqes: usize,   // The total number of sqes allocated in shared memory.
77 }
78 
79 impl SubmitQueue {
80     // Call `f` with the next available sqe or return an error if none are available.
81     // After `f` returns, the sqe is appended to the kernel's queue.
82     fn prep_next_sqe<F>(&mut self, mut f: F) -> Result<()>
83     where
84         F: FnMut(&mut io_uring_sqe, &mut libc::iovec),
85     {
86         if self.added == self.num_sqes {
87             return Err(Error::NoSpace);
88         }
89 
90         // Find the next free submission entry in the submit ring and fill it with an iovec.
91         // The below raw pointer derefs are safe because the memory the pointers use lives as long
92         // as the mmap in self.
93         let tail = self.submit_ring.pointers.tail(Ordering::Relaxed);
94         let next_tail = tail.wrapping_add(1);
95         if next_tail == self.submit_ring.pointers.head(Ordering::Acquire) {
96             return Err(Error::NoSpace);
97         }
98         // `tail` is the next sqe to use.
99         let index = (tail & self.submit_ring.ring_mask) as usize;
100         let sqe = self.submit_queue_entries.get_mut(index).unwrap();
101 
102         f(sqe, self.io_vecs[index].as_mut());
103 
104         // Tells the kernel to use the new index when processing the entry at that index.
105         self.submit_ring.set_array_entry(index, index as u32);
106         // Ensure the above writes to sqe are seen before the tail is updated.
107         // set_tail uses Release ordering when storing to the ring.
108         self.submit_ring.pointers.set_tail(next_tail);
109 
110         self.added += 1;
111 
112         Ok(())
113     }
114 
115     // Returns the number of entries that have been added to this SubmitQueue since the last time
116     // `prepare_submit` was called.
117     fn prepare_submit(&mut self) -> usize {
118         let out = self.added - self.submitting;
119         self.submitting = self.added;
120 
121         out
122     }
123 
124     // Indicates that we failed to submit `count` entries to the kernel and that they should be
125     // retried.
126     fn fail_submit(&mut self, count: usize) {
127         debug_assert!(count <= self.submitting);
128         self.submitting -= count;
129     }
130 
131     // Indicates that `count` entries have been submitted to the kernel and so the space may be
132     // reused for new entries.
133     fn complete_submit(&mut self, count: usize) {
134         debug_assert!(count <= self.submitting);
135         self.submitting -= count;
136         self.added -= count;
137     }
138 
139     unsafe fn add_rw_op(
140         &mut self,
141         ptr: *const u8,
142         len: usize,
143         fd: RawFd,
144         offset: u64,
145         user_data: UserData,
146         op: u8,
147     ) -> Result<()> {
148         self.prep_next_sqe(|sqe, iovec| {
149             iovec.iov_base = ptr as *const libc::c_void as *mut _;
150             iovec.iov_len = len;
151             sqe.opcode = op;
152             sqe.addr = iovec as *const _ as *const libc::c_void as u64;
153             sqe.len = 1;
154             sqe.__bindgen_anon_1.off = offset;
155             sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
156             sqe.ioprio = 0;
157             sqe.user_data = user_data;
158             sqe.flags = 0;
159             sqe.fd = fd;
160         })?;
161 
162         Ok(())
163     }
164 }
165 
166 /// Unsafe wrapper for the kernel's io_uring interface. Allows for queueing multiple I/O operations
167 /// to the kernel and asynchronously handling the completion of these operations.
168 /// Use the various `add_*` functions to configure operations, then call `wait` to start
169 /// the operations and get any completed results. Each op is given a u64 user_data argument that is
170 /// used to identify the result when returned in the iterator provided by `wait`.
171 ///
172 /// # Example polling an FD for readable status.
173 ///
174 /// ```
175 /// # use std::fs::File;
176 /// # use std::os::unix::io::AsRawFd;
177 /// # use std::path::Path;
178 /// # use sys_util::WatchingEvents;
179 /// # use io_uring::URingContext;
180 /// let f = File::open(Path::new("/dev/zero")).unwrap();
181 /// let uring = URingContext::new(16).unwrap();
182 /// uring
183 ///     .add_poll_fd(f.as_raw_fd(), &WatchingEvents::empty().set_read(), 454)
184 ///     .unwrap();
185 /// let (user_data, res) = uring.wait().unwrap().next().unwrap();
186 /// assert_eq!(user_data, 454 as io_uring::UserData);
187 /// assert_eq!(res.unwrap(), 1 as u32);
188 ///
189 /// ```
190 pub struct URingContext {
191     ring_file: File, // Holds the io_uring context FD returned from io_uring_setup.
192     submit_ring: Mutex<SubmitQueue>,
193     complete_ring: CompleteQueueState,
194     in_flight: AtomicUsize, // The number of pending operations.
195     stats: URingStats,
196 }
197 
198 impl URingContext {
199     /// Creates a `URingContext` where the underlying uring has a space for `num_entries`
200     /// simultaneous operations.
201     pub fn new(num_entries: usize) -> Result<URingContext> {
202         let ring_params = io_uring_params::default();
203         // The below unsafe block isolates the creation of the URingContext. Each step on its own
204         // is unsafe. Using the uring FD for the mapping and the offsets returned by the kernel for
205         // base addresses maintains safety guarantees assuming the kernel API guarantees are
206         // trusted.
207         unsafe {
208             // Safe because the kernel is trusted to only modify params and `File` is created with
209             // an FD that it takes complete ownership of.
210             let fd = io_uring_setup(num_entries, &ring_params).map_err(Error::Setup)?;
211             let ring_file = File::from_raw_fd(fd);
212 
213             // Mmap the submit and completion queues.
214             // Safe because we trust the kernel to set valid sizes in `io_uring_setup` and any error
215             // is checked.
216             let submit_ring = SubmitQueueState::new(
217                 MemoryMapping::from_fd_offset_protection_populate(
218                     &ring_file,
219                     ring_params.sq_off.array as usize
220                         + ring_params.sq_entries as usize * std::mem::size_of::<u32>(),
221                     u64::from(IORING_OFF_SQ_RING),
222                     Protection::read_write(),
223                     true,
224                 )
225                 .map_err(Error::MappingSubmitRing)?,
226                 &ring_params,
227             );
228 
229             let num_sqe = ring_params.sq_entries as usize;
230             let submit_queue_entries = SubmitQueueEntries {
231                 mmap: MemoryMapping::from_fd_offset_protection_populate(
232                     &ring_file,
233                     ring_params.sq_entries as usize * std::mem::size_of::<io_uring_sqe>(),
234                     u64::from(IORING_OFF_SQES),
235                     Protection::read_write(),
236                     true,
237                 )
238                 .map_err(Error::MappingSubmitEntries)?,
239                 len: num_sqe,
240             };
241 
242             let complete_ring = CompleteQueueState::new(
243                 MemoryMapping::from_fd_offset_protection_populate(
244                     &ring_file,
245                     ring_params.cq_off.cqes as usize
246                         + ring_params.cq_entries as usize * std::mem::size_of::<io_uring_cqe>(),
247                     u64::from(IORING_OFF_CQ_RING),
248                     Protection::read_write(),
249                     true,
250                 )
251                 .map_err(Error::MappingCompleteRing)?,
252                 &ring_params,
253             );
254 
255             Ok(URingContext {
256                 ring_file,
257                 submit_ring: Mutex::new(SubmitQueue {
258                     submit_ring,
259                     submit_queue_entries,
260                     io_vecs: Pin::from(vec![IoBufMut::new(&mut []); num_sqe].into_boxed_slice()),
261                     submitting: 0,
262                     added: 0,
263                     num_sqes: ring_params.sq_entries as usize,
264                 }),
265                 complete_ring,
266                 in_flight: AtomicUsize::new(0),
267                 stats: Default::default(),
268             })
269         }
270     }
271 
272     /// Asynchronously writes to `fd` from the address given in `ptr`.
273     /// # Safety
274     /// `add_write` will write up to `len` bytes of data from the address given by `ptr`. This is
275     /// only safe if the caller guarantees that the memory lives until the transaction is complete
276     /// and that completion has been returned from the `wait` function. In addition there must not
277     /// be other references to the data pointed to by `ptr` until the operation completes.  Ensure
278     /// that the fd remains open until the op completes as well.
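    ///
    /// # Example
    ///
    /// A minimal usage sketch, not taken from an existing caller; it assumes a kernel with
    /// io_uring support and write access to `/dev/null`:
    ///
    /// ```no_run
    /// # use std::fs::OpenOptions;
    /// # use std::os::unix::io::AsRawFd;
    /// # use io_uring::URingContext;
    /// let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
    /// let uring = URingContext::new(16).unwrap();
    /// let buf = [0x55u8; 4096];
    /// // Safe because `buf` and `f` outlive the op and `wait` is called before they are reused.
    /// unsafe {
    ///     uring
    ///         .add_write(buf.as_ptr(), buf.len(), f.as_raw_fd(), 0, 42)
    ///         .unwrap();
    /// }
    /// let (user_data, res) = uring.wait().unwrap().next().unwrap();
    /// assert_eq!(user_data, 42);
    /// assert_eq!(res.unwrap(), buf.len() as u32);
    /// ```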
279     pub unsafe fn add_write(
280         &self,
281         ptr: *const u8,
282         len: usize,
283         fd: RawFd,
284         offset: u64,
285         user_data: UserData,
286     ) -> Result<()> {
287         self.submit_ring
288             .lock()
289             .add_rw_op(ptr, len, fd, offset, user_data, IORING_OP_WRITEV as u8)
290     }
291 
292     /// Asynchronously reads from `fd` to the address given in `ptr`.
293     /// # Safety
294     /// `add_read` will write up to `len` bytes of data to the address given by `ptr`. This is only
295     /// safe if the caller guarantees there are no other references to that memory and that the
296     /// memory lives until the transaction is complete and that completion has been returned from
297     /// the `wait` function.  In addition there must not be any mutable references to the data
298     /// pointed to by `ptr` until the operation completes.  Ensure that the fd remains open until
299     /// the op completes as well.
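    ///
    /// # Example
    ///
    /// A minimal usage sketch, not taken from an existing caller; it assumes a kernel with
    /// io_uring support:
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use std::os::unix::io::AsRawFd;
    /// # use io_uring::URingContext;
    /// let f = File::open("/dev/zero").unwrap();
    /// let uring = URingContext::new(16).unwrap();
    /// let mut buf = [0u8; 4096];
    /// // Safe because `buf` and `f` outlive the op and `wait` is called before `buf` is read.
    /// unsafe {
    ///     uring
    ///         .add_read(buf.as_mut_ptr(), buf.len(), f.as_raw_fd(), 0, 7)
    ///         .unwrap();
    /// }
    /// let (user_data, res) = uring.wait().unwrap().next().unwrap();
    /// assert_eq!(user_data, 7);
    /// assert_eq!(res.unwrap(), buf.len() as u32);
    /// ```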
300     pub unsafe fn add_read(
301         &self,
302         ptr: *mut u8,
303         len: usize,
304         fd: RawFd,
305         offset: u64,
306         user_data: UserData,
307     ) -> Result<()> {
308         self.submit_ring
309             .lock()
310             .add_rw_op(ptr, len, fd, offset, user_data, IORING_OP_READV as u8)
311     }
312 
313     /// Like `add_writev`, but accepts an iterator of iovecs so the caller can avoid building an
314     /// intermediate vector when one doesn't already exist.
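    ///
    /// # Example
    ///
    /// A sketch of building the iovec iterator the same way the unit tests in this file do, by
    /// transmuting `IoSlice` (documented as ABI-compatible with `iovec`); it assumes write access
    /// to `/dev/null`:
    ///
    /// ```no_run
    /// # use std::fs::OpenOptions;
    /// # use std::io::IoSlice;
    /// # use std::os::unix::io::AsRawFd;
    /// # use io_uring::URingContext;
    /// let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
    /// let uring = URingContext::new(16).unwrap();
    /// let buf = [0xaau8; 4096];
    /// // Safe because `buf` and `f` outlive the op and `wait` is called before they are reused.
    /// unsafe {
    ///     let iovecs = vec![IoSlice::new(&buf)]
    ///         .into_iter()
    ///         .map(|s| std::mem::transmute::<IoSlice, libc::iovec>(s));
    ///     uring.add_writev_iter(iovecs, f.as_raw_fd(), 0, 9).unwrap();
    /// }
    /// let (user_data, res) = uring.wait().unwrap().next().unwrap();
    /// assert_eq!(user_data, 9);
    /// ```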
315     pub unsafe fn add_writev_iter<I>(
316         &self,
317         iovecs: I,
318         fd: RawFd,
319         offset: u64,
320         user_data: UserData,
321     ) -> Result<()>
322     where
323         I: Iterator<Item = libc::iovec>,
324     {
325         self.add_writev(
326             Pin::from(
327                 // Safe because the caller is required to guarantee that the memory pointed to by
328                 // `iovecs` lives until the transaction is complete and the completion has been
329                 // returned from `wait()`.
330                 iovecs
331                     .map(|iov| IoBufMut::from_raw_parts(iov.iov_base as *mut u8, iov.iov_len))
332                     .collect::<Vec<_>>()
333                     .into_boxed_slice(),
334             ),
335             fd,
336             offset,
337             user_data,
338         )
339     }
340 
341     /// Asynchronously writes to `fd` from the addresses given in `iovecs`.
342     /// # Safety
343     /// `add_writev` will write to the address given by `iovecs`. This is only safe if the caller
344     /// guarantees there are no other references to that memory and that the memory lives until the
345     /// transaction is complete and that completion has been returned from the `wait` function.  In
346     /// addition there must not be any mutable references to the data pointed to by `iovecs` until
347     /// the operation completes.  Ensure that the fd remains open until the op completes as well.
348     /// The iovecs reference must be kept alive until the op returns.
349     pub unsafe fn add_writev(
350         &self,
351         iovecs: Pin<Box<[IoBufMut<'static>]>>,
352         fd: RawFd,
353         offset: u64,
354         user_data: UserData,
355     ) -> Result<()> {
356         self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
357             sqe.opcode = IORING_OP_WRITEV as u8;
358             sqe.addr = iovecs.as_ptr() as *const _ as *const libc::c_void as u64;
359             sqe.len = iovecs.len() as u32;
360             sqe.__bindgen_anon_1.off = offset;
361             sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
362             sqe.ioprio = 0;
363             sqe.user_data = user_data;
364             sqe.flags = 0;
365             sqe.fd = fd;
366         })?;
367         self.complete_ring.add_op_data(user_data, iovecs);
368         Ok(())
369     }
370 
371     /// Like `add_readv`, but accepts an iterator of iovecs so the caller can avoid building an
372     /// intermediate vector when one doesn't already exist.
373     pub unsafe fn add_readv_iter<I>(
374         &self,
375         iovecs: I,
376         fd: RawFd,
377         offset: u64,
378         user_data: UserData,
379     ) -> Result<()>
380     where
381         I: Iterator<Item = libc::iovec>,
382     {
383         self.add_readv(
384             Pin::from(
385                 // Safe because the caller is required to guarantee that the memory pointed to by
386                 // `iovecs` lives until the transaction is complete and the completion has been
387                 // returned from `wait()`.
388                 iovecs
389                     .map(|iov| IoBufMut::from_raw_parts(iov.iov_base as *mut u8, iov.iov_len))
390                     .collect::<Vec<_>>()
391                     .into_boxed_slice(),
392             ),
393             fd,
394             offset,
395             user_data,
396         )
397     }
398 
399     /// Asynchronously reads from `fd` to the addresses given in `iovecs`.
400     /// # Safety
401     /// `add_readv` will write to the address given by `iovecs`. This is only safe if the caller
402     /// guarantees there are no other references to that memory and that the memory lives until the
403     /// transaction is complete and that completion has been returned from the `wait` function.  In
404     /// addition there must not be any references to the data pointed to by `iovecs` until the
405     /// operation completes.  Ensure that the fd remains open until the op completes as well.
406     /// The iovecs reference must be kept alive until the op returns.
407     pub unsafe fn add_readv(
408         &self,
409         iovecs: Pin<Box<[IoBufMut<'static>]>>,
410         fd: RawFd,
411         offset: u64,
412         user_data: UserData,
413     ) -> Result<()> {
414         self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
415             sqe.opcode = IORING_OP_READV as u8;
416             sqe.addr = iovecs.as_ptr() as *const _ as *const libc::c_void as u64;
417             sqe.len = iovecs.len() as u32;
418             sqe.__bindgen_anon_1.off = offset;
419             sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
420             sqe.ioprio = 0;
421             sqe.user_data = user_data;
422             sqe.flags = 0;
423             sqe.fd = fd;
424         })?;
425         self.complete_ring.add_op_data(user_data, iovecs);
426         Ok(())
427     }
428 
429     /// Add a no-op operation that doesn't perform any IO. Useful for testing the performance of the
430     /// io_uring itself and for waking up a thread that's blocked inside a wait() call.
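    ///
    /// # Example
    ///
    /// A minimal sketch; it assumes a kernel with io_uring support:
    ///
    /// ```no_run
    /// # use io_uring::URingContext;
    /// let uring = URingContext::new(16).unwrap();
    /// uring.add_nop(99).unwrap();
    /// let (user_data, res) = uring.wait().unwrap().next().unwrap();
    /// assert_eq!(user_data, 99);
    /// assert_eq!(res.unwrap(), 0);
    /// ```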
431     pub fn add_nop(&self, user_data: UserData) -> Result<()> {
432         self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
433             sqe.opcode = IORING_OP_NOP as u8;
434             sqe.fd = -1;
435             sqe.user_data = user_data;
436 
437             sqe.addr = 0;
438             sqe.len = 0;
439             sqe.__bindgen_anon_1.off = 0;
440             sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
441             sqe.__bindgen_anon_2.rw_flags = 0;
442             sqe.ioprio = 0;
443             sqe.flags = 0;
444         })
445     }
446 
447     /// Syncs all completed operations to `fd`; the ordering with in-flight async ops is not
448     /// defined.
449     pub fn add_fsync(&self, fd: RawFd, user_data: UserData) -> Result<()> {
450         self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
451             sqe.opcode = IORING_OP_FSYNC as u8;
452             sqe.fd = fd;
453             sqe.user_data = user_data;
454 
455             sqe.addr = 0;
456             sqe.len = 0;
457             sqe.__bindgen_anon_1.off = 0;
458             sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
459             sqe.__bindgen_anon_2.rw_flags = 0;
460             sqe.ioprio = 0;
461             sqe.flags = 0;
462         })
463     }
464 
465     /// Asynchronously performs the same operation as `fallocate`; see `fallocate(2)` for usage.
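    ///
    /// # Example
    ///
    /// A sketch mirroring the unit tests below; the `mode` argument takes the same
    /// `libc::FALLOC_FL_*` flags as the syscall, and the example assumes a kernel that supports
    /// `IORING_OP_FALLOCATE` plus the `tempfile` crate used by the tests:
    ///
    /// ```no_run
    /// # use std::os::unix::io::AsRawFd;
    /// # use io_uring::URingContext;
    /// let f = tempfile::tempfile().unwrap();
    /// let uring = URingContext::new(16).unwrap();
    /// // Allocate 1 MiB at the start of the file without changing its reported size.
    /// uring
    ///     .add_fallocate(f.as_raw_fd(), 0, 1 << 20, libc::FALLOC_FL_KEEP_SIZE as u32, 11)
    ///     .unwrap();
    /// let (user_data, res) = uring.wait().unwrap().next().unwrap();
    /// assert_eq!(user_data, 11);
    /// ```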
466     pub fn add_fallocate(
467         &self,
468         fd: RawFd,
469         offset: u64,
470         len: u64,
471         mode: u32,
472         user_data: UserData,
473     ) -> Result<()> {
474         // Note that len for fallocate is passed in the addr field of the sqe and the mode uses
475         // the len field.
476         self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
477             sqe.opcode = IORING_OP_FALLOCATE as u8;
478 
479             sqe.fd = fd;
480             sqe.addr = len;
481             sqe.len = mode;
482             sqe.__bindgen_anon_1.off = offset;
483             sqe.user_data = user_data;
484 
485             sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
486             sqe.__bindgen_anon_2.rw_flags = 0;
487             sqe.ioprio = 0;
488             sqe.flags = 0;
489         })
490     }
491 
492     /// Adds an FD to be polled based on the given flags.
493     /// The user must keep the FD open until the operation completion is returned from
494     /// `wait`.
495     /// Note that io_uring polls are always one-shot. After a completion is returned for the fd, it
496     /// must be re-added to get future events.
497     pub fn add_poll_fd(
498         &self,
499         fd: RawFd,
500         events: &WatchingEvents,
501         user_data: UserData,
502     ) -> Result<()> {
503         self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
504             sqe.opcode = IORING_OP_POLL_ADD as u8;
505             sqe.fd = fd;
506             sqe.user_data = user_data;
507             sqe.__bindgen_anon_2.poll_events = events.get_raw() as u16;
508 
509             sqe.addr = 0;
510             sqe.len = 0;
511             sqe.__bindgen_anon_1.off = 0;
512             sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
513             sqe.ioprio = 0;
514             sqe.flags = 0;
515         })
516     }
517 
518     /// Removes an FD that was previously added with `add_poll_fd`.
519     pub fn remove_poll_fd(
520         &self,
521         fd: RawFd,
522         events: &WatchingEvents,
523         user_data: UserData,
524     ) -> Result<()> {
525         self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
526             sqe.opcode = IORING_OP_POLL_REMOVE as u8;
527             sqe.fd = fd;
528             sqe.user_data = user_data;
529             sqe.__bindgen_anon_2.poll_events = events.get_raw() as u16;
530 
531             sqe.addr = 0;
532             sqe.len = 0;
533             sqe.__bindgen_anon_1.off = 0;
534             sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
535             sqe.ioprio = 0;
536             sqe.flags = 0;
537         })
538     }
539 
540     // Calls io_uring_enter, submitting any new sqes that have been added to the submit queue and
541     // waiting for `wait_nr` operations to complete.
542     fn enter(&self, wait_nr: u64) -> Result<()> {
543         let completed = self.complete_ring.num_completed();
544         self.stats
545             .total_complete
546             .fetch_add(completed as u64, Ordering::Relaxed);
547         self.in_flight.fetch_sub(completed, Ordering::Relaxed);
548 
549         let added = self.submit_ring.lock().prepare_submit();
550         if added == 0 && wait_nr == 0 {
551             return Ok(());
552         }
553 
554         self.stats.total_enter_calls.fetch_add(1, Ordering::Relaxed);
555         let flags = if wait_nr > 0 {
556             IORING_ENTER_GETEVENTS
557         } else {
558             0
559         };
560         let res = unsafe {
561             // Safe because the only memory modified is in the completion queue.
562             io_uring_enter(self.ring_file.as_raw_fd(), added as u64, wait_nr, flags)
563         };
564 
565         match res {
566             Ok(_) => {
567                 self.submit_ring.lock().complete_submit(added);
568                 self.stats
569                     .total_ops
570                     .fetch_add(added as u64, Ordering::Relaxed);
571 
572                 // Release store synchronizes with acquire load above.
573                 self.in_flight.fetch_add(added, Ordering::Release);
574             }
575             Err(e) => {
576                 self.submit_ring.lock().fail_submit(added);
577 
578                 if wait_nr == 0 || e != libc::EBUSY {
579                     return Err(Error::RingEnter(e));
580                 }
581 
582                 // An ebusy return means that some completed events must be processed before
583                 // submitting more, wait for some to finish without pushing the new sqes in
584                 // that case.
585                 unsafe {
586                     io_uring_enter(self.ring_file.as_raw_fd(), 0, wait_nr, flags)
587                         .map_err(Error::RingEnter)?;
588                 }
589             }
590         }
591 
592         Ok(())
593     }
594 
595     /// Sends operations added with the `add_*` functions to the kernel.
596     pub fn submit(&self) -> Result<()> {
597         self.enter(0)
598     }
599 
600     /// Sends operations added with the `add_*` functions to the kernel and returns an iterator over
601     /// any completed operations. `wait` blocks until at least one completion is ready.  If called
602     /// without any new events added, this simply waits for any existing events to complete and
603     /// returns as soon as one or more is ready.
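    ///
    /// # Example
    ///
    /// A sketch of draining every completion that is currently ready; it assumes ops were queued
    /// with the `add_*` calls beforehand:
    ///
    /// ```no_run
    /// # use io_uring::URingContext;
    /// # let uring = URingContext::new(16).unwrap();
    /// # uring.add_nop(1).unwrap();
    /// // Blocks until at least one op completes, then yields everything that is ready.
    /// for (user_data, res) in uring.wait().unwrap() {
    ///     println!("op {} finished with {:?}", user_data, res);
    /// }
    /// ```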
604     pub fn wait(&self) -> Result<impl Iterator<Item = (UserData, std::io::Result<u32>)> + '_> {
605         // We only want to wait for events if there aren't already events in the completion queue.
606         let wait_nr = if self.complete_ring.num_ready() > 0 {
607             0
608         } else {
609             1
610         };
611 
612         // The CompletionQueue will iterate all completed ops.
613         match self.enter(wait_nr) {
614             Ok(()) => Ok(&self.complete_ring),
615             // If we cannot submit any more entries then we need to pull stuff out of the completion
616             // ring, so just return the completion ring. This can only happen when `wait_nr` is 0 so
617             // we know there are already entries in the completion queue.
618             Err(Error::RingEnter(libc::EBUSY)) => Ok(&self.complete_ring),
619             Err(e) => Err(e),
620         }
621     }
622 }
623 
624 impl AsRawFd for URingContext {
625     fn as_raw_fd(&self) -> RawFd {
626         self.ring_file.as_raw_fd()
627     }
628 }
629 
630 struct SubmitQueueEntries {
631     mmap: MemoryMapping,
632     len: usize,
633 }
634 
635 impl SubmitQueueEntries {
636     fn get_mut(&mut self, index: usize) -> Option<&mut io_uring_sqe> {
637         if index >= self.len {
638             return None;
639         }
640         let mut_ref = unsafe {
641             // Safe because the mut borrow of self restricts access to one mutable reference at a
642             // time and we trust that the kernel has returned enough memory in io_uring_setup and mmap.
643             &mut *(self.mmap.as_ptr() as *mut io_uring_sqe).add(index)
644         };
645         // Clear any state.
646         *mut_ref = io_uring_sqe::default();
647         Some(mut_ref)
648     }
649 }
650 
651 struct SubmitQueueState {
652     _mmap: MemoryMapping,
653     pointers: QueuePointers,
654     ring_mask: u32,
655     array: AtomicPtr<u32>,
656 }
657 
658 impl SubmitQueueState {
659     // # Safety
660     // Safe iff `mmap` is created by mapping from a uring FD at the SQ_RING offset and params is
661     // the params struct passed to io_uring_setup.
662     unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> SubmitQueueState {
663         let ptr = mmap.as_ptr();
664         // The pointer casts are safe because a u32 is atomic on all supported architectures and the
665         // pointer will live until after self is dropped because the mmap is owned.
666         let head = ptr.add(params.sq_off.head as usize) as *const AtomicU32;
667         let tail = ptr.add(params.sq_off.tail as usize) as *const AtomicU32;
668         // This offset is guaranteed to be within the mmap so unwrap the result.
669         let ring_mask = mmap.read_obj(params.sq_off.ring_mask as usize).unwrap();
670         let array = AtomicPtr::new(ptr.add(params.sq_off.array as usize) as *mut u32);
671         SubmitQueueState {
672             _mmap: mmap,
673             pointers: QueuePointers { head, tail },
674             ring_mask,
675             array,
676         }
677     }
678 
679     // Sets the kernel's array entry at the given `index` to `value`.
680     fn set_array_entry(&self, index: usize, value: u32) {
681         // Safe because self being constructed from the correct mmap guarantees that the memory is
682         // valid to be written.
683         unsafe {
684             std::ptr::write_volatile(self.array.load(Ordering::Relaxed).add(index), value as u32);
685         }
686     }
687 }
688 
689 #[derive(Default)]
690 struct CompleteQueueData {
691     completed: usize,
692     //For ops that pass in arrays of iovecs, they need to be valid for the duration of the
693     //operation because the kernel might read them at any time.
694     pending_op_addrs: BTreeMap<UserData, Pin<Box<[IoBufMut<'static>]>>>,
695 }
696 
697 struct CompleteQueueState {
698     mmap: MemoryMapping,
699     pointers: QueuePointers,
700     ring_mask: u32,
701     cqes_offset: u32,
702     data: Mutex<CompleteQueueData>,
703 }
704 
705 impl CompleteQueueState {
706     /// # Safety
707     /// Safe iff `mmap` is created by mapping from a uring FD at the CQ_RING offset and params is
708     /// the params struct passed to io_uring_setup.
709     unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> CompleteQueueState {
710         let ptr = mmap.as_ptr();
711         let head = ptr.add(params.cq_off.head as usize) as *const AtomicU32;
712         let tail = ptr.add(params.cq_off.tail as usize) as *const AtomicU32;
713         let ring_mask = mmap.read_obj(params.cq_off.ring_mask as usize).unwrap();
714         CompleteQueueState {
715             mmap,
716             pointers: QueuePointers { head, tail },
717             ring_mask,
718             cqes_offset: params.cq_off.cqes,
719             data: Default::default(),
720         }
721     }
722 
723     fn add_op_data(&self, user_data: UserData, addrs: Pin<Box<[IoBufMut<'static>]>>) {
724         self.data.lock().pending_op_addrs.insert(user_data, addrs);
725     }
726 
727     fn get_cqe(&self, head: u32) -> &io_uring_cqe {
728         unsafe {
729             // Safe because we trust that the kernel has returned enough memory in io_uring_setup
730             // and mmap, and the index is kept in range by applying the ring_mask.
731             let cqes = (self.mmap.as_ptr() as *const u8).add(self.cqes_offset as usize)
732                 as *const io_uring_cqe;
733 
734             let index = head & self.ring_mask;
735 
736             &*cqes.add(index as usize)
737         }
738     }
739 
740     fn num_ready(&self) -> u32 {
741         let tail = self.pointers.tail(Ordering::Acquire);
742         let head = self.pointers.head(Ordering::Relaxed);
743 
744         tail.saturating_sub(head)
745     }
746 
747     fn num_completed(&self) -> usize {
748         let mut data = self.data.lock();
749         ::std::mem::replace(&mut data.completed, 0)
750     }
751 
752     fn pop_front(&self) -> Option<(UserData, std::io::Result<u32>)> {
753         // Take the lock on self.data first so that 2 threads don't try to pop the same completed op
754         // from the queue.
755         let mut data = self.data.lock();
756 
757         // Safe because the pointers to the atomics are valid and the cqe must be in range
758         // because the kernel provided mask is applied to the index.
759         let head = self.pointers.head(Ordering::Relaxed);
760 
761         // Synchronize the read of tail after the read of head.
762         if head == self.pointers.tail(Ordering::Acquire) {
763             return None;
764         }
765 
766         data.completed += 1;
767 
768         let cqe = self.get_cqe(head);
769         let user_data = cqe.user_data;
770         let res = cqe.res;
771 
772         // free the addrs saved for this op.
773         let _ = data.pending_op_addrs.remove(&user_data);
774 
775         // Store the new head and ensure the reads above complete before the kernel sees the
776         // update to head; `set_head` uses `Release` ordering.
777         let new_head = head.wrapping_add(1);
778         self.pointers.set_head(new_head);
779 
780         let io_res = match res {
781             r if r < 0 => Err(std::io::Error::from_raw_os_error(-r)),
782             r => Ok(r as u32),
783         };
784         Some((user_data, io_res))
785     }
786 }
787 
788 // Return the completed ops with their result.
789 impl<'c> Iterator for &'c CompleteQueueState {
790     type Item = (UserData, std::io::Result<u32>);
791 
792     fn next(&mut self) -> Option<Self::Item> {
793         self.pop_front()
794     }
795 }
796 
797 struct QueuePointers {
798     head: *const AtomicU32,
799     tail: *const AtomicU32,
800 }
801 
802 // Rust pointers don't implement Send or Sync but in this case both fields are atomics and so it's
803 // safe to send the pointers between threads or access them concurrently from multiple threads.
804 unsafe impl Send for QueuePointers {}
805 unsafe impl Sync for QueuePointers {}
806 
807 impl QueuePointers {
808     // Loads the tail pointer atomically with the given ordering.
809     fn tail(&self, ordering: Ordering) -> u32 {
810         // Safe because self being constructed from the correct mmap guarantees that the memory is
811         // valid to read.
812         unsafe { (*self.tail).load(ordering) }
813     }
814 
815     // Stores the new value of the tail in the submit queue. This allows the kernel to start
816     // processing entries that have been added up until the given tail pointer.
817     // Always stores with release ordering as that is the only valid way to use the pointer.
818     fn set_tail(&self, next_tail: u32) {
819         // Safe because self being constructed from the correct mmap guarantees that the memory is
820         // valid to read and it's used as an atomic to cover mutability concerns.
821         unsafe { (*self.tail).store(next_tail, Ordering::Release) }
822     }
823 
824     // Loads the head pointer atomically with the given ordering.
825     fn head(&self, ordering: Ordering) -> u32 {
826         // Safe because self being constructed from the correct mmap guarantees that the memory is
827         // valid to read.
828         unsafe { (*self.head).load(ordering) }
829     }
830 
831     // Stores the new value of the head in the completion queue. This tells the kernel that the
832     // completed entries up to the given head pointer have been consumed and can be reused.
833     // Always stores with release ordering as that is the only valid way to use the pointer.
834     fn set_head(&self, next_head: u32) {
835         // Safe because self being constructed from the correct mmap guarantees that the memory is
836         // valid to read and it's used as an atomic to cover mutability concerns.
837         unsafe { (*self.head).store(next_head, Ordering::Release) }
838     }
839 }
840 
841 #[cfg(test)]
842 mod tests {
843     use std::collections::BTreeSet;
844     use std::fs::OpenOptions;
845     use std::io::{IoSlice, IoSliceMut};
846     use std::io::{Read, Seek, SeekFrom, Write};
847     use std::mem;
848     use std::path::{Path, PathBuf};
849     use std::sync::mpsc::channel;
850     use std::sync::{Arc, Barrier};
851     use std::thread;
852     use std::time::Duration;
853 
854     use sync::{Condvar, Mutex};
855     use sys_util::{pipe, PollContext};
856     use tempfile::{tempfile, TempDir};
857 
858     use super::*;
859 
860     fn append_file_name(path: &Path, name: &str) -> PathBuf {
861         let mut joined = path.to_path_buf();
862         joined.push(name);
863         joined
864     }
865 
866     fn check_one_read(
867         uring: &URingContext,
868         buf: &mut [u8],
869         fd: RawFd,
870         offset: u64,
871         user_data: UserData,
872     ) {
873         let (user_data_ret, res) = unsafe {
874             // Safe because the `wait` call waits until the kernel is done with `buf`.
875             uring
876                 .add_read(buf.as_mut_ptr(), buf.len(), fd, offset, user_data)
877                 .unwrap();
878             uring.wait().unwrap().next().unwrap()
879         };
880         assert_eq!(user_data_ret, user_data);
881         assert_eq!(res.unwrap(), buf.len() as u32);
882     }
883 
884     fn check_one_readv(
885         uring: &URingContext,
886         buf: &mut [u8],
887         fd: RawFd,
888         offset: u64,
889         user_data: UserData,
890     ) {
891         let io_vecs = unsafe {
892             // Safe to transmute from IoSliceMut to iovec.
893             vec![IoSliceMut::new(buf)]
894                 .into_iter()
895                 .map(|slice| std::mem::transmute::<IoSliceMut, libc::iovec>(slice))
896         };
897         let (user_data_ret, res) = unsafe {
898             // Safe because the `wait` call waits until the kernel is done with `buf`.
899             uring
900                 .add_readv_iter(io_vecs, fd, offset, user_data)
901                 .unwrap();
902             uring.wait().unwrap().next().unwrap()
903         };
904         assert_eq!(user_data_ret, user_data);
905         assert_eq!(res.unwrap(), buf.len() as u32);
906     }
907 
908     fn create_test_file(size: u64) -> std::fs::File {
909         let f = tempfile().unwrap();
910         f.set_len(size).unwrap();
911         f
912     }
913 
914     #[test]
915     // Queue as many reads as possible and then collect the completions.
916     fn read_parallel() {
917         const QUEUE_SIZE: usize = 10;
918         const BUF_SIZE: usize = 0x1000;
919 
920         let uring = URingContext::new(QUEUE_SIZE).unwrap();
921         let mut buf = [0u8; BUF_SIZE * QUEUE_SIZE];
922         let f = create_test_file((BUF_SIZE * QUEUE_SIZE) as u64);
923 
924         // Check that the whole file can be read and that the queue's wrapping is handled by reading
925         // many more buffers than the queue depth.
926         for i in 0..QUEUE_SIZE * 64 {
927             let index = i as u64;
928             unsafe {
929                 let offset = (i % QUEUE_SIZE) * BUF_SIZE;
930                 match uring.add_read(
931                     buf[offset..].as_mut_ptr(),
932                     BUF_SIZE,
933                     f.as_raw_fd(),
934                     offset as u64,
935                     index,
936                 ) {
937                     Ok(_) => (),
938                     Err(Error::NoSpace) => {
939                         let _ = uring.wait().unwrap().next().unwrap();
940                     }
941                     Err(_) => panic!("unexpected error from uring wait"),
942                 }
943             }
944         }
945     }
946 
947     #[test]
948     fn read_readv() {
949         let queue_size = 128;
950 
951         let uring = URingContext::new(queue_size).unwrap();
952         let mut buf = [0u8; 0x1000];
953         let f = create_test_file(0x1000 * 2);
954 
955         // Check that the whole file can be read and that the queue's wrapping is handled by reading
956         // double the queue depth of buffers.
957         for i in 0..queue_size * 2 {
958             let index = i as u64;
959             check_one_read(&uring, &mut buf, f.as_raw_fd(), (index % 2) * 0x1000, index);
960             check_one_readv(&uring, &mut buf, f.as_raw_fd(), (index % 2) * 0x1000, index);
961         }
962     }
963 
964     #[test]
965     fn readv_vec() {
966         let queue_size = 128;
967         const BUF_SIZE: usize = 0x2000;
968 
969         let uring = URingContext::new(queue_size).unwrap();
970         let mut buf = [0u8; BUF_SIZE];
971         let mut buf2 = [0u8; BUF_SIZE];
972         let mut buf3 = [0u8; BUF_SIZE];
973         let io_vecs = unsafe {
974             // Safe to transmute from IoSliceMut to iovec.
975             vec![
976                 IoSliceMut::new(&mut buf),
977                 IoSliceMut::new(&mut buf2),
978                 IoSliceMut::new(&mut buf3),
979             ]
980             .into_iter()
981             .map(|slice| std::mem::transmute::<IoSliceMut, libc::iovec>(slice))
982             .collect::<Vec<libc::iovec>>()
983         };
984         let total_len = io_vecs.iter().fold(0, |a, iovec| a + iovec.iov_len);
985         let f = create_test_file(total_len as u64 * 2);
986         let (user_data_ret, res) = unsafe {
987             // Safe because the `wait` call waits until the kernel is done with `buf`.
988             uring
989                 .add_readv_iter(io_vecs.into_iter(), f.as_raw_fd(), 0, 55)
990                 .unwrap();
991             uring.wait().unwrap().next().unwrap()
992         };
993         assert_eq!(user_data_ret, 55);
994         assert_eq!(res.unwrap(), total_len as u32);
995     }
996 
997     #[test]
998     fn write_one_block() {
999         let uring = URingContext::new(16).unwrap();
1000         let mut buf = [0u8; 4096];
1001         let mut f = create_test_file(0);
1002         f.write(&buf).unwrap();
1003         f.write(&buf).unwrap();
1004 
1005         unsafe {
1006             // Safe because the `wait` call waits until the kernel is done with `buf`.
1007             uring
1008                 .add_write(buf.as_mut_ptr(), buf.len(), f.as_raw_fd(), 0, 55)
1009                 .unwrap();
1010             let (user_data, res) = uring.wait().unwrap().next().unwrap();
1011             assert_eq!(user_data, 55_u64);
1012             assert_eq!(res.unwrap(), buf.len() as u32);
1013         }
1014     }
1015 
1016     #[test]
1017     fn write_one_submit_poll() {
1018         let uring = URingContext::new(16).unwrap();
1019         let mut buf = [0u8; 4096];
1020         let mut f = create_test_file(0);
1021         f.write(&buf).unwrap();
1022         f.write(&buf).unwrap();
1023 
1024         let ctx: PollContext<u64> = PollContext::build_with(&[(&uring, 1)]).unwrap();
1025         {
1026             // Test that the uring context isn't readable before any events are complete.
1027             let events = ctx.wait_timeout(Duration::from_millis(1)).unwrap();
1028             assert!(events.iter_readable().next().is_none());
1029         }
1030 
1031         unsafe {
1032             // Safe because the `wait` call waits until the kernel is done with `buf`.
1033             uring
1034                 .add_write(buf.as_mut_ptr(), buf.len(), f.as_raw_fd(), 0, 55)
1035                 .unwrap();
1036             uring.submit().unwrap();
1037             // Poll for completion with epoll.
1038             let events = ctx.wait().unwrap();
1039             let event = events.iter_readable().next().unwrap();
1040             assert_eq!(event.token(), 1);
1041             let (user_data, res) = uring.wait().unwrap().next().unwrap();
1042             assert_eq!(user_data, 55_u64);
1043             assert_eq!(res.unwrap(), buf.len() as u32);
1044         }
1045     }
1046 
1047     #[test]
1048     fn writev_vec() {
1049         let queue_size = 128;
1050         const BUF_SIZE: usize = 0x2000;
1051         const OFFSET: u64 = 0x2000;
1052 
1053         let uring = URingContext::new(queue_size).unwrap();
1054         let buf = [0xaau8; BUF_SIZE];
1055         let buf2 = [0xffu8; BUF_SIZE];
1056         let buf3 = [0x55u8; BUF_SIZE];
1057         let io_vecs = unsafe {
1058             // Safe to transmute from IoSlice to iovec.
1059             vec![IoSlice::new(&buf), IoSlice::new(&buf2), IoSlice::new(&buf3)]
1060                 .into_iter()
1061                 .map(|slice| std::mem::transmute::<IoSlice, libc::iovec>(slice))
1062                 .collect::<Vec<libc::iovec>>()
1063         };
1064         let total_len = io_vecs.iter().fold(0, |a, iovec| a + iovec.iov_len);
1065         let mut f = create_test_file(total_len as u64 * 2);
1066         let (user_data_ret, res) = unsafe {
1067             // Safe because the `wait` call waits until the kernel is done with `buf`.
1068             uring
1069                 .add_writev_iter(io_vecs.into_iter(), f.as_raw_fd(), OFFSET, 55)
1070                 .unwrap();
1071             uring.wait().unwrap().next().unwrap()
1072         };
1073         assert_eq!(user_data_ret, 55);
1074         assert_eq!(res.unwrap(), total_len as u32);
1075 
1076         let mut read_back = [0u8; BUF_SIZE];
1077         f.seek(SeekFrom::Start(OFFSET)).unwrap();
1078         f.read(&mut read_back).unwrap();
1079         assert!(!read_back.iter().any(|&b| b != 0xaa));
1080         f.read(&mut read_back).unwrap();
1081         assert!(!read_back.iter().any(|&b| b != 0xff));
1082         f.read(&mut read_back).unwrap();
1083         assert!(!read_back.iter().any(|&b| b != 0x55));
1084     }
1085 
1086     #[test]
1087     fn fallocate_fsync() {
1088         let tempdir = TempDir::new().unwrap();
1089         let file_path = append_file_name(tempdir.path(), "test");
1090 
1091         {
1092             let buf = [0u8; 4096];
1093             let mut f = OpenOptions::new()
1094                 .read(true)
1095                 .write(true)
1096                 .create(true)
1097                 .truncate(true)
1098                 .open(&file_path)
1099                 .unwrap();
1100             f.write(&buf).unwrap();
1101         }
1102 
1103         let init_size = std::fs::metadata(&file_path).unwrap().len() as usize;
1104         let set_size = init_size + 1024 * 1024 * 50;
1105         let f = OpenOptions::new()
1106             .read(true)
1107             .write(true)
1108             .create(true)
1109             .open(&file_path)
1110             .unwrap();
1111 
1112         let uring = URingContext::new(16).unwrap();
1113         uring
1114             .add_fallocate(f.as_raw_fd(), 0, set_size as u64, 0, 66)
1115             .unwrap();
1116         let (user_data, res) = uring.wait().unwrap().next().unwrap();
1117         assert_eq!(user_data, 66_u64);
1118         match res {
1119             Err(e) => {
1120                 if e.kind() == std::io::ErrorKind::InvalidInput {
1121                     // skip on kernels that don't support fallocate.
1122                     return;
1123                 }
1124                 panic!("Unexpected fallocate error: {}", e);
1125             }
1126             Ok(val) => assert_eq!(val, 0_u32),
1127         }
1128 
1129         // Add a few writes and then fsync
1130         let buf = [0u8; 4096];
1131         let mut pending = std::collections::BTreeSet::new();
1132         unsafe {
1133             uring
1134                 .add_write(buf.as_ptr(), buf.len(), f.as_raw_fd(), 0, 67)
1135                 .unwrap();
1136             pending.insert(67u64);
1137             uring
1138                 .add_write(buf.as_ptr(), buf.len(), f.as_raw_fd(), 4096, 68)
1139                 .unwrap();
1140             pending.insert(68);
1141             uring
1142                 .add_write(buf.as_ptr(), buf.len(), f.as_raw_fd(), 8192, 69)
1143                 .unwrap();
1144             pending.insert(69);
1145         }
1146         uring.add_fsync(f.as_raw_fd(), 70).unwrap();
1147         pending.insert(70);
1148 
1149         let mut wait_calls = 0;
1150 
1151         while !pending.is_empty() && wait_calls < 5 {
1152             let events = uring.wait().unwrap();
1153             for (user_data, res) in events {
1154                 assert!(res.is_ok());
1155                 assert!(pending.contains(&user_data));
1156                 pending.remove(&user_data);
1157             }
1158             wait_calls += 1;
1159         }
1160         assert!(pending.is_empty());
1161 
1162         uring
1163             .add_fallocate(
1164                 f.as_raw_fd(),
1165                 init_size as u64,
1166                 (set_size - init_size) as u64,
1167                 (libc::FALLOC_FL_PUNCH_HOLE | libc::FALLOC_FL_KEEP_SIZE) as u32,
1168                 68,
1169             )
1170             .unwrap();
1171         let (user_data, res) = uring.wait().unwrap().next().unwrap();
1172         assert_eq!(user_data, 68_u64);
1173         assert_eq!(res.unwrap(), 0_u32);
1174 
1175         drop(f); // Close to ensure directory entries for metadata are updated.
1176 
1177         let new_size = std::fs::metadata(&file_path).unwrap().len() as usize;
1178         assert_eq!(new_size, set_size);
1179     }
1180 
1181     #[test]
1182     fn dev_zero_readable() {
1183         let f = File::open(Path::new("/dev/zero")).unwrap();
1184         let uring = URingContext::new(16).unwrap();
1185         uring
1186             .add_poll_fd(f.as_raw_fd(), &WatchingEvents::empty().set_read(), 454)
1187             .unwrap();
1188         let (user_data, res) = uring.wait().unwrap().next().unwrap();
1189         assert_eq!(user_data, 454_u64);
1190         assert_eq!(res.unwrap(), 1_u32);
1191     }
1192 
1193     #[test]
1194     fn queue_many_ebusy_retry() {
1195         let num_entries = 16;
1196         let f = File::open(Path::new("/dev/zero")).unwrap();
1197         let uring = URingContext::new(num_entries).unwrap();
1198         // Fill the submit ring.
1199         for sqe_batch in 0..3 {
1200             for i in 0..num_entries {
1201                 uring
1202                     .add_poll_fd(
1203                         f.as_raw_fd(),
1204                         &WatchingEvents::empty().set_read(),
1205                         (sqe_batch * num_entries + i) as u64,
1206                     )
1207                     .unwrap();
1208             }
1209             uring.submit().unwrap();
1210         }
1211         // Adding more than the number of cqes will cause the uring to return EBUSY; make sure that
1212         // is handled cleanly and wait still returns the completed entries.
1213         uring
1214             .add_poll_fd(
1215                 f.as_raw_fd(),
1216                 &WatchingEvents::empty().set_read(),
1217                 (num_entries * 3) as u64,
1218             )
1219             .unwrap();
1220         // The first wait call should return the cqes that are already filled.
1221         {
1222             let mut results = uring.wait().unwrap();
1223             for _i in 0..num_entries * 2 {
1224                 assert_eq!(results.next().unwrap().1.unwrap(), 1_u32);
1225             }
1226             assert!(results.next().is_none());
1227         }
1228         // The second will finish submitting any more sqes and return the rest.
1229         let mut results = uring.wait().unwrap();
1230         for _i in 0..num_entries + 1 {
1231             assert_eq!(results.next().unwrap().1.unwrap(), 1_u32);
1232         }
1233         assert!(results.next().is_none());
1234     }
1235 
1236     #[test]
1237     fn wake_with_nop() {
1238         const PIPE_READ: UserData = 0;
1239         const NOP: UserData = 1;
1240         const BUF_DATA: [u8; 16] = [0xf4; 16];
1241 
1242         let uring = URingContext::new(4).map(Arc::new).unwrap();
1243         let (pipe_out, mut pipe_in) = pipe(true).unwrap();
1244         let (tx, rx) = channel();
1245 
1246         let uring2 = uring.clone();
1247         let wait_thread = thread::spawn(move || {
1248             let mut buf = [0u8; BUF_DATA.len()];
1249             unsafe {
1250                 uring2
1251                     .add_read(buf.as_mut_ptr(), buf.len(), pipe_out.as_raw_fd(), 0, 0)
1252                     .unwrap();
1253             }
1254 
1255             // This is still a bit racy as the other thread may end up adding the NOP before we make
1256             // the syscall but I'm not aware of a mechanism that will notify the other thread
1257             // exactly when we make the syscall.
1258             tx.send(()).unwrap();
1259             let mut events = uring2.wait().unwrap();
1260             let (user_data, result) = events.next().unwrap();
1261             assert_eq!(user_data, NOP);
1262             assert_eq!(result.unwrap(), 0);
1263 
1264             tx.send(()).unwrap();
1265             let mut events = uring2.wait().unwrap();
1266             let (user_data, result) = events.next().unwrap();
1267             assert_eq!(user_data, PIPE_READ);
1268             assert_eq!(result.unwrap(), buf.len() as u32);
1269             assert_eq!(&buf, &BUF_DATA);
1270         });
1271 
1272         // Wait until the other thread is about to make the syscall.
1273         rx.recv_timeout(Duration::from_secs(10)).unwrap();
1274 
1275         // Now add a NOP operation. This should wake up the other thread even though it cannot yet
1276         // read from the pipe.
1277         uring.add_nop(NOP).unwrap();
1278         uring.submit().unwrap();
1279 
1280         // Wait for the other thread to process the NOP result.
1281         rx.recv_timeout(Duration::from_secs(10)).unwrap();
1282 
1283         // Now write to the pipe to finish the uring read.
1284         pipe_in.write_all(&BUF_DATA).unwrap();
1285 
1286         wait_thread.join().unwrap();
1287     }
1288 
1289     #[test]
1290     fn complete_from_any_thread() {
1291         let num_entries = 16;
1292         let uring = URingContext::new(num_entries).map(Arc::new).unwrap();
1293 
1294         // Fill the submit ring.
1295         for sqe_batch in 0..3 {
1296             for i in 0..num_entries {
1297                 uring.add_nop((sqe_batch * num_entries + i) as u64).unwrap();
1298             }
1299             uring.submit().unwrap();
1300         }
1301 
1302         // Spawn a bunch of threads that pull cqes out of the uring and make sure none of them see a
1303         // duplicate.
1304         const NUM_THREADS: usize = 7;
1305         let completed = Arc::new(Mutex::new(BTreeSet::new()));
1306         let cv = Arc::new(Condvar::new());
1307         let barrier = Arc::new(Barrier::new(NUM_THREADS));
1308 
1309         let mut threads = Vec::with_capacity(NUM_THREADS);
1310         for _ in 0..NUM_THREADS {
1311             let uring = uring.clone();
1312             let completed = completed.clone();
1313             let barrier = barrier.clone();
1314             let cv = cv.clone();
1315             threads.push(thread::spawn(move || {
1316                 barrier.wait();
1317 
1318                 'wait: while completed.lock().len() < num_entries * 3 {
1319                     for (user_data, result) in uring.wait().unwrap() {
1320                         assert_eq!(result.unwrap(), 0);
1321 
1322                         let mut completed = completed.lock();
1323                         assert!(completed.insert(user_data));
1324                         if completed.len() >= num_entries * 3 {
1325                             break 'wait;
1326                         }
1327                     }
1328                 }
1329 
1330                 cv.notify_one();
1331             }));
1332         }
1333 
1334         // Wait until all the operations have completed.
1335         let mut c = completed.lock();
1336         while c.len() < num_entries * 3 {
1337             c = cv.wait(c);
1338         }
1339         mem::drop(c);
1340 
1341         // Let the OS clean up the still-waiting threads after the test run.
1342     }
1343 
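    // Verifies that operations can be added and submitted from multiple threads concurrently while
    // the main thread reaps the completions.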
    #[test]
    fn submit_from_any_thread() {
        const NUM_THREADS: usize = 7;
        const ITERATIONS: usize = 113;
        const NUM_ENTRIES: usize = 16;
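        // Blocks the calling submitter until enough completions have been reaped that the number
        // of in-flight ops is back within the ring size.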
        fn wait_for_completion_thread(in_flight: &Mutex<isize>, cv: &Condvar) {
            let mut in_flight = in_flight.lock();
            while *in_flight > NUM_ENTRIES as isize {
                in_flight = cv.wait(in_flight);
            }
        }

        let uring = URingContext::new(NUM_ENTRIES).map(Arc::new).unwrap();
        let in_flight = Arc::new(Mutex::new(0));
        let cv = Arc::new(Condvar::new());

        let mut threads = Vec::with_capacity(NUM_THREADS);
        for idx in 0..NUM_THREADS {
            let uring = uring.clone();
            let in_flight = in_flight.clone();
            let cv = cv.clone();
            threads.push(thread::spawn(move || {
                for iter in 0..ITERATIONS {
                    loop {
                        match uring.add_nop(((idx * NUM_THREADS) + iter) as UserData) {
                            Ok(()) => *in_flight.lock() += 1,
                            Err(Error::NoSpace) => {
                                wait_for_completion_thread(&in_flight, &cv);
                                continue;
                            }
                            Err(e) => panic!("Failed to add nop: {}", e),
                        }

                        // We don't need to wait for the completion queue if the submit fails with
                        // EBUSY because we already added the operation to the submit queue. It will
                        // get submitted eventually.
                        match uring.submit() {
                            Ok(()) => break,
                            Err(Error::RingEnter(libc::EBUSY)) => break,
                            Err(e) => panic!("Failed to submit ops: {}", e),
                        }
                    }
                }
            }));
        }

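        // Reap completions on the main thread, waking any submitters blocked on a full ring.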
        let mut completed = 0;
        while completed < NUM_THREADS * ITERATIONS {
            for (_, res) in uring.wait().unwrap() {
                assert_eq!(res.unwrap(), 0);
                completed += 1;

                let mut in_flight = in_flight.lock();
                *in_flight -= 1;
                let notify_submitters = *in_flight <= NUM_ENTRIES as isize;
                mem::drop(in_flight);

                if notify_submitters {
                    cv.notify_all();
                }

                if completed >= NUM_THREADS * ITERATIONS {
                    break;
                }
            }
        }

        for t in threads {
            t.join().unwrap();
        }

        // Make sure we didn't submit more entries than expected.
        assert_eq!(*in_flight.lock(), 0);
        assert_eq!(uring.submit_ring.lock().added, 0);
        assert_eq!(uring.complete_ring.num_ready(), 0);
        assert_eq!(
            uring.stats.total_ops.load(Ordering::Relaxed),
            (NUM_THREADS * ITERATIONS) as u64
        );
    }

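    // Verifies that a single ring can be shared by concurrent submitter and completer threads.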
    // TODO(b/183722981): Fix and re-enable test
    #[test]
    #[ignore]
    fn multi_thread_submit_and_complete() {
        const NUM_SUBMITTERS: usize = 7;
        const NUM_COMPLETERS: usize = 3;
        const ITERATIONS: usize = 113;
        const NUM_ENTRIES: usize = 16;

        fn wait_for_completion_thread(in_flight: &Mutex<isize>, cv: &Condvar) {
            let mut in_flight = in_flight.lock();
            while *in_flight > NUM_ENTRIES as isize {
                in_flight = cv.wait(in_flight);
            }
        }

        let uring = URingContext::new(NUM_ENTRIES).map(Arc::new).unwrap();
        let in_flight = Arc::new(Mutex::new(0));
        let cv = Arc::new(Condvar::new());

        let mut threads = Vec::with_capacity(NUM_SUBMITTERS + NUM_COMPLETERS);
        for idx in 0..NUM_SUBMITTERS {
            let uring = uring.clone();
            let in_flight = in_flight.clone();
            let cv = cv.clone();
            threads.push(thread::spawn(move || {
                for iter in 0..ITERATIONS {
                    loop {
                        match uring.add_nop(((idx * NUM_SUBMITTERS) + iter) as UserData) {
                            Ok(()) => *in_flight.lock() += 1,
                            Err(Error::NoSpace) => {
                                wait_for_completion_thread(&in_flight, &cv);
                                continue;
                            }
                            Err(e) => panic!("Failed to add nop: {}", e),
                        }

                        // We don't need to wait for the completion queue if the submit fails with
                        // EBUSY because we already added the operation to the submit queue. It will
                        // get submitted eventually.
                        match uring.submit() {
                            Ok(()) => break,
                            Err(Error::RingEnter(libc::EBUSY)) => break,
                            Err(e) => panic!("Failed to submit ops: {}", e),
                        }
                    }
                }
            }));
        }

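        // Spawn completer threads that drain the completion ring until every submitted op has been
        // observed.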
        let completed = Arc::new(AtomicUsize::new(0));
        for _ in 0..NUM_COMPLETERS {
            let uring = uring.clone();
            let in_flight = in_flight.clone();
            let cv = cv.clone();
            let completed = completed.clone();
            threads.push(thread::spawn(move || {
                while completed.load(Ordering::Relaxed) < NUM_SUBMITTERS * ITERATIONS {
                    for (_, res) in uring.wait().unwrap() {
                        assert_eq!(res.unwrap(), 0);
                        completed.fetch_add(1, Ordering::Relaxed);

                        let mut in_flight = in_flight.lock();
                        *in_flight -= 1;
                        let notify_submitters = *in_flight <= NUM_ENTRIES as isize;
                        mem::drop(in_flight);

                        if notify_submitters {
                            cv.notify_all();
                        }

                        if completed.load(Ordering::Relaxed) >= NUM_SUBMITTERS * ITERATIONS {
                            break;
                        }
                    }
                }
            }));
        }

        for t in threads.drain(..NUM_SUBMITTERS) {
            t.join().unwrap();
        }

        // Now that all submitters are finished, add NOPs to wake up any completers blocked on the
        // syscall.
        for i in 0..NUM_COMPLETERS {
            uring
                .add_nop((NUM_SUBMITTERS * ITERATIONS + i) as UserData)
                .unwrap();
        }
        uring.submit().unwrap();

        for t in threads {
            t.join().unwrap();
        }

        // Make sure we didn't submit more entries than expected. Only the last few NOPs added to
        // wake up the completer threads may still be in the completion ring.
        assert!(uring.complete_ring.num_ready() <= NUM_COMPLETERS as u32);
        assert_eq!(
            in_flight.lock().abs() as u32 + uring.complete_ring.num_ready(),
            NUM_COMPLETERS as u32
        );
        assert_eq!(uring.submit_ring.lock().added, 0);
        assert_eq!(
            uring.stats.total_ops.load(Ordering::Relaxed),
            (NUM_SUBMITTERS * ITERATIONS + NUM_COMPLETERS) as u64
        );
    }
}