1 // Copyright 2020 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! An Executor and future combinators based on operations that block on file descriptors.
6 //!
7 //! This crate is meant to be used with the `futures-rs` crate that provides further combinators
8 //! and utility functions to combine and manage futures. All futures will run until they block on a
9 //! file descriptor becoming readable or writable. Facilities are provided to register future
10 //! wakers based on such events.
11 //!
12 //! # Running top-level futures.
13 //!
//! Use helper functions based on the desired behavior of your application.
15 //!
16 //! ## Running one future.
17 //!
18 //! If there is only one top-level future to run, use the [`run_one`](fn.run_one.html) function.
19 //!
20 //! ## Completing one of several futures.
21 //!
//! If there are several top level tasks that should run until any one completes, use the "select"
//! family of functions. Each returns a future that, once run (for example with
//! [`run_one`](fn.run_one.html)), resolves when the first future completes. The uncompleted
//! futures will also be returned so they can be run further or otherwise cleaned up. These
//! functions are inspired by the `select_all` function from futures-rs, but built to be run inside
//! an FD based executor and to poll only when necessary. See the docs for
//! [`select2`](fn.select2.html), [`select3`](fn.select3.html), [`select4`](fn.select4.html),
//! [`select5`](fn.select5.html), and [`select6`](fn.select6.html).
29 //!
30 //! ## Completing all of several futures.
31 //!
//! If there are several top level tasks that all need to be completed, use the "complete" family
//! of functions. Each returns a future that, once run (for example with
//! [`run_one`](fn.run_one.html)), resolves only once all the futures passed to it have completed.
//! These functions are inspired by the `join_all` function from futures-rs, but built to be run
//! inside an FD based executor and to poll only when necessary. See the docs for
//! [`complete2`](fn.complete2.html), [`complete3`](fn.complete3.html),
//! [`complete4`](fn.complete4.html), and [`complete5`](fn.complete5.html).
39 //!
40 //! # Implementing new FD-based futures.
41 //!
//! For the URing executor, implementations should provide an implementation of the `IoSource`
//! trait. For the FD executor, new futures can use the existing ability to poll a source to build
//! async functionality on top of.
45 //!
46 //! # Implementations
47 //!
//! Currently there are two paths for using the asynchronous IO. One uses a PollContext and drives
//! futures based on the FDs signaling they are ready for the operation. This method will exist so
//! long as kernels < 5.4 are supported.
51 //! The other method submits operations to io_uring and is signaled when they complete. This is more
52 //! efficient, but only supported on kernel 5.4+.
53 //! If `IoSourceExt::new` is used to interface with async IO, then the correct backend will be chosen
54 //! automatically.
55 //!
56 //! # Examples
57 //!
58 //! See the docs for `IoSourceExt` if support for kernels <5.4 is required. Focus on `UringSource` if
59 //! all systems have support for io_uring.
60 
61 mod complete;
62 mod event;
63 mod executor;
64 mod fd_executor;
65 mod io_ext;
66 pub mod mem;
67 mod poll_source;
68 mod queue;
69 mod select;
70 pub mod sync;
71 mod timer;
72 mod uring_executor;
73 mod uring_source;
74 mod waker;
75 
76 pub use event::EventAsync;
77 pub use executor::Executor;
78 pub use fd_executor::FdExecutor;
79 pub use io_ext::{
80     Error as AsyncError, IntoAsync, IoSourceExt, ReadAsync, Result as AsyncResult, WriteAsync,
81 };
82 pub use mem::{BackingMemory, MemRegion};
83 pub use poll_source::PollSource;
84 pub use select::SelectResult;
85 pub use timer::TimerAsync;
86 pub use uring_executor::URingExecutor;
87 pub use uring_source::UringSource;
88 
89 use std::future::Future;
90 use std::marker::PhantomData;
91 use std::pin::Pin;
92 use std::task::{Context, Poll};
93 
94 use thiserror::Error as ThisError;
95 
96 #[derive(ThisError, Debug)]
97 pub enum Error {
98     /// Error from the FD executor.
99     #[error("Failure in the FD executor: {0}")]
100     FdExecutor(fd_executor::Error),
101     /// Error from the uring executor.
102     #[error("Failure in the uring executor: {0}")]
103     URingExecutor(uring_executor::Error),
104 }
105 pub type Result<T> = std::result::Result<T, Error>;
106 
/// A [`Future`] that never completes.
///
/// Every call to `poll` returns [`Poll::Pending`] and the supplied [`Context`] is ignored, so the
/// task polling it is never woken by this future. `T` only names the nominal output type; no
/// value of type `T` is ever stored or produced.
pub struct Empty<T> {
    // `fn() -> T` instead of `T`: the marker still ties the type to `T`, but the auto traits
    // (`Send`, `Sync`, `Unpin`) of `Empty<T>` no longer depend on `T`. That lets an `Empty<T>` be
    // handed to combinators that require `Unpin` regardless of the chosen output type.
    phantom: PhantomData<fn() -> T>,
}

