1 // Copyright 2020, The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 //! This module implements the handling of async tasks.
16 //! The worker thread has a high priority and a low priority queue. Adding a job to either
17 //! will cause one thread to be spawned if none exists. As a compromise between performance
//! and resource consumption, the thread will linger for a configurable timeout (30 seconds
//! by default) after it has processed all tasks before it terminates.
20 //! Note that low priority tasks are processed only when the high priority queue is empty.
21 
22 use std::{any::Any, any::TypeId, time::Duration};
23 use std::{
24     collections::{HashMap, VecDeque},
25     sync::Arc,
26     sync::{Condvar, Mutex, MutexGuard},
27     thread,
28 };
29 
/// Lifecycle marker for the worker thread, kept inside the shared task state.
#[derive(Debug, Eq, PartialEq)]
enum State {
    /// No worker is servicing the queues: either none has been spawned yet, or the last one
    /// timed out while idle and handed the shelf back before leaving its loop.
    Exiting,
    /// A worker thread has been spawned and has not yet announced its exit.
    Running,
}
35 
/// The Shelf allows async tasks to store state across invocations.
///
/// Entries are keyed by the [`TypeId`] of the stored value, so the shelf holds at most one
/// value per concrete type; putting a second value of the same type replaces the first.
/// Note: Store elves at your own peril ;-).
#[derive(Debug, Default)]
pub struct Shelf(HashMap<TypeId, Box<dyn Any + Send>>);

impl Shelf {
    /// Converts a type-erased entry back into an owned `T`.
    ///
    /// Returns `None` if the boxed value is not a `T`.
    fn unbox<T: Any + Send>(entry: Box<dyn Any + Send>) -> Option<T> {
        entry.downcast::<T>().ok().map(|boxed| *boxed)
    }

    /// Get a reference to the shelved data of type T. Returns Some if the data exists.
    pub fn get_downcast_ref<T: Any + Send>(&self) -> Option<&T> {
        self.0.get(&TypeId::of::<T>()).and_then(|entry| entry.downcast_ref::<T>())
    }

    /// Get a mutable reference to the shelved data of type T. Returns Some if a T was
    /// inserted using `put`, `get_mut`, or `get_or_put_with` and has not been removed since.
    pub fn get_downcast_mut<T: Any + Send>(&mut self) -> Option<&mut T> {
        self.0.get_mut(&TypeId::of::<T>()).and_then(|entry| entry.downcast_mut::<T>())
    }

    /// Removes the entry of the given type and returns the stored data if it existed.
    pub fn remove_downcast_ref<T: Any + Send>(&mut self) -> Option<T> {
        self.0.remove(&TypeId::of::<T>()).and_then(Self::unbox::<T>)
    }

    /// Puts data `v` on the shelf. If there already was an entry of type T it is returned.
    pub fn put<T: Any + Send>(&mut self, v: T) -> Option<T> {
        self.0
            .insert(TypeId::of::<T>(), Box::new(v) as Box<dyn Any + Send>)
            .and_then(Self::unbox::<T>)
    }

    /// Gets a mutable reference to the entry of the given type and default creates it if
    /// necessary. The type must implement Default.
    pub fn get_mut<T: Any + Send + Default>(&mut self) -> &mut T {
        self.get_or_put_with(T::default)
    }

