1 //! A shutdown channel.
2 //!
3 //! Each worker holds the `Sender` half. When all the `Sender` halves are
4 //! dropped, the `Receiver` receives a notification.
5
6 use crate::loom::sync::Arc;
7 use crate::sync::oneshot;
8
9 use std::time::Duration;
10
11 #[derive(Debug, Clone)]
12 pub(super) struct Sender {
13 tx: Arc<oneshot::Sender<()>>,
14 }
15
16 #[derive(Debug)]
17 pub(super) struct Receiver {
18 rx: oneshot::Receiver<()>,
19 }
20
channel() -> (Sender, Receiver)21 pub(super) fn channel() -> (Sender, Receiver) {
22 let (tx, rx) = oneshot::channel();
23 let tx = Sender { tx: Arc::new(tx) };
24 let rx = Receiver { rx };
25
26 (tx, rx)
27 }
28
29 impl Receiver {
30 /// Blocks the current thread until all `Sender` handles drop.
31 ///
32 /// If `timeout` is `Some`, the thread is blocked for **at most** `timeout`
33 /// duration. If `timeout` is `None`, then the thread is blocked until the
34 /// shutdown signal is received.
35 ///
36 /// If the timeout has elapsed, it returns `false`, otherwise it returns `true`.
wait(&mut self, timeout: Option<Duration>) -> bool37 pub(crate) fn wait(&mut self, timeout: Option<Duration>) -> bool {
38 use crate::runtime::enter::try_enter;
39
40 if timeout == Some(Duration::from_nanos(0)) {
41 return false;
42 }
43
44 let mut e = match try_enter(false) {
45 Some(enter) => enter,
46 _ => {
47 if std::thread::panicking() {
48 // Don't panic in a panic
49 return false;
50 } else {
51 panic!(
52 "Cannot drop a runtime in a context where blocking is not allowed. \
53 This happens when a runtime is dropped from within an asynchronous context."
54 );
55 }
56 }
57 };
58
59 // The oneshot completes with an Err
60 //
61 // If blocking fails to wait, this indicates a problem parking the
62 // current thread (usually, shutting down a runtime stored in a
63 // thread-local).
64 if let Some(timeout) = timeout {
65 e.block_on_timeout(&mut self.rx, timeout).is_ok()
66 } else {
67 let _ = e.block_on(&mut self.rx);
68 true
69 }
70 }
71 }
72