use futures::channel::oneshot;
use futures::executor::LocalPool;
use futures::future::{self, Future, lazy, poll_fn};
use futures::task::{Context, Poll, Spawn, LocalSpawn, Waker};
use std::cell::{Cell, RefCell};
use std::pin::Pin;
use std::rc::Rc;
use std::thread;
use std::time::Duration;
use std::sync::Arc;
use std::sync::atomic::{Ordering, AtomicBool};

// A future whose `poll` always reports `Poll::Pending` and never registers a
// waker, so it never completes. The `Rc<()>` payload makes the type `!Send`
// (presumably so that it can only go through the local spawning path).
struct Pending(Rc<()>);

impl Future for Pending {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        Poll::Pending
    }
}

// Builds a fresh never-completing future.
fn pending() -> Pending {
    Pending(Rc::new(()))
}

// `run_until` drives the future handed to it to completion exactly once.
#[test]
fn run_until_single_future() {
    let mut polls = 0;

    {
        let mut pool = LocalPool::new();
        pool.run_until(lazy(|_| {
            polls += 1;
        }));
    }

    assert_eq!(polls, 1);
}

// `run_until` returns once its own future is done, even though a spawned task
// on the same pool never finishes.
#[test]
fn run_until_ignores_spawned() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    let never = Box::pin(pending());
    spawner.spawn_local_obj(never.into()).unwrap();

    pool.run_until(lazy(|_| ()));
}

// The future passed to `run_until` can only resolve if the spawned task runs
// and sends on the channel.
#[test]
fn run_until_executes_spawned() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    let (tx, rx) = oneshot::channel();

    let sender_task = Box::pin(lazy(move |_| {
        tx.send(()).unwrap();
    }));
    spawner.spawn_local_obj(sender_task.into()).unwrap();

    pool.run_until(rx).unwrap();
}

// Running a pool with no tasks returns immediately, and can be repeated.
#[test]
fn run_returns_if_empty() {
    let mut pool = LocalPool::new();
    for _ in 0..2 {
        pool.run();
    }
}

// A task spawned from inside another task is also executed by `run`.
#[test]
fn run_executes_spawned() {
    let mut pool = LocalPool::new();
    let outer_spawner = pool.spawner();
    let inner_spawner = pool.spawner();

    let hits = Rc::new(Cell::new(0));
    let hits_in_task = hits.clone();

    let outer = lazy(move |_| {
        let inner = lazy(move |_| {
            hits_in_task.set(hits_in_task.get() + 1);
        });
        inner_spawner.spawn_local_obj(Box::pin(inner).into()).unwrap();
    });
    outer_spawner.spawn_local_obj(Box::pin(outer).into()).unwrap();

    pool.run();

    assert_eq!(hits.get(), 1);
}

// Every one of many spawned tasks runs exactly once.
#[test]
fn run_spawn_many() {
    const ITER: usize = 200;

    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    let total = Rc::new(Cell::new(0));

    for _ in 0..ITER {
        let total = total.clone();
        let bump = lazy(move |_| {
            total.set(total.get() + 1);
        });
        spawner.spawn_local_obj(Box::pin(bump).into()).unwrap();
    }

    pool.run();

    assert_eq!(total.get(), ITER);
}
// `try_run_one` reports `false` when the pool has nothing to run.
#[test]
fn try_run_one_returns_if_empty() {
    let mut pool = LocalPool::new();
    assert!(!pool.try_run_one());
}

// Each successful `try_run_one` call completes exactly one ready task; the
// never-completing tasks interleaved with them are not counted as progress.
#[test]
fn try_run_one_executes_one_ready() {
    const ITER: usize = 200;

    let cnt = Rc::new(Cell::new(0));

    let mut pool = LocalPool::new();
    let spawn = pool.spawner();

    for _ in 0..ITER {
        spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();

        let cnt = cnt.clone();
        spawn.spawn_local_obj(Box::pin(lazy(move |_| {
            cnt.set(cnt.get() + 1);
        })).into()).unwrap();

        spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
    }

    for i in 0..ITER {
        assert_eq!(cnt.get(), i);
        assert!(pool.try_run_one());
        assert_eq!(cnt.get(), i + 1);
    }
    // Only pending tasks are left, so no further task can complete.
    assert!(!pool.try_run_one());
}