    /// Gets a mutable reference to the entry of the given type or creates it using the init
    /// function. Init is not executed if the entry already existed.
    pub fn get_or_put_with<T: Any + Send, F>(&mut self, init: F) -> &mut T
    where
        F: FnOnce() -> T,
    {
        self.0
            .entry(TypeId::of::<T>())
            .or_insert_with(|| Box::new(init()) as Box<dyn Any + Send>)
            .downcast_mut::<T>()
            // Every insertion path stores a `T` under `TypeId::of::<T>()`, so a mismatch here
            // would be a bug in this module rather than a recoverable condition.
            .expect("Shelf entry stored under TypeId::of::<T>() is not a T")
    }
}
88 
89 struct AsyncTaskState {
90     state: State,
91     thread: Option<thread::JoinHandle<()>>,
92     timeout: Duration,
93     hi_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>,
94     lo_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>,
95     idle_fns: Vec<Arc<dyn Fn(&mut Shelf) + Send + Sync>>,
96     /// The store allows tasks to store state across invocations. It is passed to each invocation
97     /// of each task. Tasks need to cooperate on the ids they use for storing state.
98     shelf: Option<Shelf>,
99 }
100 
101 /// AsyncTask spawns one worker thread on demand to process jobs inserted into
102 /// a low and a high priority work queue. The queues are processed FIFO, and low
103 /// priority queue is processed if the high priority queue is empty.
104 /// Note: Because there is only one worker thread at a time for a given AsyncTask instance,
105 /// all scheduled requests are guaranteed to be serialized with respect to one another.
106 pub struct AsyncTask {
107     state: Arc<(Condvar, Mutex<AsyncTaskState>)>,
108 }
109 
110 impl Default for AsyncTask {
default() -> Self111     fn default() -> Self {
112         Self::new(Duration::from_secs(30))
113     }
114 }
115 
116 impl AsyncTask {
117     /// Construct an [`AsyncTask`] with a specific timeout value.
new(timeout: Duration) -> Self118     pub fn new(timeout: Duration) -> Self {
119         Self {
120             state: Arc::new((
121                 Condvar::new(),
122                 Mutex::new(AsyncTaskState {
123                     state: State::Exiting,
124                     thread: None,
125                     timeout,
126                     hi_prio_req: VecDeque::new(),
127                     lo_prio_req: VecDeque::new(),
128                     idle_fns: Vec::new(),
129                     shelf: None,
130                 }),
131             )),
132         }
133     }
134 
135     /// Adds a one-off job to the high priority queue. High priority jobs are
136     /// completed before low priority jobs and can also overtake low priority
137     /// jobs. But they cannot preempt them.
queue_hi<F>(&self, f: F) where F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,138     pub fn queue_hi<F>(&self, f: F)
139     where
140         F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,
141     {
142         self.queue(f, true)
143     }
144 
145     /// Adds a one-off job to the low priority queue. Low priority jobs are
146     /// completed after high priority. And they are not executed as long as high
147     /// priority jobs are present. Jobs always run to completion and are never
148     /// preempted by high priority jobs.
queue_lo<F>(&self, f: F) where F: FnOnce(&mut Shelf) + Send + 'static,149     pub fn queue_lo<F>(&self, f: F)
150     where
151         F: FnOnce(&mut Shelf) + Send + 'static,
152     {
153         self.queue(f, false)
154     }
155 
156     /// Adds an idle callback. This will be invoked whenever the worker becomes
157     /// idle (all high and low priority jobs have been performed).
add_idle<F>(&self, f: F) where F: Fn(&mut Shelf) + Send + Sync + 'static,158     pub fn add_idle<F>(&self, f: F)
159     where
160         F: Fn(&mut Shelf) + Send + Sync + 'static,
161     {
162         let (ref _condvar, ref state) = *self.state;
163         let mut state = state.lock().unwrap();
164         state.idle_fns.push(Arc::new(f));
165     }
166 
queue<F>(&self, f: F, hi_prio: bool) where F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,167     fn queue<F>(&self, f: F, hi_prio: bool)
168     where
169         F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,
170     {
171         let (ref condvar, ref state) = *self.state;
172         let mut state = state.lock().unwrap();
173 
174         if hi_prio {
175             state.hi_prio_req.push_back(Box::new(f));
176         } else {
177             state.lo_prio_req.push_back(Box::new(f));
178         }
179 
180         if state.state != State::Running {
181             self.spawn_thread(&mut state);
182         }
183         drop(state);
184         condvar.notify_all();
185     }
186 
spawn_thread(&self, state: &mut MutexGuard<AsyncTaskState>)187     fn spawn_thread(&self, state: &mut MutexGuard<AsyncTaskState>) {
188         if let Some(t) = state.thread.take() {
189             t.join().expect("AsyncTask panicked.");
190         }
191 
192         let cloned_state = self.state.clone();
193         let timeout_period = state.timeout;
194 
195         state.thread = Some(thread::spawn(move || {
196             let (ref condvar, ref state) = *cloned_state;
197 
198             enum Action {
199                 QueuedFn(Box<dyn FnOnce(&mut Shelf) + Send>),
200                 IdleFns(Vec<Arc<dyn Fn(&mut Shelf) + Send + Sync>>),
201             }
202             let mut done_idle = false;
203 
204             // When the worker starts, it takes the shelf and puts it on the stack.
205             let mut shelf = state.lock().unwrap().shelf.take().unwrap_or_default();
206             loop {
207                 if let Some(action) = {
208                     let state = state.lock().unwrap();
209                     if !done_idle && state.hi_prio_req.is_empty() && state.lo_prio_req.is_empty() {
210                         // No jobs queued so invoke the idle callbacks.
211                         Some(Action::IdleFns(state.idle_fns.clone()))
212                     } else {
213                         // Wait for either a queued job to arrive or a timeout.
214                         let (mut state, timeout) = condvar
215                             .wait_timeout_while(state, timeout_period, |state| {
216                                 state.hi_prio_req.is_empty() && state.lo_prio_req.is_empty()
217                             })
218                             .unwrap();
219                         match (
220                             state.hi_prio_req.pop_front(),
221                             state.lo_prio_req.is_empty(),
222                             timeout.timed_out(),
223                         ) {
224                             (Some(f), _, _) => Some(Action::QueuedFn(f)),
225                             (None, false, _) => {
226                                 state.lo_prio_req.pop_front().map(|f| Action::QueuedFn(f))
227                             }
228                             (None, true, true) => {
229                                 // When the worker exits it puts the shelf back into the shared
230                                 // state for the next worker to use. So state is preserved not
231                                 // only across invocations but also across worker thread shut down.
232                                 state.shelf = Some(shelf);
233                                 state.state = State::Exiting;
234                                 break;
235                             }
236                             (None, true, false) => None,
237                         }
238                     }
239                 } {
240                     // Now that the lock has been dropped, perform the action.
241                     match action {
242                         Action::QueuedFn(f) => {
243                             f(&mut shelf);
244                             done_idle = false;
245                         }
246                         Action::IdleFns(idle_fns) => {
247                             for idle_fn in idle_fns {
248                                 idle_fn(&mut shelf);
249                             }
250                             done_idle = true;
251                         }
252                     }
253                 }
254             }
255         }));
256         state.state = State::Running;
257     }
258 }
259 
#[cfg(test)]
mod tests {
    use super::{AsyncTask, Shelf};
    use std::sync::{
        mpsc::{channel, sync_channel, RecvTimeoutError},
        Arc,
    };
    use std::time::Duration;