impl<T> Future for Empty<T> {
    type Output = T;

    /// Always reports `Poll::Pending`; the context is deliberately unused.
    fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<T> {
        Poll::Pending
    }
}
119 
empty<T>() -> Empty<T>120 pub fn empty<T>() -> Empty<T> {
121     Empty {
122         phantom: PhantomData,
123     }
124 }
125 
126 /// Creates an Executor that runs one future to completion.
127 ///
128 ///  # Example
129 ///
130 ///    ```
131 ///    use cros_async::run_one;
132 ///
133 ///    let fut = async { 55 };
134 ///    assert_eq!(55, run_one(fut).unwrap());
135 ///    ```
run_one<F: Future>(fut: F) -> Result<F::Output>136 pub fn run_one<F: Future>(fut: F) -> Result<F::Output> {
137     if uring_executor::use_uring() {
138         run_one_uring(fut)
139     } else {
140         run_one_poll(fut)
141     }
142 }
143 
144 /// Creates a URingExecutor that runs one future to completion.
145 ///
146 ///  # Example
147 ///
148 ///    ```
149 ///    use cros_async::run_one_uring;
150 ///
151 ///    let fut = async { 55 };
152 ///    assert_eq!(55, run_one_uring(fut).unwrap());
153 ///    ```
run_one_uring<F: Future>(fut: F) -> Result<F::Output>154 pub fn run_one_uring<F: Future>(fut: F) -> Result<F::Output> {
155     URingExecutor::new()
156         .and_then(|ex| ex.run_until(fut))
157         .map_err(Error::URingExecutor)
158 }
159 
160 /// Creates a FdExecutor that runs one future to completion.
161 ///
162 ///  # Example
163 ///
164 ///    ```
165 ///    use cros_async::run_one_poll;
166 ///
167 ///    let fut = async { 55 };
168 ///    assert_eq!(55, run_one_poll(fut).unwrap());
169 ///    ```
run_one_poll<F: Future>(fut: F) -> Result<F::Output>170 pub fn run_one_poll<F: Future>(fut: F) -> Result<F::Output> {
171     FdExecutor::new()
172         .and_then(|ex| ex.run_until(fut))
173         .map_err(Error::FdExecutor)
174 }
175 
176 // Select helpers to run until any future completes.
177 
178 /// Creates a combinator that runs the two given futures until one completes, returning a tuple
179 /// containing the result of the finished future and the still pending future.
180 ///
181 ///  # Example
182 ///
183 ///    ```
184 ///    use cros_async::{SelectResult, select2, run_one};
185 ///    use futures::future::pending;
186 ///    use futures::pin_mut;
187 ///
188 ///    let first = async {5};
189 ///    let second = async {let () = pending().await;};
190 ///    pin_mut!(first);
191 ///    pin_mut!(second);
192 ///    match run_one(select2(first, second)) {
193 ///        Ok((SelectResult::Finished(5), SelectResult::Pending(_second))) => (),
194 ///        _ => panic!("Select didn't return the first future"),
195 ///    };
196 ///    ```
select2<F1: Future + Unpin, F2: Future + Unpin>( f1: F1, f2: F2, ) -> (SelectResult<F1>, SelectResult<F2>)197 pub async fn select2<F1: Future + Unpin, F2: Future + Unpin>(
198     f1: F1,
199     f2: F2,
200 ) -> (SelectResult<F1>, SelectResult<F2>) {
201     select::Select2::new(f1, f2).await
202 }
203 
204 /// Creates a combinator that runs the three given futures until one or more completes, returning a
205 /// tuple containing the result of the finished future(s) and the still pending future(s).
206 ///
207 ///  # Example
208 ///
209 ///    ```
210 ///    use cros_async::{SelectResult, select3, run_one};
211 ///    use futures::future::pending;
212 ///    use futures::pin_mut;
213 ///
214 ///    let first = async {4};
215 ///    let second = async {let () = pending().await;};
216 ///    let third = async {5};
217 ///    pin_mut!(first);
218 ///    pin_mut!(second);
219 ///    pin_mut!(third);
220 ///    match run_one(select3(first, second, third)) {
221 ///        Ok((SelectResult::Finished(4),
222 ///            SelectResult::Pending(_second),
223 ///            SelectResult::Finished(5))) => (),
224 ///        _ => panic!("Select didn't return the futures"),
225 ///    };
226 ///    ```
select3<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin>( f1: F1, f2: F2, f3: F3, ) -> (SelectResult<F1>, SelectResult<F2>, SelectResult<F3>)227 pub async fn select3<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin>(
228     f1: F1,
229     f2: F2,
230     f3: F3,
231 ) -> (SelectResult<F1>, SelectResult<F2>, SelectResult<F3>) {
232     select::Select3::new(f1, f2, f3).await
233 }
234 
235 /// Creates a combinator that runs the four given futures until one or more completes, returning a
236 /// tuple containing the result of the finished future(s) and the still pending future(s).
237 ///
238 ///  # Example
239 ///
240 ///    ```
241 ///    use cros_async::{SelectResult, select4, run_one};
242 ///    use futures::future::pending;
243 ///    use futures::pin_mut;
244 ///
245 ///    let first = async {4};
246 ///    let second = async {let () = pending().await;};
247 ///    let third = async {5};
248 ///    let fourth = async {let () = pending().await;};
249 ///    pin_mut!(first);
250 ///    pin_mut!(second);
251 ///    pin_mut!(third);
252 ///    pin_mut!(fourth);
253 ///    match run_one(select4(first, second, third, fourth)) {
254 ///        Ok((SelectResult::Finished(4), SelectResult::Pending(_second),
255 ///            SelectResult::Finished(5), SelectResult::Pending(_fourth))) => (),
256 ///        _ => panic!("Select didn't return the futures"),
257 ///    };
258 ///    ```
select4< F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin, F4: Future + Unpin, >( f1: F1, f2: F2, f3: F3, f4: F4, ) -> ( SelectResult<F1>, SelectResult<F2>, SelectResult<F3>, SelectResult<F4>, )259 pub async fn select4<
260     F1: Future + Unpin,
261     F2: Future + Unpin,
262     F3: Future + Unpin,
263     F4: Future + Unpin,
264 >(
265     f1: F1,
266     f2: F2,
267     f3: F3,
268     f4: F4,
269 ) -> (
270     SelectResult<F1>,
271     SelectResult<F2>,
272     SelectResult<F3>,
273     SelectResult<F4>,
274 ) {
275     select::Select4::new(f1, f2, f3, f4).await
276 }
277 
278 /// Creates a combinator that runs the five given futures until one or more completes, returning a
279 /// tuple containing the result of the finished future(s) and the still pending future(s).
280 ///
281 ///  # Example
282 ///
283 ///    ```
284 ///    use cros_async::{SelectResult, select5, run_one};
285 ///    use futures::future::pending;
286 ///    use futures::pin_mut;
287 ///
288 ///    let first = async {4};
289 ///    let second = async {let () = pending().await;};
290 ///    let third = async {5};
291 ///    let fourth = async {let () = pending().await;};
292 ///    let fifth = async {6};
293 ///    pin_mut!(first);
294 ///    pin_mut!(second);
295 ///    pin_mut!(third);
296 ///    pin_mut!(fourth);
297 ///    pin_mut!(fifth);
298 ///    match run_one(select5(first, second, third, fourth, fifth)) {
299 ///        Ok((SelectResult::Finished(4), SelectResult::Pending(_second),
300 ///            SelectResult::Finished(5), SelectResult::Pending(_fourth),
301 ///            SelectResult::Finished(6))) => (),
302 ///        _ => panic!("Select didn't return the futures"),
303 ///    };
304 ///    ```
select5< F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin, F4: Future + Unpin, F5: Future + Unpin, >( f1: F1, f2: F2, f3: F3, f4: F4, f5: F5, ) -> ( SelectResult<F1>, SelectResult<F2>, SelectResult<F3>, SelectResult<F4>, SelectResult<F5>, )305 pub async fn select5<
306     F1: Future + Unpin,
307     F2: Future + Unpin,
308     F3: Future + Unpin,
309     F4: Future + Unpin,
310     F5: Future + Unpin,
311 >(
312     f1: F1,
313     f2: F2,
314     f3: F3,
315     f4: F4,
316     f5: F5,
317 ) -> (
318     SelectResult<F1>,
319     SelectResult<F2>,
320     SelectResult<F3>,
321     SelectResult<F4>,
322     SelectResult<F5>,
323 ) {
324     select::Select5::new(f1, f2, f3, f4, f5).await
325 }
326 
327 /// Creates a combinator that runs the six given futures until one or more completes, returning a
328 /// tuple containing the result of the finished future(s) and the still pending future(s).
329 ///
330 ///  # Example
331 ///
332 ///    ```
333 ///    use cros_async::{SelectResult, select6, run_one};
334 ///    use futures::future::pending;
335 ///    use futures::pin_mut;
336 ///
337 ///    let first = async {1};
338 ///    let second = async {let () = pending().await;};
339 ///    let third = async {3};
340 ///    let fourth = async {let () = pending().await;};
341 ///    let fifth = async {5};
342 ///    let sixth = async {6};
343 ///    pin_mut!(first);
344 ///    pin_mut!(second);
345 ///    pin_mut!(third);
346 ///    pin_mut!(fourth);
347 ///    pin_mut!(fifth);
348 ///    pin_mut!(sixth);
349 ///    match run_one(select6(first, second, third, fourth, fifth, sixth)) {
350 ///        Ok((SelectResult::Finished(1), SelectResult::Pending(_second),
351 ///            SelectResult::Finished(3), SelectResult::Pending(_fourth),
352 ///            SelectResult::Finished(5), SelectResult::Finished(6))) => (),
353 ///        _ => panic!("Select didn't return the futures"),
354 ///    };
355 ///    ```
select6< F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin, F4: Future + Unpin, F5: Future + Unpin, F6: Future + Unpin, >( f1: F1, f2: F2, f3: F3, f4: F4, f5: F5, f6: F6, ) -> ( SelectResult<F1>, SelectResult<F2>, SelectResult<F3>, SelectResult<F4>, SelectResult<F5>, SelectResult<F6>, )356 pub async fn select6<
357     F1: Future + Unpin,
358     F2: Future + Unpin,
359     F3: Future + Unpin,
360     F4: Future + Unpin,
361     F5: Future + Unpin,
362     F6: Future + Unpin,
363 >(
364     f1: F1,
365     f2: F2,
366     f3: F3,
367     f4: F4,
368     f5: F5,
369     f6: F6,
370 ) -> (
371     SelectResult<F1>,
372     SelectResult<F2>,
373     SelectResult<F3>,
374     SelectResult<F4>,
375     SelectResult<F5>,
376     SelectResult<F6>,
377 ) {
378     select::Select6::new(f1, f2, f3, f4, f5, f6).await
379 }
380 
381 // Combination helpers to run until all futures are complete.
382 
383 /// Creates a combinator that runs the two given futures to completion, returning a tuple of the
384 /// outputs each yields.
385 ///
386 ///  # Example
387 ///
388 ///    ```
389 ///    use cros_async::{complete2, run_one};
390 ///
391 ///    let first = async {5};
392 ///    let second = async {6};
393 ///    assert_eq!(run_one(complete2(first, second)).unwrap_or((0,0)), (5,6));
394 ///    ```
complete2<F1, F2>(f1: F1, f2: F2) -> (F1::Output, F2::Output) where F1: Future, F2: Future,395 pub async fn complete2<F1, F2>(f1: F1, f2: F2) -> (F1::Output, F2::Output)
396 where
397     F1: Future,
398     F2: Future,
399 {
400     complete::Complete2::new(f1, f2).await
401 }
402 
403 /// Creates a combinator that runs the three given futures to completion, returning a tuple of the
404 /// outputs each yields.
405 ///
406 ///  # Example
407 ///
408 ///    ```
409 ///    use cros_async::{complete3, run_one};
410 ///
411 ///    let first = async {5};
412 ///    let second = async {6};
413 ///    let third = async {7};
414 ///    assert_eq!(run_one(complete3(first, second, third)).unwrap_or((0,0,0)), (5,6,7));
415 ///    ```
complete3<F1, F2, F3>(f1: F1, f2: F2, f3: F3) -> (F1::Output, F2::Output, F3::Output) where F1: Future, F2: Future, F3: Future,416 pub async fn complete3<F1, F2, F3>(f1: F1, f2: F2, f3: F3) -> (F1::Output, F2::Output, F3::Output)
417 where
418     F1: Future,
419     F2: Future,
420     F3: Future,
421 {
422     complete::Complete3::new(f1, f2, f3).await
423 }
424 
425 /// Creates a combinator that runs the four given futures to completion, returning a tuple of the
426 /// outputs each yields.
427 ///
428 ///  # Example
429 ///
430 ///    ```
431 ///    use cros_async::{complete4, run_one};
432 ///
433 ///    let first = async {5};
434 ///    let second = async {6};
435 ///    let third = async {7};
436 ///    let fourth = async {8};
437 ///    assert_eq!(run_one(complete4(first, second, third, fourth)).unwrap_or((0,0,0,0)), (5,6,7,8));
438 ///    ```
complete4<F1, F2, F3, F4>( f1: F1, f2: F2, f3: F3, f4: F4, ) -> (F1::Output, F2::Output, F3::Output, F4::Output) where F1: Future, F2: Future, F3: Future, F4: Future,439 pub async fn complete4<F1, F2, F3, F4>(
440     f1: F1,
441     f2: F2,
442     f3: F3,
443     f4: F4,
444 ) -> (F1::Output, F2::Output, F3::Output, F4::Output)
445 where
446     F1: Future,
447     F2: Future,
448     F3: Future,
449     F4: Future,
450 {
451     complete::Complete4::new(f1, f2, f3, f4).await
452 }
453 
454 /// Creates a combinator that runs the five given futures to completion, returning a tuple of the
455 /// outputs each yields.
456 ///
457 ///  # Example
458 ///
459 ///    ```
460 ///    use cros_async::{complete5, run_one};
461 ///
462 ///    let first = async {5};
463 ///    let second = async {6};
464 ///    let third = async {7};
465 ///    let fourth = async {8};
466 ///    let fifth = async {9};
467 ///    assert_eq!(run_one(complete5(first, second, third, fourth, fifth)).unwrap_or((0,0,0,0,0)),
468 ///               (5,6,7,8,9));
469 ///    ```
complete5<F1, F2, F3, F4, F5>( f1: F1, f2: F2, f3: F3, f4: F4, f5: F5, ) -> (F1::Output, F2::Output, F3::Output, F4::Output, F5::Output) where F1: Future, F2: Future, F3: Future, F4: Future, F5: Future,470 pub async fn complete5<F1, F2, F3, F4, F5>(
471     f1: F1,
472     f2: F2,
473     f3: F3,
474     f4: F4,
475     f5: F5,
476 ) -> (F1::Output, F2::Output, F3::Output, F4::Output, F5::Output)
477 where
478     F1: Future,
479     F2: Future,
480     F3: Future,
481     F4: Future,
482     F5: Future,
483 {
484     complete::Complete5::new(f1, f2, f3, f4, f5).await
485 }
486