// A task that is polled but stays pending makes `try_run_one` return `false`;
// only the poll on which it finally completes yields `true`.
#[test]
fn try_run_one_returns_on_no_progress() {
    const ITER: usize = 10;

    let cnt = Rc::new(Cell::new(0));

    let mut pool = LocalPool::new();
    let spawn = pool.spawner();

    // Slot through which the task hands its waker back to the test body.
    let waker: Rc<Cell<Option<Waker>>> = Rc::new(Cell::new(None));
    {
        let cnt = cnt.clone();
        let waker = waker.clone();
        spawn.spawn_local_obj(Box::pin(poll_fn(move |ctx| {
            cnt.set(cnt.get() + 1);
            waker.set(Some(ctx.waker().clone()));
            if cnt.get() == ITER {
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        })).into()).unwrap();
    }

    for i in 0..ITER - 1 {
        assert_eq!(cnt.get(), i);
        assert!(!pool.try_run_one());
        assert_eq!(cnt.get(), i + 1);
        // Re-arm the task so the next `try_run_one` polls it again.
        let w = waker
            .take()
            .expect("the task stores its waker every time it is polled");
        w.wake();
    }
    assert!(pool.try_run_one());
    assert_eq!(cnt.get(), ITER);
}

// A task spawned while another task is being polled is run by the same
// `try_run_one` call.
#[test]
fn try_run_one_runs_sub_futures() {
    let mut pool = LocalPool::new();
    let spawn = pool.spawner();
    let cnt = Rc::new(Cell::new(0));

    let inner_spawner = spawn.clone();
    let cnt1 = cnt.clone();
    spawn.spawn_local_obj(Box::pin(poll_fn(move |_| {
        cnt1.set(cnt1.get() + 1);

        let cnt2 = cnt1.clone();
        inner_spawner.spawn_local_obj(Box::pin(lazy(move |_| {
            cnt2.set(cnt2.get() + 1)
        })).into()).unwrap();

        Poll::Pending
    })).into()).unwrap();

    pool.try_run_one();
    // One increment from the outer task, one from the task it spawned.
    assert_eq!(cnt.get(), 2);
}

// `run_until_stalled` returns immediately on an empty pool, repeatedly.
#[test]
fn run_until_stalled_returns_if_empty() {
    let mut pool = LocalPool::new();
    pool.run_until_stalled();
    pool.run_until_stalled();
}

// The pool can be driven with `run_until_stalled` again after new tasks are
// spawned between calls.
#[test]
fn run_until_stalled_returns_multiple_times() {
    let mut pool = LocalPool::new();
    let spawn = pool.spawner();
    let cnt = Rc::new(Cell::new(0));

    let cnt1 = cnt.clone();
    spawn.spawn_local_obj(Box::pin(lazy(move |_| {
        cnt1.set(cnt1.get() + 1)
    })).into()).unwrap();
    pool.run_until_stalled();
    assert_eq!(cnt.get(), 1);

    let cnt2 = cnt.clone();
    spawn.spawn_local_obj(Box::pin(lazy(move |_| {
        cnt2.set(cnt2.get() + 1)
    })).into()).unwrap();
    pool.run_until_stalled();
    assert_eq!(cnt.get(), 2);
}

// A task spawned while another task is being polled is run by the same
// `run_until_stalled` call.
#[test]
fn run_until_stalled_runs_spawned_sub_futures() {
    let mut pool = LocalPool::new();
    let spawn = pool.spawner();
    let cnt = Rc::new(Cell::new(0));

    let inner_spawner = spawn.clone();
    let cnt1 = cnt.clone();
    spawn.spawn_local_obj(Box::pin(poll_fn(move |_| {
        cnt1.set(cnt1.get() + 1);

        let cnt2 = cnt1.clone();
        inner_spawner.spawn_local_obj(Box::pin(lazy(move |_| {
            cnt2.set(cnt2.get() + 1)
        })).into()).unwrap();

        Poll::Pending
    })).into()).unwrap();

    pool.run_until_stalled();
    // One increment from the outer task, one from the task it spawned.
    assert_eq!(cnt.get(), 2);
}

// `run_until_stalled` completes every ready task spawned since the previous
// call and still returns although pending tasks remain in the pool.
#[test]
fn run_until_stalled_executes_all_ready() {
    const ITER: usize = 200;
    const PER_ITER: usize = 3;

    let cnt = Rc::new(Cell::new(0));

    let mut pool = LocalPool::new();
    let spawn = pool.spawner();

    for i in 0..ITER {
        for _ in 0..PER_ITER {
            spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();

            let cnt = cnt.clone();
            spawn.spawn_local_obj(Box::pin(lazy(move |_| {
                cnt.set(cnt.get() + 1);
            })).into()).unwrap();

            // also add some pending tasks to test if they are ignored
            spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
        }
        assert_eq!(cnt.get(), i * PER_ITER);
        pool.run_until_stalled();
        assert_eq!(cnt.get(), (i + 1) * PER_ITER);
    }
}

// Calling `run` on a second pool from inside a task that is itself being run
// by a pool is expected to panic.
#[test]
#[should_panic]
fn nesting_run() {
    let mut pool = LocalPool::new();
    let spawn = pool.spawner();

    spawn.spawn_obj(Box::pin(lazy(|_| {
        let mut pool = LocalPool::new();
        pool.run();
    })).into()).unwrap();

    pool.run();
}

// Same as `nesting_run`, but the nested call is `run_until_stalled`.
#[test]
#[should_panic]
fn nesting_run_run_until_stalled() {
    let mut pool = LocalPool::new();
    let spawn = pool.spawner();

    spawn.spawn_obj(Box::pin(lazy(|_| {
        let mut pool = LocalPool::new();
        pool.run_until_stalled();
    })).into()).unwrap();

    pool.run();
}

// Two tasks that keep waking themselves must be polled alternately: the
// per-task poll counters may never differ by more than one.
#[test]
fn tasks_are_scheduled_fairly() {
    let state = Rc::new(RefCell::new([0, 0]));

    struct Spin {
        // Poll counters of both tasks, indexed by `idx`. Signed so that the
        // difference between them can be taken and passed to `abs()`.
        state: Rc<RefCell<[i32; 2]>>,
        // Which counter this task owns.
        idx: usize,
    }

    impl Future for Spin {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            let mut state = self.state.borrow_mut();

            // Only task 0 performs the fairness check, and it stops early.
            if self.idx == 0 {
                let diff = state[0] - state[1];

                assert!(diff.abs() <= 1);

                if state[0] >= 50 {
                    return Poll::Ready(());
                }
            }

            state[self.idx] += 1;

            if state[self.idx] >= 100 {
                return Poll::Ready(());
            }

            // Ask to be polled again right away.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }

    let mut pool = LocalPool::new();
    let spawn = pool.spawner();

    spawn.spawn_local_obj(Box::pin(Spin {
        state: state.clone(),
        idx: 0,
    }).into()).unwrap();

    spawn.spawn_local_obj(Box::pin(Spin {
        state,
        idx: 1,
    }).into()).unwrap();

    pool.run();
}

// Tests that the use of park/unpark in user-code has no
// effect on the expected behaviour of the executor.
#[test]
fn park_unpark_independence() {
    let mut done = false;

    let future = future::poll_fn(move |cx| {
        if done {
            return Poll::Ready(())
        }
        done = true;
        cx.waker().clone().wake(); // (*)
        // some user-code that temporarily parks the thread
        let test = thread::current();
        let latch = Arc::new(AtomicBool::new(false));
        let signal = latch.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            signal.store(true, Ordering::SeqCst);
            test.unpark()
        });
        // Loop on the latch: `park` may return without a matching `unpark`.
        while !latch.load(Ordering::Relaxed) {
            thread::park();
        }
        Poll::Pending // Expect to be called again due to (*).
    });

    futures::executor::block_on(future)
}