    /// Upper bound for waits on an event that is expected to happen promptly.
    ///
    /// These waits return as soon as the event arrives, so a generous bound costs nothing in
    /// the passing case and avoids spurious failures when the worker thread is scheduled late.
    const PROMPT: Duration = Duration::from_secs(1);

    /// Exercises every `Shelf` accessor with several value types.
    #[test]
    fn test_shelf() {
        let mut shelf = Shelf::default();

        let s = "A string".to_string();
        assert_eq!(shelf.put(s), None);

        // A second String replaces the first, which is handed back.
        let s2 = "Another string".to_string();
        assert_eq!(shelf.put(s2), Some("A string".to_string()));

        // Put something of a different type on the shelf.
        #[derive(Debug, PartialEq, Eq)]
        struct Elf {
            pub name: String,
        }
        let e1 = Elf { name: "Glorfindel".to_string() };
        assert_eq!(shelf.put(e1), None);

        // The String value is still on the shelf.
        let s3 = shelf.get_downcast_ref::<String>().unwrap();
        assert_eq!(s3, "Another string");

        // As is the Elf.
        {
            let e2 = shelf.get_downcast_mut::<Elf>().unwrap();
            assert_eq!(e2.name, "Glorfindel");
            e2.name = "Celeborn".to_string();
        }

        // Take the Elf off the shelf.
        let e3 = shelf.remove_downcast_ref::<Elf>().unwrap();
        assert_eq!(e3.name, "Celeborn");

        assert_eq!(shelf.remove_downcast_ref::<Elf>(), None);

        // No u64 value has been put on the shelf, so getting one gives the default value.
        {
            let i = shelf.get_mut::<u64>();
            assert_eq!(*i, 0);
            *i = 42;
        }
        let i2 = shelf.get_downcast_ref::<u64>().unwrap();
        assert_eq!(*i2, 42);

        // No i32 value has ever been seen near the shelf.
        assert_eq!(shelf.get_downcast_ref::<i32>(), None);
        assert_eq!(shelf.get_downcast_mut::<i32>(), None);
        assert_eq!(shelf.remove_downcast_ref::<i32>(), None);
    }

