1 // Copyright 2020 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::future::Future;
6 
7 use async_task::Task;
8 
9 use crate::poll_source::Error as PollError;
10 use crate::uring_executor::use_uring;
11 use crate::{
12     AsyncResult, FdExecutor, IntoAsync, IoSourceExt, PollSource, URingExecutor, UringSource,
13 };
14 
async_uring_from<'a, F: IntoAsync + 'a>( f: F, ex: &URingExecutor, ) -> AsyncResult<Box<dyn IoSourceExt<F> + 'a>>15 pub(crate) fn async_uring_from<'a, F: IntoAsync + 'a>(
16     f: F,
17     ex: &URingExecutor,
18 ) -> AsyncResult<Box<dyn IoSourceExt<F> + 'a>> {
19     Ok(UringSource::new(f, ex).map(|u| Box::new(u) as Box<dyn IoSourceExt<F>>)?)
20 }
21 
22 /// Creates a concrete `IoSourceExt` using the fd_executor.
async_poll_from<'a, F: IntoAsync + 'a>( f: F, ex: &FdExecutor, ) -> AsyncResult<Box<dyn IoSourceExt<F> + 'a>>23 pub(crate) fn async_poll_from<'a, F: IntoAsync + 'a>(
24     f: F,
25     ex: &FdExecutor,
26 ) -> AsyncResult<Box<dyn IoSourceExt<F> + 'a>> {
27     Ok(PollSource::new(f, ex).map(|u| Box::new(u) as Box<dyn IoSourceExt<F>>)?)
28 }
29 
30 /// An executor for scheduling tasks that poll futures to completion.
31 ///
32 /// All asynchronous operations must run within an executor, which is capable of spawning futures as
33 /// tasks. This executor also provides a mechanism for performing asynchronous I/O operations.
34 ///
35 /// The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only
36 /// create a new reference, not a new executor.
37 ///
38 /// # Examples
39 ///
40 /// Concurrently wait for multiple files to become readable/writable and then read/write the data.
41 ///
42 /// ```
43 /// use std::cmp::min;
44 /// use std::error::Error;
45 /// use std::fs::{File, OpenOptions};
46 ///
47 /// use cros_async::{AsyncResult, Executor, IoSourceExt, complete3};
48 /// const CHUNK_SIZE: usize = 32;
49 ///
50 /// // Write all bytes from `data` to `f`.
51 /// async fn write_file(f: &dyn IoSourceExt<File>, mut data: Vec<u8>) -> AsyncResult<()> {
52 ///     while data.len() > 0 {
53 ///         let (count, mut buf) = f.write_from_vec(0, data).await?;
54 ///
55 ///         data = buf.split_off(count);
56 ///     }
57 ///
58 ///     Ok(())
59 /// }
60 ///
61 /// // Transfer `len` bytes of data from `from` to `to`.
62 /// async fn transfer_data(
63 ///     from: Box<dyn IoSourceExt<File>>,
64 ///     to: Box<dyn IoSourceExt<File>>,
65 ///     len: usize,
66 /// ) -> AsyncResult<usize> {
67 ///     let mut rem = len;
68 ///
69 ///     while rem > 0 {
70 ///         let buf = vec![0u8; min(rem, CHUNK_SIZE)];
71 ///         let (count, mut data) = from.read_to_vec(0, buf).await?;
72 ///
73 ///         if count == 0 {
74 ///             // End of file. Return the number of bytes transferred.
75 ///             return Ok(len - rem);
76 ///         }
77 ///
78 ///         data.truncate(count);
79 ///         write_file(&*to, data).await?;
80 ///
81 ///         rem = rem.saturating_sub(count);
82 ///     }
83 ///
84 ///     Ok(len)
85 /// }
86 ///
87 /// # fn do_it() -> Result<(), Box<dyn Error>> {
88 ///     let ex = Executor::new()?;
89 ///
90 ///     let (rx, tx) = sys_util::pipe(true)?;
91 ///     let zero = File::open("/dev/zero")?;
92 ///     let zero_bytes = CHUNK_SIZE * 7;
93 ///     let zero_to_pipe = transfer_data(
94 ///         ex.async_from(zero)?,
95 ///         ex.async_from(tx.try_clone()?)?,
96 ///         zero_bytes,
97 ///     );
98 ///
99 ///     let rand = File::open("/dev/urandom")?;
100 ///     let rand_bytes = CHUNK_SIZE * 19;
101 ///     let rand_to_pipe = transfer_data(ex.async_from(rand)?, ex.async_from(tx)?, rand_bytes);
102 ///
103 ///     let null = OpenOptions::new().write(true).open("/dev/null")?;
104 ///     let null_bytes = zero_bytes + rand_bytes;
105 ///     let pipe_to_null = transfer_data(ex.async_from(rx)?, ex.async_from(null)?, null_bytes);
106 ///
107 ///     ex.run_until(complete3(
108 ///         async { assert_eq!(pipe_to_null.await.unwrap(), null_bytes) },
109 ///         async { assert_eq!(zero_to_pipe.await.unwrap(), zero_bytes) },
110 ///         async { assert_eq!(rand_to_pipe.await.unwrap(), rand_bytes) },
111 ///     ))?;
112 ///
113 /// #     Ok(())
114 /// # }
115 ///
116 /// # do_it().unwrap();
117 /// ```
118 
119 #[derive(Clone)]
120 pub enum Executor {
121     Uring(URingExecutor),
122     Fd(FdExecutor),
123 }
124 
125 impl Executor {
126     /// Create a new `Executor`.
new() -> AsyncResult<Self>127     pub fn new() -> AsyncResult<Self> {
128         if use_uring() {
129             Ok(URingExecutor::new().map(Executor::Uring)?)
130         } else {
131             Ok(FdExecutor::new()
132                 .map(Executor::Fd)
133                 .map_err(PollError::Executor)?)
134         }
135     }
136 
137     /// Create a new `Box<dyn IoSourceExt<F>>` associated with `self`. Callers may then use the
138     /// returned `IoSourceExt` to directly start async operations without needing a separate
139     /// reference to the executor.
async_from<'a, F: IntoAsync + 'a>( &self, f: F, ) -> AsyncResult<Box<dyn IoSourceExt<F> + 'a>>140     pub fn async_from<'a, F: IntoAsync + 'a>(
141         &self,
142         f: F,
143     ) -> AsyncResult<Box<dyn IoSourceExt<F> + 'a>> {
144         match self {
145             Executor::Uring(ex) => async_uring_from(f, ex),
146             Executor::Fd(ex) => async_poll_from(f, ex),
147         }
148     }
149 
150     /// Spawn a new future for this executor to run to completion. Callers may use the returned
151     /// `Task` to await on the result of `f`. Dropping the returned `Task` will cancel `f`,
152     /// preventing it from being polled again. To drop a `Task` without canceling the future
153     /// associated with it use `Task::detach`. To cancel a task gracefully and wait until it is
154     /// fully destroyed, use `Task::cancel`.
155     ///
156     /// # Examples
157     ///
158     /// ```
159     /// # use cros_async::AsyncResult;
160     /// # fn example_spawn() -> AsyncResult<()> {
161     /// #      use std::thread;
162     ///
163     /// #      use cros_async::Executor;
164     ///       use futures::executor::block_on;
165     ///
166     /// #      let ex = Executor::new()?;
167     ///
168     /// #      // Spawn a thread that runs the executor.
169     /// #      let ex2 = ex.clone();
170     /// #      thread::spawn(move || ex2.run());
171     ///
172     ///       let task = ex.spawn(async { 7 + 13 });
173     ///
174     ///       let result = block_on(task);
175     ///       assert_eq!(result, 20);
176     /// #     Ok(())
177     /// # }
178     ///
179     /// # example_spawn().unwrap();
180     /// ```
spawn<F>(&self, f: F) -> Task<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,181     pub fn spawn<F>(&self, f: F) -> Task<F::Output>
182     where
183         F: Future + Send + 'static,
184         F::Output: Send + 'static,
185     {
186         match self {
187             Executor::Uring(ex) => ex.spawn(f),
188             Executor::Fd(ex) => ex.spawn(f),
189         }
190     }
191 
192     /// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without
193     /// requiring `Send` on `F` or `F::Output`. This method should only be called from the same
194     /// thread where `run()` or `run_until()` is called.
195     ///
196     /// # Panics
197     ///
198     /// `Executor::run` and `Executor::run_util` will panic if they try to poll a future that was
199     /// added by calling `spawn_local` from a different thread.
200     ///
201     /// # Examples
202     ///
203     /// ```
204     /// # use cros_async::AsyncResult;
205     /// # fn example_spawn_local() -> AsyncResult<()> {
206     /// #      use cros_async::Executor;
207     ///
208     /// #      let ex = Executor::new()?;
209     ///
210     ///       let task = ex.spawn_local(async { 7 + 13 });
211     ///
212     ///       let result = ex.run_until(task)?;
213     ///       assert_eq!(result, 20);
214     /// #     Ok(())
215     /// # }
216     ///
217     /// # example_spawn_local().unwrap();
218     /// ```
spawn_local<F>(&self, f: F) -> Task<F::Output> where F: Future + 'static, F::Output: 'static,219     pub fn spawn_local<F>(&self, f: F) -> Task<F::Output>
220     where
221         F: Future + 'static,
222         F::Output: 'static,
223     {
224         match self {
225             Executor::Uring(ex) => ex.spawn_local(f),
226             Executor::Fd(ex) => ex.spawn_local(f),
227         }
228     }
229 
230     /// Run the executor indefinitely, driving all spawned futures to completion. This method will
231     /// block the current thread and only return in the case of an error.
232     ///
233     /// # Panics
234     ///
235     /// Once this method has been called on a thread, it may only be called on that thread from that
236     /// point on. Attempting to call it from another thread will panic.
237     ///
238     /// # Examples
239     ///
240     /// ```
241     /// # use cros_async::AsyncResult;
242     /// # fn example_run() -> AsyncResult<()> {
243     ///       use std::thread;
244     ///
245     ///       use cros_async::Executor;
246     ///       use futures::executor::block_on;
247     ///
248     ///       let ex = Executor::new()?;
249     ///
250     ///       // Spawn a thread that runs the executor.
251     ///       let ex2 = ex.clone();
252     ///       thread::spawn(move || ex2.run());
253     ///
254     ///       let task = ex.spawn(async { 7 + 13 });
255     ///
256     ///       let result = block_on(task);
257     ///       assert_eq!(result, 20);
258     /// #     Ok(())
259     /// # }
260     ///
261     /// # example_run().unwrap();
262     /// ```
run(&self) -> AsyncResult<()>263     pub fn run(&self) -> AsyncResult<()> {
264         match self {
265             Executor::Uring(ex) => ex.run()?,
266             Executor::Fd(ex) => ex.run().map_err(PollError::Executor)?,
267         }
268 
269         Ok(())
270     }
271 
272     /// Drive all futures spawned in this executor until `f` completes. This method will block the
273     /// current thread only until `f` is complete and there may still be unfinished futures in the
274     /// executor.
275     ///
276     /// # Panics
277     ///
278     /// Once this method has been called on a thread, from then onwards it may only be called on
279     /// that thread. Attempting to call it from another thread will panic.
280     ///
281     /// # Examples
282     ///
283     /// ```
284     /// # use cros_async::AsyncResult;
285     /// # fn example_run_until() -> AsyncResult<()> {
286     ///       use cros_async::Executor;
287     ///
288     ///       let ex = Executor::new()?;
289     ///
290     ///       let task = ex.spawn_local(async { 7 + 13 });
291     ///
292     ///       let result = ex.run_until(task)?;
293     ///       assert_eq!(result, 20);
294     /// #     Ok(())
295     /// # }
296     ///
297     /// # example_run_until().unwrap();
298     /// ```
run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>299     pub fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {
300         match self {
301             Executor::Uring(ex) => Ok(ex.run_until(f)?),
302             Executor::Fd(ex) => Ok(ex.run_until(f).map_err(PollError::Executor)?),
303         }
304     }
305 }
306