1 use crate::loom::cell::UnsafeCell;
2 use crate::loom::future::AtomicWaker;
3 use crate::loom::sync::atomic::AtomicUsize;
4 use crate::loom::sync::Arc;
5 use crate::sync::mpsc::list;
6 use crate::sync::notify::Notify;
7 
8 use std::fmt;
9 use std::process;
10 use std::sync::atomic::Ordering::{AcqRel, Relaxed};
11 use std::task::Poll::{Pending, Ready};
12 use std::task::{Context, Poll};
13 
14 /// Channel sender
15 pub(crate) struct Tx<T, S> {
16     inner: Arc<Chan<T, S>>,
17 }
18 
19 impl<T, S: fmt::Debug> fmt::Debug for Tx<T, S> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result20     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
21         fmt.debug_struct("Tx").field("inner", &self.inner).finish()
22     }
23 }
24 
25 /// Channel receiver
26 pub(crate) struct Rx<T, S: Semaphore> {
27     inner: Arc<Chan<T, S>>,
28 }
29 
30 impl<T, S: Semaphore + fmt::Debug> fmt::Debug for Rx<T, S> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result31     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
32         fmt.debug_struct("Rx").field("inner", &self.inner).finish()
33     }
34 }
35 
36 pub(crate) trait Semaphore {
is_idle(&self) -> bool37     fn is_idle(&self) -> bool;
38 
add_permit(&self)39     fn add_permit(&self);
40 
close(&self)41     fn close(&self);
42 
is_closed(&self) -> bool43     fn is_closed(&self) -> bool;
44 }
45 
46 struct Chan<T, S> {
47     /// Notifies all tasks listening for the receiver being dropped
48     notify_rx_closed: Notify,
49 
50     /// Handle to the push half of the lock-free list.
51     tx: list::Tx<T>,
52 
53     /// Coordinates access to channel's capacity.
54     semaphore: S,
55 
56     /// Receiver waker. Notified when a value is pushed into the channel.
57     rx_waker: AtomicWaker,
58 
59     /// Tracks the number of outstanding sender handles.
60     ///
61     /// When this drops to zero, the send half of the channel is closed.
62     tx_count: AtomicUsize,
63 
64     /// Only accessed by `Rx` handle.
65     rx_fields: UnsafeCell<RxFields<T>>,
66 }
67 
68 impl<T, S> fmt::Debug for Chan<T, S>
69 where
70     S: fmt::Debug,
71 {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result72     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
73         fmt.debug_struct("Chan")
74             .field("tx", &self.tx)
75             .field("semaphore", &self.semaphore)
76             .field("rx_waker", &self.rx_waker)
77             .field("tx_count", &self.tx_count)
78             .field("rx_fields", &"...")
79             .finish()
80     }
81 }
82 
83 /// Fields only accessed by `Rx` handle.
84 struct RxFields<T> {
85     /// Channel receiver. This field is only accessed by the `Receiver` type.
86     list: list::Rx<T>,
87 
88     /// `true` if `Rx::close` is called.
89     rx_closed: bool,
90 }
91 
92 impl<T> fmt::Debug for RxFields<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result93     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
94         fmt.debug_struct("RxFields")
95             .field("list", &self.list)
96             .field("rx_closed", &self.rx_closed)
97             .finish()
98     }
99 }
100 
101 unsafe impl<T: Send, S: Send> Send for Chan<T, S> {}
102 unsafe impl<T: Send, S: Sync> Sync for Chan<T, S> {}
103 
channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>)104 pub(crate) fn channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) {
105     let (tx, rx) = list::channel();
106 
107     let chan = Arc::new(Chan {
108         notify_rx_closed: Notify::new(),
109         tx,
110         semaphore,
111         rx_waker: AtomicWaker::new(),
112         tx_count: AtomicUsize::new(1),
113         rx_fields: UnsafeCell::new(RxFields {
114             list: rx,
115             rx_closed: false,
116         }),
117     });
118 
119     (Tx::new(chan.clone()), Rx::new(chan))
120 }
121 
122 // ===== impl Tx =====
123 
124 impl<T, S> Tx<T, S> {
new(chan: Arc<Chan<T, S>>) -> Tx<T, S>125     fn new(chan: Arc<Chan<T, S>>) -> Tx<T, S> {
126         Tx { inner: chan }
127     }
128 
semaphore(&self) -> &S129     pub(super) fn semaphore(&self) -> &S {
130         &self.inner.semaphore
131     }
132 
133     /// Send a message and notify the receiver.
send(&self, value: T)134     pub(crate) fn send(&self, value: T) {
135         self.inner.send(value);
136     }
137 
138     /// Wake the receive half
wake_rx(&self)139     pub(crate) fn wake_rx(&self) {
140         self.inner.rx_waker.wake();
141     }
142 
143     /// Returns `true` if senders belong to the same channel.
same_channel(&self, other: &Self) -> bool144     pub(crate) fn same_channel(&self, other: &Self) -> bool {
145         Arc::ptr_eq(&self.inner, &other.inner)
146     }
147 }
148 
149 impl<T, S: Semaphore> Tx<T, S> {
is_closed(&self) -> bool150     pub(crate) fn is_closed(&self) -> bool {
151         self.inner.semaphore.is_closed()
152     }
153 
closed(&self)154     pub(crate) async fn closed(&self) {
155         // In order to avoid a race condition, we first request a notification,
156         // **then** check whether the semaphore is closed. If the semaphore is
157         // closed the notification request is dropped.
158         let notified = self.inner.notify_rx_closed.notified();
159 
160         if self.inner.semaphore.is_closed() {
161             return;
162         }
163         notified.await;
164     }
165 }
166 
167 impl<T, S> Clone for Tx<T, S> {
clone(&self) -> Tx<T, S>168     fn clone(&self) -> Tx<T, S> {
169         // Using a Relaxed ordering here is sufficient as the caller holds a
170         // strong ref to `self`, preventing a concurrent decrement to zero.
171         self.inner.tx_count.fetch_add(1, Relaxed);
172 
173         Tx {
174             inner: self.inner.clone(),
175         }
176     }
177 }
178 
179 impl<T, S> Drop for Tx<T, S> {
drop(&mut self)180     fn drop(&mut self) {
181         if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 {
182             return;
183         }
184 
185         // Close the list, which sends a `Close` message
186         self.inner.tx.close();
187 
188         // Notify the receiver
189         self.wake_rx();
190     }
191 }
192 
193 // ===== impl Rx =====
194 
195 impl<T, S: Semaphore> Rx<T, S> {
new(chan: Arc<Chan<T, S>>) -> Rx<T, S>196     fn new(chan: Arc<Chan<T, S>>) -> Rx<T, S> {
197         Rx { inner: chan }
198     }
199 
close(&mut self)200     pub(crate) fn close(&mut self) {
201         self.inner.rx_fields.with_mut(|rx_fields_ptr| {
202             let rx_fields = unsafe { &mut *rx_fields_ptr };
203 
204             if rx_fields.rx_closed {
205                 return;
206             }
207 
208             rx_fields.rx_closed = true;
209         });
210 
211         self.inner.semaphore.close();
212         self.inner.notify_rx_closed.notify_waiters();
213     }
214 
215     /// Receive the next value
recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>>216     pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
217         use super::block::Read::*;
218 
219         // Keep track of task budget
220         let coop = ready!(crate::coop::poll_proceed(cx));
221 
222         self.inner.rx_fields.with_mut(|rx_fields_ptr| {
223             let rx_fields = unsafe { &mut *rx_fields_ptr };
224 
225             macro_rules! try_recv {
226                 () => {
227                     match rx_fields.list.pop(&self.inner.tx) {
228                         Some(Value(value)) => {
229                             self.inner.semaphore.add_permit();
230                             coop.made_progress();
231                             return Ready(Some(value));
232                         }
233                         Some(Closed) => {
234                             // TODO: This check may not be required as it most
235                             // likely can only return `true` at this point. A
236                             // channel is closed when all tx handles are
237                             // dropped. Dropping a tx handle releases memory,
238                             // which ensures that if dropping the tx handle is
239                             // visible, then all messages sent are also visible.
240                             assert!(self.inner.semaphore.is_idle());
241                             coop.made_progress();
242                             return Ready(None);
243                         }
244                         None => {} // fall through
245                     }
246                 };
247             }
248 
249             try_recv!();
250 
251             self.inner.rx_waker.register_by_ref(cx.waker());
252 
253             // It is possible that a value was pushed between attempting to read
254             // and registering the task, so we have to check the channel a
255             // second time here.
256             try_recv!();
257 
258             if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
259                 coop.made_progress();
260                 Ready(None)
261             } else {
262                 Pending
263             }
264         })
265     }
266 }
267 
268 impl<T, S: Semaphore> Drop for Rx<T, S> {
drop(&mut self)269     fn drop(&mut self) {
270         use super::block::Read::Value;
271 
272         self.close();
273 
274         self.inner.rx_fields.with_mut(|rx_fields_ptr| {
275             let rx_fields = unsafe { &mut *rx_fields_ptr };
276 
277             while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
278                 self.inner.semaphore.add_permit();
279             }
280         })
281     }
282 }
283 
284 // ===== impl Chan =====
285 
286 impl<T, S> Chan<T, S> {
send(&self, value: T)287     fn send(&self, value: T) {
288         // Push the value
289         self.tx.push(value);
290 
291         // Notify the rx task
292         self.rx_waker.wake();
293     }
294 }
295 
296 impl<T, S> Drop for Chan<T, S> {
drop(&mut self)297     fn drop(&mut self) {
298         use super::block::Read::Value;
299 
300         // Safety: the only owner of the rx fields is Chan, and eing
301         // inside its own Drop means we're the last ones to touch it.
302         self.rx_fields.with_mut(|rx_fields_ptr| {
303             let rx_fields = unsafe { &mut *rx_fields_ptr };
304 
305             while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {}
306             unsafe { rx_fields.list.free_blocks() };
307         });
308     }
309 }
310 
311 // ===== impl Semaphore for (::Semaphore, capacity) =====
312 
313 impl Semaphore for (crate::sync::batch_semaphore::Semaphore, usize) {
add_permit(&self)314     fn add_permit(&self) {
315         self.0.release(1)
316     }
317 
is_idle(&self) -> bool318     fn is_idle(&self) -> bool {
319         self.0.available_permits() == self.1
320     }
321 
close(&self)322     fn close(&self) {
323         self.0.close();
324     }
325 
is_closed(&self) -> bool326     fn is_closed(&self) -> bool {
327         self.0.is_closed()
328     }
329 }
330 
331 // ===== impl Semaphore for AtomicUsize =====
332 
333 use std::sync::atomic::Ordering::{Acquire, Release};
334 use std::usize;
335 
336 impl Semaphore for AtomicUsize {
add_permit(&self)337     fn add_permit(&self) {
338         let prev = self.fetch_sub(2, Release);
339 
340         if prev >> 1 == 0 {
341             // Something went wrong
342             process::abort();
343         }
344     }
345 
is_idle(&self) -> bool346     fn is_idle(&self) -> bool {
347         self.load(Acquire) >> 1 == 0
348     }
349 
close(&self)350     fn close(&self) {
351         self.fetch_or(1, Release);
352     }
353 
is_closed(&self) -> bool354     fn is_closed(&self) -> bool {
355         self.load(Acquire) & 1 == 1
356     }
357 }
358