    /// High priority jobs run before low priority jobs, and each queue is FIFO.
    #[test]
    fn test_async_task() {
        let at = AsyncTask::default();

        // First queue up a job that blocks until we release it, to avoid
        // unpredictable synchronization.
        let (start_sender, start_receiver) = channel();
        at.queue_hi(move |shelf| {
            start_receiver.recv().unwrap();
            // Put a trace vector on the shelf
            shelf.put(Vec::<String>::new());
        });

        // Queue up some high-priority and low-priority jobs.
        for i in 0..3 {
            at.queue_lo(move |shelf| {
                let trace = shelf.get_mut::<Vec<String>>();
                trace.push(format!("L{}", i));
            });
            at.queue_hi(move |shelf| {
                let trace = shelf.get_mut::<Vec<String>>();
                trace.push(format!("H{}", i));
            });
        }

        // Finally queue up a low priority job that emits the trace.
        let (trace_sender, trace_receiver) = channel();
        at.queue_lo(move |shelf| {
            let trace = shelf.get_downcast_ref::<Vec<String>>().unwrap();
            trace_sender.send(trace.clone()).unwrap();
        });

        // Ready, set, go.
        start_sender.send(()).unwrap();
        let trace = trace_receiver.recv().unwrap();

        assert_eq!(trace, vec!["H0", "H1", "H2", "L0", "L1", "L2"]);
    }

    /// A running job can queue further jobs on the same task.
    #[test]
    fn test_async_task_chain() {
        let at = Arc::new(AsyncTask::default());
        let (sender, receiver) = channel();
        // Queue up a job that will queue up another job. This confirms
        // that the job is not invoked with any internal AsyncTask locks held.
        let at_clone = at.clone();
        at.queue_hi(move |_shelf| {
            at_clone.queue_lo(move |_shelf| {
                sender.send(()).unwrap();
            });
        });
        receiver.recv().unwrap();
    }

    /// A panic inside a queued job is expected to fail the test.
    #[test]
    #[should_panic]
    fn test_async_task_panic() {
        let at = AsyncTask::default();
        at.queue_hi(|_shelf| {
            panic!("Panic from queued job");
        });
        // Queue another job afterwards to ensure that the async thread gets joined.
        let (done_sender, done_receiver) = channel();
        at.queue_hi(move |_shelf| {
            done_sender.send(()).unwrap();
        });
        done_receiver.recv().unwrap();
    }

    /// The idle callback runs once each time the worker drains its queues.
    #[test]
    fn test_async_task_idle() {
        let at = AsyncTask::new(Duration::from_secs(3));
        // Need a SyncSender as it is Send+Sync.
        let (idle_done_sender, idle_done_receiver) = sync_channel::<()>(3);
        at.add_idle(move |_shelf| {
            idle_done_sender.send(()).unwrap();
        });

        // Queue up some high-priority and low-priority jobs that take time.
        for _i in 0..3 {
            at.queue_lo(|_shelf| {
                std::thread::sleep(Duration::from_millis(500));
            });
            at.queue_hi(|_shelf| {
                std::thread::sleep(Duration::from_millis(500));
            });
        }
        // Final low-priority job.
        let (done_sender, done_receiver) = channel();
        at.queue_lo(move |_shelf| {
            done_sender.send(()).unwrap();
        });

        // Nothing happens until the last job completes.
        assert_eq!(
            idle_done_receiver.recv_timeout(Duration::from_secs(1)),
            Err(RecvTimeoutError::Timeout)
        );
        done_receiver.recv().unwrap();
        // The worker invokes the idle callback right after the last job; give it time to be
        // scheduled rather than racing it.
        idle_done_receiver.recv_timeout(PROMPT).unwrap();

        // Idle callback not executed again even if we wait for a while.
        assert_eq!(
            idle_done_receiver.recv_timeout(Duration::from_secs(3)),
            Err(RecvTimeoutError::Timeout)
        );

        // However, if more work is done then there's another chance to go idle.
        let (done_sender, done_receiver) = channel();
        at.queue_hi(move |_shelf| {
            std::thread::sleep(Duration::from_millis(500));
            done_sender.send(()).unwrap();
        });
        // Idle callback not immediately executed, because the high priority
        // job is taking a while.
        assert_eq!(
            idle_done_receiver.recv_timeout(Duration::from_millis(1)),
            Err(RecvTimeoutError::Timeout)
        );
        done_receiver.recv().unwrap();
        idle_done_receiver.recv_timeout(PROMPT).unwrap();
    }

