1 // Copyright 2020 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! # `IoSourceExt`
6 //!
7 //! User functions to asynchronously access files.
8 //! Using `IoSource` directly is inconvenient and requires dealing with state
9 //! machines for the backing uring, future libraries, etc. `IoSourceExt` instead
10 //! provides users with a future that can be `await`ed from async context.
11 //!
12 //! Each member of `IoSourceExt` returns a future for the supported operation. One or more
13 //! operation can be pending at a time.
14 //!
15 //! Operations can only access memory in a `Vec` or an implementor of `BackingMemory`. See the
16 //! `URingExecutor` documentation for an explaination of why.
17 
18 use std::fs::File;
19 use std::os::unix::io::AsRawFd;
20 use std::sync::Arc;
21 
22 use async_trait::async_trait;
23 use sys_util::net::UnixSeqpacket;
24 use thiserror::Error as ThisError;
25 
26 use crate::{BackingMemory, MemRegion};
27 
28 #[derive(ThisError, Debug)]
29 pub enum Error {
30     /// An error with a polled(FD) source.
31     #[error("An error with a poll source: {0}")]
32     Poll(crate::poll_source::Error),
33     /// An error with a uring source.
34     #[error("An error with a uring source: {0}")]
35     Uring(crate::uring_executor::Error),
36 }
37 pub type Result<T> = std::result::Result<T, Error>;
38 
39 impl From<crate::uring_executor::Error> for Error {
from(err: crate::uring_executor::Error) -> Self40     fn from(err: crate::uring_executor::Error) -> Self {
41         Error::Uring(err)
42     }
43 }
44 
45 impl From<crate::poll_source::Error> for Error {
from(err: crate::poll_source::Error) -> Self46     fn from(err: crate::poll_source::Error) -> Self {
47         Error::Poll(err)
48     }
49 }
50 
51 /// Ergonomic methods for async reads.
52 #[async_trait(?Send)]
53 pub trait ReadAsync {
54     /// Reads from the iosource at `file_offset` and fill the given `vec`.
read_to_vec<'a>(&'a self, file_offset: u64, vec: Vec<u8>) -> Result<(usize, Vec<u8>)>55     async fn read_to_vec<'a>(&'a self, file_offset: u64, vec: Vec<u8>) -> Result<(usize, Vec<u8>)>;
56 
57     /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.
read_to_mem<'a>( &'a self, file_offset: u64, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: &'a [MemRegion], ) -> Result<usize>58     async fn read_to_mem<'a>(
59         &'a self,
60         file_offset: u64,
61         mem: Arc<dyn BackingMemory + Send + Sync>,
62         mem_offsets: &'a [MemRegion],
63     ) -> Result<usize>;
64 
65     /// Wait for the FD of `self` to be readable.
wait_readable(&self) -> Result<()>66     async fn wait_readable(&self) -> Result<()>;
67 
68     /// Reads a single u64 from the current offset.
read_u64(&self) -> Result<u64>69     async fn read_u64(&self) -> Result<u64>;
70 }
71 
72 /// Ergonomic methods for async writes.
73 #[async_trait(?Send)]
74 pub trait WriteAsync {
75     /// Writes from the given `vec` to the file starting at `file_offset`.
write_from_vec<'a>( &'a self, file_offset: u64, vec: Vec<u8>, ) -> Result<(usize, Vec<u8>)>76     async fn write_from_vec<'a>(
77         &'a self,
78         file_offset: u64,
79         vec: Vec<u8>,
80     ) -> Result<(usize, Vec<u8>)>;
81 
82     /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`.
write_from_mem<'a>( &'a self, file_offset: u64, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: &'a [MemRegion], ) -> Result<usize>83     async fn write_from_mem<'a>(
84         &'a self,
85         file_offset: u64,
86         mem: Arc<dyn BackingMemory + Send + Sync>,
87         mem_offsets: &'a [MemRegion],
88     ) -> Result<usize>;
89 
90     /// See `fallocate(2)`. Note this op is synchronous when using the Polled backend.
fallocate(&self, file_offset: u64, len: u64, mode: u32) -> Result<()>91     async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> Result<()>;
92 
93     /// Sync all completed write operations to the backing storage.
fsync(&self) -> Result<()>94     async fn fsync(&self) -> Result<()>;
95 }
96 
97 /// Subtrait for general async IO.
98 #[async_trait(?Send)]
99 pub trait IoSourceExt<F>: ReadAsync + WriteAsync {
100     /// Yields the underlying IO source.
into_source(self: Box<Self>) -> F101     fn into_source(self: Box<Self>) -> F;
102 
103     /// Provides a mutable ref to the underlying IO source.
as_source_mut(&mut self) -> &mut F104     fn as_source_mut(&mut self) -> &mut F;
105 
106     /// Provides a ref to the underlying IO source.
as_source(&self) -> &F107     fn as_source(&self) -> &F;
108 }
109 
110 /// Marker trait signifying that the implementor is suitable for use with
111 /// cros_async. Examples of this include File, and sys_util::net::UnixSeqpacket.
112 ///
113 /// (Note: it'd be really nice to implement a TryFrom for any implementors, and
114 /// remove our factory functions. Unfortunately
115 /// https://github.com/rust-lang/rust/issues/50133 makes that too painful.)
116 pub trait IntoAsync: AsRawFd {}
117 
118 impl IntoAsync for File {}
119 impl IntoAsync for UnixSeqpacket {}
120 impl IntoAsync for &UnixSeqpacket {}
121 
122 #[cfg(test)]
123 mod tests {
124     use std::fs::{File, OpenOptions};
125     use std::future::Future;
126     use std::os::unix::io::AsRawFd;
127     use std::pin::Pin;
128     use std::sync::Arc;
129     use std::task::{Context, Poll, Waker};
130     use std::thread;
131 
132     use sync::Mutex;
133 
134     use super::*;
135     use crate::executor::{async_poll_from, async_uring_from};
136     use crate::mem::VecIoWrapper;
137     use crate::{Executor, FdExecutor, MemRegion, PollSource, URingExecutor, UringSource};
138 
139     struct State {
140         should_quit: bool,
141         waker: Option<Waker>,
142     }
143 
144     impl State {
wake(&mut self)145         fn wake(&mut self) {
146             self.should_quit = true;
147             let waker = self.waker.take();
148 
149             if let Some(waker) = waker {
150                 waker.wake();
151             }
152         }
153     }
154 
155     struct Quit {
156         state: Arc<Mutex<State>>,
157     }
158 
159     impl Future for Quit {
160         type Output = ();
161 
poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()>162         fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
163             let mut state = self.state.lock();
164             if state.should_quit {
165                 return Poll::Ready(());
166             }
167 
168             state.waker = Some(cx.waker().clone());
169             Poll::Pending
170         }
171     }
172 
173     #[test]
await_uring_from_poll()174     fn await_uring_from_poll() {
175         // Start a uring operation and then await the result from an FdExecutor.
176         async fn go(source: UringSource<File>) {
177             let v = vec![0xa4u8; 16];
178             let (len, vec) = source.read_to_vec(0, v).await.unwrap();
179             assert_eq!(len, 16);
180             assert!(vec.iter().all(|&b| b == 0));
181         }
182 
183         let state = Arc::new(Mutex::new(State {
184             should_quit: false,
185             waker: None,
186         }));
187 
188         let uring_ex = URingExecutor::new().unwrap();
189         let f = File::open("/dev/zero").unwrap();
190         let source = UringSource::new(f, &uring_ex).unwrap();
191 
192         let quit = Quit {
193             state: state.clone(),
194         };
195         let handle = thread::spawn(move || uring_ex.run_until(quit));
196 
197         let poll_ex = FdExecutor::new().unwrap();
198         poll_ex.run_until(go(source)).unwrap();
199 
200         state.lock().wake();
201         handle.join().unwrap().unwrap();
202     }
203 
204     #[test]
await_poll_from_uring()205     fn await_poll_from_uring() {
206         // Start a poll operation and then await the result from a URingExecutor.
207         async fn go(source: PollSource<File>) {
208             let v = vec![0x2cu8; 16];
209             let (len, vec) = source.read_to_vec(0, v).await.unwrap();
210             assert_eq!(len, 16);
211             assert!(vec.iter().all(|&b| b == 0));
212         }
213 
214         let state = Arc::new(Mutex::new(State {
215             should_quit: false,
216             waker: None,
217         }));
218 
219         let poll_ex = FdExecutor::new().unwrap();
220         let f = File::open("/dev/zero").unwrap();
221         let source = PollSource::new(f, &poll_ex).unwrap();
222 
223         let quit = Quit {
224             state: state.clone(),
225         };
226         let handle = thread::spawn(move || poll_ex.run_until(quit));
227 
228         let uring_ex = URingExecutor::new().unwrap();
229         uring_ex.run_until(go(source)).unwrap();
230 
231         state.lock().wake();
232         handle.join().unwrap().unwrap();
233     }
234 
235     #[test]
readvec()236     fn readvec() {
237         async fn go<F: AsRawFd>(async_source: Box<dyn IoSourceExt<F>>) {
238             let v = vec![0x55u8; 32];
239             let v_ptr = v.as_ptr();
240             let ret = async_source.read_to_vec(0, v).await.unwrap();
241             assert_eq!(ret.0, 32);
242             let ret_v = ret.1;
243             assert_eq!(v_ptr, ret_v.as_ptr());
244             assert!(ret_v.iter().all(|&b| b == 0));
245         }
246 
247         let f = File::open("/dev/zero").unwrap();
248         let uring_ex = URingExecutor::new().unwrap();
249         let uring_source = async_uring_from(f, &uring_ex).unwrap();
250         uring_ex.run_until(go(uring_source)).unwrap();
251 
252         let f = File::open("/dev/zero").unwrap();
253         let poll_ex = FdExecutor::new().unwrap();
254         let poll_source = async_poll_from(f, &poll_ex).unwrap();
255         poll_ex.run_until(go(poll_source)).unwrap();
256     }
257 
258     #[test]
writevec()259     fn writevec() {
260         async fn go<F: AsRawFd>(async_source: Box<dyn IoSourceExt<F>>) {
261             let v = vec![0x55u8; 32];
262             let v_ptr = v.as_ptr();
263             let ret = async_source.write_from_vec(0, v).await.unwrap();
264             assert_eq!(ret.0, 32);
265             let ret_v = ret.1;
266             assert_eq!(v_ptr, ret_v.as_ptr());
267         }
268 
269         let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
270         let ex = URingExecutor::new().unwrap();
271         let uring_source = async_uring_from(f, &ex).unwrap();
272         ex.run_until(go(uring_source)).unwrap();
273 
274         let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
275         let poll_ex = FdExecutor::new().unwrap();
276         let poll_source = async_poll_from(f, &poll_ex).unwrap();
277         poll_ex.run_until(go(poll_source)).unwrap();
278     }
279 
280     #[test]
readmem()281     fn readmem() {
282         async fn go<F: AsRawFd>(async_source: Box<dyn IoSourceExt<F>>) {
283             let mem = Arc::new(VecIoWrapper::from(vec![0x55u8; 8192]));
284             let ret = async_source
285                 .read_to_mem(
286                     0,
287                     Arc::<VecIoWrapper>::clone(&mem),
288                     &[
289                         MemRegion { offset: 0, len: 32 },
290                         MemRegion {
291                             offset: 200,
292                             len: 56,
293                         },
294                     ],
295                 )
296                 .await
297                 .unwrap();
298             assert_eq!(ret, 32 + 56);
299             let vec: Vec<u8> = match Arc::try_unwrap(mem) {
300                 Ok(v) => v.into(),
301                 Err(_) => panic!("Too many vec refs"),
302             };
303             assert!(vec.iter().take(32).all(|&b| b == 0));
304             assert!(vec.iter().skip(32).take(168).all(|&b| b == 0x55));
305             assert!(vec.iter().skip(200).take(56).all(|&b| b == 0));
306             assert!(vec.iter().skip(256).all(|&b| b == 0x55));
307         }
308 
309         let f = File::open("/dev/zero").unwrap();
310         let ex = URingExecutor::new().unwrap();
311         let uring_source = async_uring_from(f, &ex).unwrap();
312         ex.run_until(go(uring_source)).unwrap();
313 
314         let f = File::open("/dev/zero").unwrap();
315         let poll_ex = FdExecutor::new().unwrap();
316         let poll_source = async_poll_from(f, &poll_ex).unwrap();
317         poll_ex.run_until(go(poll_source)).unwrap();
318     }
319 
320     #[test]
writemem()321     fn writemem() {
322         async fn go<F: AsRawFd>(async_source: Box<dyn IoSourceExt<F>>) {
323             let mem = Arc::new(VecIoWrapper::from(vec![0x55u8; 8192]));
324             let ret = async_source
325                 .write_from_mem(
326                     0,
327                     Arc::<VecIoWrapper>::clone(&mem),
328                     &[MemRegion { offset: 0, len: 32 }],
329                 )
330                 .await
331                 .unwrap();
332             assert_eq!(ret, 32);
333         }
334 
335         let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
336         let ex = URingExecutor::new().unwrap();
337         let uring_source = async_uring_from(f, &ex).unwrap();
338         ex.run_until(go(uring_source)).unwrap();
339 
340         let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
341         let poll_ex = FdExecutor::new().unwrap();
342         let poll_source = async_poll_from(f, &poll_ex).unwrap();
343         poll_ex.run_until(go(poll_source)).unwrap();
344     }
345 
346     #[test]
read_u64s()347     fn read_u64s() {
348         async fn go(async_source: File, ex: URingExecutor) -> u64 {
349             let source = async_uring_from(async_source, &ex).unwrap();
350             source.read_u64().await.unwrap()
351         }
352 
353         let f = File::open("/dev/zero").unwrap();
354         let ex = URingExecutor::new().unwrap();
355         let val = ex.run_until(go(f, ex.clone())).unwrap();
356         assert_eq!(val, 0);
357     }
358 
359     #[test]
read_eventfds()360     fn read_eventfds() {
361         use sys_util::EventFd;
362 
363         async fn go<F: AsRawFd>(source: Box<dyn IoSourceExt<F>>) -> u64 {
364             source.read_u64().await.unwrap()
365         }
366 
367         let eventfd = EventFd::new().unwrap();
368         eventfd.write(0x55).unwrap();
369         let ex = URingExecutor::new().unwrap();
370         let uring_source = async_uring_from(eventfd, &ex).unwrap();
371         let val = ex.run_until(go(uring_source)).unwrap();
372         assert_eq!(val, 0x55);
373 
374         let eventfd = EventFd::new().unwrap();
375         eventfd.write(0xaa).unwrap();
376         let poll_ex = FdExecutor::new().unwrap();
377         let poll_source = async_poll_from(eventfd, &poll_ex).unwrap();
378         let val = poll_ex.run_until(go(poll_source)).unwrap();
379         assert_eq!(val, 0xaa);
380     }
381 
382     #[test]
fsync()383     fn fsync() {
384         async fn go<F: AsRawFd>(source: Box<dyn IoSourceExt<F>>) {
385             let v = vec![0x55u8; 32];
386             let v_ptr = v.as_ptr();
387             let ret = source.write_from_vec(0, v).await.unwrap();
388             assert_eq!(ret.0, 32);
389             let ret_v = ret.1;
390             assert_eq!(v_ptr, ret_v.as_ptr());
391             source.fsync().await.unwrap();
392         }
393 
394         let f = tempfile::tempfile().unwrap();
395         let ex = Executor::new().unwrap();
396         let source = ex.async_from(f).unwrap();
397 
398         ex.run_until(go(source)).unwrap();
399     }
400 }
401