    /// Several idle callbacks run in the order in which they were registered.
    #[test]
    fn test_async_task_multiple_idle() {
        let at = AsyncTask::new(Duration::from_secs(3));
        let (idle_sender, idle_receiver) = sync_channel::<i32>(5);
        // Queue a high priority job to start things off
        at.queue_hi(|_shelf| {
            std::thread::sleep(Duration::from_millis(500));
        });

        // Multiple idle callbacks.
        for i in 0..3 {
            let idle_sender = idle_sender.clone();
            at.add_idle(move |_shelf| {
                idle_sender.send(i).unwrap();
            });
        }

        // Nothing happens immediately.
        assert_eq!(
            idle_receiver.recv_timeout(Duration::from_millis(1)),
            Err(RecvTimeoutError::Timeout)
        );
        // Wait for a moment and the idle jobs should have run.
        std::thread::sleep(Duration::from_secs(1));

        let mut results = Vec::new();
        while let Ok(i) = idle_receiver.recv_timeout(Duration::from_millis(1)) {
            results.push(i);
        }
        assert_eq!(results, [0, 1, 2]);
    }

    /// An idle callback may queue jobs itself, which makes the worker go idle repeatedly.
    #[test]
    fn test_async_task_idle_queues_job() {
        let at = Arc::new(AsyncTask::new(Duration::from_secs(1)));
        let at_clone = at.clone();
        let (idle_sender, idle_receiver) = sync_channel::<i32>(100);
        // Add an idle callback that queues a low-priority job.
        at.add_idle(move |shelf| {
            at_clone.queue_lo(|_shelf| {
                // Slow things down so the channel doesn't fill up.
                std::thread::sleep(Duration::from_millis(50));
            });
            let i = shelf.get_mut::<i32>();
            idle_sender.send(*i).unwrap();
            *i += 1;
        });

        // Nothing happens immediately.
        assert_eq!(
            idle_receiver.recv_timeout(Duration::from_millis(1500)),
            Err(RecvTimeoutError::Timeout)
        );

        // Once we queue a normal job, things start.
        at.queue_hi(|_shelf| {});
        assert_eq!(0, idle_receiver.recv_timeout(PROMPT).unwrap());

        // The idle callback queues a job, and completion of that job
        // means the task is going idle again...so the idle callback will
        // be called repeatedly. The counter values arrive in order, so waiting longer for
        // each one does not change what is being checked.
        assert_eq!(1, idle_receiver.recv_timeout(PROMPT).unwrap());
        assert_eq!(2, idle_receiver.recv_timeout(PROMPT).unwrap());
        assert_eq!(3, idle_receiver.recv_timeout(PROMPT).unwrap());
    }

    /// A panic inside an idle callback is expected to fail the test.
    #[test]
    #[should_panic]
    fn test_async_task_idle_panic() {
        let at = AsyncTask::new(Duration::from_secs(1));
        let (idle_sender, idle_receiver) = sync_channel::<()>(3);
        // Add an idle callback that panics.
        at.add_idle(move |_shelf| {
            idle_sender.send(()).unwrap();
            panic!("Panic from idle callback");
        });
        // Queue a job to trigger idleness and ensuing panic.
        at.queue_hi(|_shelf| {});
        idle_receiver.recv().unwrap();

        // Queue another job afterwards to ensure that the async thread gets joined
        // and the panic detected.
        let (done_sender, done_receiver) = channel();
        at.queue_hi(move |_shelf| {
            done_sender.send(()).unwrap();
        });
        done_receiver.recv().unwrap();
    }
}
533