1 //! A multi-producer, single-consumer queue for sending values across
2 //! asynchronous tasks.
3 //!
4 //! Similarly to the `std`, channel creation provides [`Receiver`] and
5 //! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to
6 //! read values out of the channel. If there is no message to read from the
7 //! channel, the current task will be notified when a new value is sent.
8 //! [`Sender`] implements the `Sink` trait and allows a task to send messages into
9 //! the channel. If the channel is at capacity, the send will be rejected and
10 //! the task will be notified when additional capacity is available. In other
11 //! words, the channel provides backpressure.
12 //!
13 //! Unbounded channels are also available using the `unbounded` constructor.
14 //!
15 //! # Disconnection
16 //!
17 //! When all [`Sender`] handles have been dropped, it is no longer
18 //! possible to send values into the channel. This is considered the termination
19 //! event of the stream. As such, [`Receiver::poll_next`]
20 //! will return `Ok(Ready(None))`.
21 //!
22 //! If the [`Receiver`] handle is dropped, then messages can no longer
23 //! be read out of the channel. In this case, all further attempts to send will
24 //! result in an error.
25 //!
26 //! # Clean Shutdown
27 //!
28 //! If the [`Receiver`] is simply dropped, then it is possible for
29 //! there to be messages still in the channel that will not be processed. As
30 //! such, it is usually desirable to perform a "clean" shutdown. To do this, the
31 //! receiver will first call `close`, which will prevent any further messages to
32 //! be sent into the channel. Then, the receiver consumes the channel to
33 //! completion, at which point the receiver can be dropped.
34 //!
35 //! [`Sender`]: struct.Sender.html
36 //! [`Receiver`]: struct.Receiver.html
37 //! [`Stream`]: ../../futures_core/stream/trait.Stream.html
38 //! [`Receiver::poll_next`]:
39 //!     ../../futures_core/stream/trait.Stream.html#tymethod.poll_next
40 
41 // At the core, the channel uses an atomic FIFO queue for message passing. This
42 // queue is used as the primary coordination primitive. In order to enforce
43 // capacity limits and handle back pressure, a secondary FIFO queue is used to
44 // send parked task handles.
45 //
46 // The general idea is that the channel is created with a `buffer` size of `n`.
47 // The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
48 // slot to hold a message. This allows `Sender` to know for a fact that a send
49 // will succeed *before* starting to do the actual work of sending the value.
50 // Since most of this work is lock-free, once the work starts, it is impossible
51 // to safely revert.
52 //
53 // If the sender is unable to process a send operation, then the current
54 // task is parked and the handle is sent on the parked task queue.
55 //
56 // Note that the implementation guarantees that the channel capacity will never
57 // exceed the configured limit, however there is no *strict* guarantee that the
58 // receiver will wake up a parked task *immediately* when a slot becomes
59 // available. However, it will almost always unpark a task when a slot becomes
60 // available and it is *guaranteed* that a sender will be unparked when the
61 // message that caused the sender to become parked is read out of the channel.
62 //
63 // The steps for sending a message are roughly:
64 //
65 // 1) Increment the channel message count
66 // 2) If the channel is at capacity, push the task handle onto the wait queue
67 // 3) Push the message onto the message queue.
68 //
69 // The steps for receiving a message are roughly:
70 //
71 // 1) Pop a message from the message queue
72 // 2) Pop a task handle from the wait queue
73 // 3) Decrement the channel message count.
74 //
75 // It's important for the order of operations on lock-free structures to happen
76 // in reverse order between the sender and receiver. This makes the message
77 // queue the primary coordination structure and establishes the necessary
78 // happens-before semantics required for the acquire / release semantics used
79 // by the queue structure.
80 
81 use futures_core::stream::{FusedStream, Stream};
82 use futures_core::task::{Context, Poll, Waker};
83 use futures_core::task::__internal::AtomicWaker;
84 use std::fmt;
85 use std::pin::Pin;
86 use std::sync::{Arc, Mutex};
87 use std::sync::atomic::AtomicUsize;
88 use std::sync::atomic::Ordering::SeqCst;
89 use std::thread;
90 
91 use crate::mpsc::queue::Queue;
92 
93 mod queue;
94 #[cfg(feature = "sink")]
95 mod sink_impl;
96 
97 #[derive(Debug)]
98 struct UnboundedSenderInner<T> {
99     // Channel state shared between the sender and receiver.
100     inner: Arc<UnboundedInner<T>>,
101 }
102 
103 #[derive(Debug)]
104 struct BoundedSenderInner<T> {
105     // Channel state shared between the sender and receiver.
106     inner: Arc<BoundedInner<T>>,
107 
108     // Handle to the task that is blocked on this sender. This handle is sent
109     // to the receiver half in order to be notified when the sender becomes
110     // unblocked.
111     sender_task: Arc<Mutex<SenderTask>>,
112 
113     // `true` if the sender might be blocked. This is an optimization to avoid
114     // having to lock the mutex most of the time.
115     maybe_parked: bool,
116 }
117 
118 // We never project Pin<&mut SenderInner> to `Pin<&mut T>`
119 impl<T> Unpin for UnboundedSenderInner<T> {}
120 impl<T> Unpin for BoundedSenderInner<T> {}
121 
122 /// The transmission end of a bounded mpsc channel.
123 ///
124 /// This value is created by the [`channel`](channel) function.
125 #[derive(Debug)]
126 pub struct Sender<T>(Option<BoundedSenderInner<T>>);
127 
128 /// The transmission end of an unbounded mpsc channel.
129 ///
130 /// This value is created by the [`unbounded`](unbounded) function.
131 #[derive(Debug)]
132 pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>);
133 
134 trait AssertKinds: Send + Sync + Clone {}
135 impl AssertKinds for UnboundedSender<u32> {}
136 
137 /// The receiving end of a bounded mpsc channel.
138 ///
139 /// This value is created by the [`channel`](channel) function.
140 #[derive(Debug)]
141 pub struct Receiver<T> {
142     inner: Option<Arc<BoundedInner<T>>>,
143 }
144 
145 /// The receiving end of an unbounded mpsc channel.
146 ///
147 /// This value is created by the [`unbounded`](unbounded) function.
148 #[derive(Debug)]
149 pub struct UnboundedReceiver<T> {
150     inner: Option<Arc<UnboundedInner<T>>>,
151 }
152 
153 // `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>`
154 impl<T> Unpin for UnboundedReceiver<T> {}
155 
156 /// The error type for [`Sender`s](Sender) used as `Sink`s.
157 #[derive(Clone, Debug, PartialEq, Eq)]
158 pub struct SendError {
159     kind: SendErrorKind,
160 }
161 
162 /// The error type returned from [`try_send`](Sender::try_send).
163 #[derive(Clone, PartialEq, Eq)]
164 pub struct TrySendError<T> {
165     err: SendError,
166     val: T,
167 }
168 
169 #[derive(Clone, Debug, PartialEq, Eq)]
170 enum SendErrorKind {
171     Full,
172     Disconnected,
173 }
174 
175 /// The error type returned from [`try_next`](Receiver::try_next).
176 pub struct TryRecvError {
177     _priv: (),
178 }
179 
180 impl fmt::Display for SendError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result181     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
182         if self.is_full() {
183             write!(f, "send failed because channel is full")
184         } else {
185             write!(f, "send failed because receiver is gone")
186         }
187     }
188 }
189 
190 impl std::error::Error for SendError {}
191 
192 impl SendError {
193     /// Returns `true` if this error is a result of the channel being full.
is_full(&self) -> bool194     pub fn is_full(&self) -> bool {
195         match self.kind {
196             SendErrorKind::Full => true,
197             _ => false,
198         }
199     }
200 
201     /// Returns `true` if this error is a result of the receiver being dropped.
is_disconnected(&self) -> bool202     pub fn is_disconnected(&self) -> bool {
203         match self.kind {
204             SendErrorKind::Disconnected => true,
205             _ => false,
206         }
207     }
208 }
209 
210 impl<T> fmt::Debug for TrySendError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result211     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212         f.debug_struct("TrySendError")
213             .field("kind", &self.err.kind)
214             .finish()
215     }
216 }
217 
218 impl<T> fmt::Display for TrySendError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result219     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220         if self.is_full() {
221             write!(f, "send failed because channel is full")
222         } else {
223             write!(f, "send failed because receiver is gone")
224         }
225     }
226 }
227 
228 impl<T: core::any::Any> std::error::Error for TrySendError<T> {}
229 
230 impl<T> TrySendError<T> {
231     /// Returns `true` if this error is a result of the channel being full.
is_full(&self) -> bool232     pub fn is_full(&self) -> bool {
233         self.err.is_full()
234     }
235 
236     /// Returns `true` if this error is a result of the receiver being dropped.
is_disconnected(&self) -> bool237     pub fn is_disconnected(&self) -> bool {
238         self.err.is_disconnected()
239     }
240 
241     /// Returns the message that was attempted to be sent but failed.
into_inner(self) -> T242     pub fn into_inner(self) -> T {
243         self.val
244     }
245 
246     /// Drops the message and converts into a `SendError`.
into_send_error(self) -> SendError247     pub fn into_send_error(self) -> SendError {
248         self.err
249     }
250 }
251 
252 impl fmt::Debug for TryRecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result253     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
254         f.debug_tuple("TryRecvError")
255             .finish()
256     }
257 }
258 
259 impl fmt::Display for TryRecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result260     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
261         write!(f, "receiver channel is empty")
262     }
263 }
264 
265 impl std::error::Error for TryRecvError {}
266 
267 #[derive(Debug)]
268 struct UnboundedInner<T> {
269     // Internal channel state. Consists of the number of messages stored in the
270     // channel as well as a flag signalling that the channel is closed.
271     state: AtomicUsize,
272 
273     // Atomic, FIFO queue used to send messages to the receiver
274     message_queue: Queue<T>,
275 
276     // Number of senders in existence
277     num_senders: AtomicUsize,
278 
279     // Handle to the receiver's task.
280     recv_task: AtomicWaker,
281 }
282 
283 #[derive(Debug)]
284 struct BoundedInner<T> {
285     // Max buffer size of the channel. If `None` then the channel is unbounded.
286     buffer: usize,
287 
288     // Internal channel state. Consists of the number of messages stored in the
289     // channel as well as a flag signalling that the channel is closed.
290     state: AtomicUsize,
291 
292     // Atomic, FIFO queue used to send messages to the receiver
293     message_queue: Queue<T>,
294 
295     // Atomic, FIFO queue used to send parked task handles to the receiver.
296     parked_queue: Queue<Arc<Mutex<SenderTask>>>,
297 
298     // Number of senders in existence
299     num_senders: AtomicUsize,
300 
301     // Handle to the receiver's task.
302     recv_task: AtomicWaker,
303 }
304 
305 // Struct representation of `Inner::state`.
306 #[derive(Debug, Clone, Copy)]
307 struct State {
308     // `true` when the channel is open
309     is_open: bool,
310 
311     // Number of messages in the channel
312     num_messages: usize,
313 }
314 
315 // The `is_open` flag is stored in the left-most bit of `Inner::state`
316 const OPEN_MASK: usize = usize::max_value() - (usize::max_value() >> 1);
317 
318 // When a new channel is created, it is created in the open state with no
319 // pending messages.
320 const INIT_STATE: usize = OPEN_MASK;
321 
322 // The maximum number of messages that a channel can track is `usize::max_value() >> 1`
323 const MAX_CAPACITY: usize = !(OPEN_MASK);
324 
325 // The maximum requested buffer size must be less than the maximum capacity of
326 // a channel. This is because each sender gets a guaranteed slot.
327 const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
328 
329 // Sent to the consumer to wake up blocked producers
330 #[derive(Debug)]
331 struct SenderTask {
332     task: Option<Waker>,
333     is_parked: bool,
334 }
335 
336 impl SenderTask {
new() -> Self337     fn new() -> Self {
338         Self {
339             task: None,
340             is_parked: false,
341         }
342     }
343 
notify(&mut self)344     fn notify(&mut self) {
345         self.is_parked = false;
346 
347         if let Some(task) = self.task.take() {
348             task.wake();
349         }
350     }
351 }
352 
353 /// Creates a bounded mpsc channel for communicating between asynchronous tasks.
354 ///
355 /// Being bounded, this channel provides backpressure to ensure that the sender
356 /// outpaces the receiver by only a limited amount. The channel's capacity is
357 /// equal to `buffer + num-senders`. In other words, each sender gets a
358 /// guaranteed slot in the channel capacity, and on top of that there are
359 /// `buffer` "first come, first serve" slots available to all senders.
360 ///
361 /// The [`Receiver`](Receiver) returned implements the
362 /// [`Stream`](futures_core::stream::Stream) trait, while [`Sender`](Sender) implements
363 /// `Sink`.
channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>)364 pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
365     // Check that the requested buffer size does not exceed the maximum buffer
366     // size permitted by the system.
367     assert!(buffer < MAX_BUFFER, "requested buffer size too large");
368 
369     let inner = Arc::new(BoundedInner {
370         buffer,
371         state: AtomicUsize::new(INIT_STATE),
372         message_queue: Queue::new(),
373         parked_queue: Queue::new(),
374         num_senders: AtomicUsize::new(1),
375         recv_task: AtomicWaker::new(),
376     });
377 
378     let tx = BoundedSenderInner {
379         inner: inner.clone(),
380         sender_task: Arc::new(Mutex::new(SenderTask::new())),
381         maybe_parked: false,
382     };
383 
384     let rx = Receiver {
385         inner: Some(inner),
386     };
387 
388     (Sender(Some(tx)), rx)
389 }
390 
391 /// Creates an unbounded mpsc channel for communicating between asynchronous
392 /// tasks.
393 ///
394 /// A `send` on this channel will always succeed as long as the receive half has
395 /// not been closed. If the receiver falls behind, messages will be arbitrarily
396 /// buffered.
397 ///
398 /// **Note** that the amount of available system memory is an implicit bound to
399 /// the channel. Using an `unbounded` channel has the ability of causing the
400 /// process to run out of memory. In this case, the process will be aborted.
unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>)401 pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
402 
403     let inner = Arc::new(UnboundedInner {
404         state: AtomicUsize::new(INIT_STATE),
405         message_queue: Queue::new(),
406         num_senders: AtomicUsize::new(1),
407         recv_task: AtomicWaker::new(),
408     });
409 
410     let tx = UnboundedSenderInner {
411         inner: inner.clone(),
412     };
413 
414     let rx = UnboundedReceiver {
415         inner: Some(inner),
416     };
417 
418     (UnboundedSender(Some(tx)), rx)
419 }
420 
421 /*
422  *
423  * ===== impl Sender =====
424  *
425  */
426 
427 impl<T> UnboundedSenderInner<T> {
poll_ready_nb(&self) -> Poll<Result<(), SendError>>428     fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> {
429         let state = decode_state(self.inner.state.load(SeqCst));
430         if state.is_open {
431             Poll::Ready(Ok(()))
432         } else {
433             Poll::Ready(Err(SendError {
434                 kind: SendErrorKind::Disconnected,
435             }))
436         }
437     }
438 
439 
440     // Push message to the queue and signal to the receiver
queue_push_and_signal(&self, msg: T)441     fn queue_push_and_signal(&self, msg: T) {
442         // Push the message onto the message queue
443         self.inner.message_queue.push(msg);
444 
445         // Signal to the receiver that a message has been enqueued. If the
446         // receiver is parked, this will unpark the task.
447         self.inner.recv_task.wake();
448     }
449 
450     // Increment the number of queued messages. Returns the resulting number.
inc_num_messages(&self) -> Option<usize>451     fn inc_num_messages(&self) -> Option<usize> {
452         let mut curr = self.inner.state.load(SeqCst);
453 
454         loop {
455             let mut state = decode_state(curr);
456 
457             // The receiver end closed the channel.
458             if !state.is_open {
459                 return None;
460             }
461 
462             // This probably is never hit? Odds are the process will run out of
463             // memory first. It may be worth to return something else in this
464             // case?
465             assert!(state.num_messages < MAX_CAPACITY, "buffer space \
466                     exhausted; sending this messages would overflow the state");
467 
468             state.num_messages += 1;
469 
470             let next = encode_state(&state);
471             match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
472                 Ok(_) => {
473                     return Some(state.num_messages)
474                 }
475                 Err(actual) => curr = actual,
476             }
477         }
478     }
479 
480     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool481     fn same_receiver(&self, other: &Self) -> bool {
482         Arc::ptr_eq(&self.inner, &other.inner)
483     }
484 
485     /// Returns whether the sender send to this receiver.
is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool486     fn is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool {
487         Arc::ptr_eq(&self.inner, inner)
488     }
489 
490     /// Returns pointer to the Arc containing sender
491     ///
492     /// The returned pointer is not referenced and should be only used for hashing!
ptr(&self) -> *const UnboundedInner<T>493     fn ptr(&self) -> *const UnboundedInner<T> {
494         &*self.inner
495     }
496 
497     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool498     fn is_closed(&self) -> bool {
499         !decode_state(self.inner.state.load(SeqCst)).is_open
500     }
501 
502     /// Closes this channel from the sender side, preventing any new messages.
close_channel(&self)503     fn close_channel(&self) {
504         // There's no need to park this sender, its dropping,
505         // and we don't want to check for capacity, so skip
506         // that stuff from `do_send`.
507 
508         self.inner.set_closed();
509         self.inner.recv_task.wake();
510     }
511 }
512 
513 impl<T> BoundedSenderInner<T> {
514     /// Attempts to send a message on this `Sender`, returning the message
515     /// if there was an error.
try_send(&mut self, msg: T) -> Result<(), TrySendError<T>>516     fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
517         // If the sender is currently blocked, reject the message
518         if !self.poll_unparked(None).is_ready() {
519             return Err(TrySendError {
520                 err: SendError {
521                     kind: SendErrorKind::Full,
522                 },
523                 val: msg,
524             });
525         }
526 
527         // The channel has capacity to accept the message, so send it
528         self.do_send_b(msg)
529     }
530 
531     // Do the send without failing.
532     // Can be called only by bounded sender.
533     #[allow(clippy::debug_assert_with_mut_call)]
do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>>534     fn do_send_b(&mut self, msg: T)
535         -> Result<(), TrySendError<T>>
536     {
537         // Anyone callig do_send *should* make sure there is room first,
538         // but assert here for tests as a sanity check.
539         debug_assert!(self.poll_unparked(None).is_ready());
540 
541         // First, increment the number of messages contained by the channel.
542         // This operation will also atomically determine if the sender task
543         // should be parked.
544         //
545         // `None` is returned in the case that the channel has been closed by the
546         // receiver. This happens when `Receiver::close` is called or the
547         // receiver is dropped.
548         let park_self = match self.inc_num_messages() {
549             Some(num_messages) => {
550                 // Block if the current number of pending messages has exceeded
551                 // the configured buffer size
552                 num_messages > self.inner.buffer
553             }
554             None => return Err(TrySendError {
555                 err: SendError {
556                     kind: SendErrorKind::Disconnected,
557                 },
558                 val: msg,
559             }),
560         };
561 
562         // If the channel has reached capacity, then the sender task needs to
563         // be parked. This will send the task handle on the parked task queue.
564         //
565         // However, when `do_send` is called while dropping the `Sender`,
566         // `task::current()` can't be called safely. In this case, in order to
567         // maintain internal consistency, a blank message is pushed onto the
568         // parked task queue.
569         if park_self {
570             self.park();
571         }
572 
573         self.queue_push_and_signal(msg);
574 
575         Ok(())
576     }
577 
578     // Push message to the queue and signal to the receiver
queue_push_and_signal(&self, msg: T)579     fn queue_push_and_signal(&self, msg: T) {
580         // Push the message onto the message queue
581         self.inner.message_queue.push(msg);
582 
583         // Signal to the receiver that a message has been enqueued. If the
584         // receiver is parked, this will unpark the task.
585         self.inner.recv_task.wake();
586     }
587 
588     // Increment the number of queued messages. Returns the resulting number.
inc_num_messages(&self) -> Option<usize>589     fn inc_num_messages(&self) -> Option<usize> {
590         let mut curr = self.inner.state.load(SeqCst);
591 
592         loop {
593             let mut state = decode_state(curr);
594 
595             // The receiver end closed the channel.
596             if !state.is_open {
597                 return None;
598             }
599 
600             // This probably is never hit? Odds are the process will run out of
601             // memory first. It may be worth to return something else in this
602             // case?
603             assert!(state.num_messages < MAX_CAPACITY, "buffer space \
604                     exhausted; sending this messages would overflow the state");
605 
606             state.num_messages += 1;
607 
608             let next = encode_state(&state);
609             match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
610                 Ok(_) => {
611                     return Some(state.num_messages)
612                 }
613                 Err(actual) => curr = actual,
614             }
615         }
616     }
617 
park(&mut self)618     fn park(&mut self) {
619         {
620             let mut sender = self.sender_task.lock().unwrap();
621             sender.task = None;
622             sender.is_parked = true;
623         }
624 
625         // Send handle over queue
626         let t = self.sender_task.clone();
627         self.inner.parked_queue.push(t);
628 
629         // Check to make sure we weren't closed after we sent our task on the
630         // queue
631         let state = decode_state(self.inner.state.load(SeqCst));
632         self.maybe_parked = state.is_open;
633     }
634 
635     /// Polls the channel to determine if there is guaranteed capacity to send
636     /// at least one item without waiting.
637     ///
638     /// # Return value
639     ///
640     /// This method returns:
641     ///
642     /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
643     /// - `Poll::Pending` if the channel may not have
644     ///   capacity, in which case the current task is queued to be notified once
645     ///   capacity is available;
646     /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
poll_ready( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<(), SendError>>647     fn poll_ready(
648         &mut self,
649         cx: &mut Context<'_>,
650     ) -> Poll<Result<(), SendError>> {
651         let state = decode_state(self.inner.state.load(SeqCst));
652         if !state.is_open {
653             return Poll::Ready(Err(SendError {
654                 kind: SendErrorKind::Disconnected,
655             }));
656         }
657 
658         self.poll_unparked(Some(cx)).map(Ok)
659     }
660 
661     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool662     fn same_receiver(&self, other: &Self) -> bool {
663         Arc::ptr_eq(&self.inner, &other.inner)
664     }
665 
666     /// Returns whether the sender send to this receiver.
is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool667     fn is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool {
668         Arc::ptr_eq(&self.inner, receiver)
669     }
670 
671     /// Returns pointer to the Arc containing sender
672     ///
673     /// The returned pointer is not referenced and should be only used for hashing!
ptr(&self) -> *const BoundedInner<T>674     fn ptr(&self) -> *const BoundedInner<T> {
675         &*self.inner
676     }
677 
678     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool679     fn is_closed(&self) -> bool {
680         !decode_state(self.inner.state.load(SeqCst)).is_open
681     }
682 
683     /// Closes this channel from the sender side, preventing any new messages.
close_channel(&self)684     fn close_channel(&self) {
685         // There's no need to park this sender, its dropping,
686         // and we don't want to check for capacity, so skip
687         // that stuff from `do_send`.
688 
689         self.inner.set_closed();
690         self.inner.recv_task.wake();
691     }
692 
poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()>693     fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> {
694         // First check the `maybe_parked` variable. This avoids acquiring the
695         // lock in most cases
696         if self.maybe_parked {
697             // Get a lock on the task handle
698             let mut task = self.sender_task.lock().unwrap();
699 
700             if !task.is_parked {
701                 self.maybe_parked = false;
702                 return Poll::Ready(())
703             }
704 
705             // At this point, an unpark request is pending, so there will be an
706             // unpark sometime in the future. We just need to make sure that
707             // the correct task will be notified.
708             //
709             // Update the task in case the `Sender` has been moved to another
710             // task
711             task.task = cx.map(|cx| cx.waker().clone());
712 
713             Poll::Pending
714         } else {
715             Poll::Ready(())
716         }
717     }
718 }
719 
720 impl<T> Sender<T> {
721     /// Attempts to send a message on this `Sender`, returning the message
722     /// if there was an error.
try_send(&mut self, msg: T) -> Result<(), TrySendError<T>>723     pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
724         if let Some(inner) = &mut self.0 {
725             inner.try_send(msg)
726         } else {
727             Err(TrySendError {
728                 err: SendError {
729                     kind: SendErrorKind::Disconnected,
730                 },
731                 val: msg,
732             })
733         }
734     }
735 
736     /// Send a message on the channel.
737     ///
738     /// This function should only be called after
739     /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
740     /// ready to receive a message.
start_send(&mut self, msg: T) -> Result<(), SendError>741     pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
742         self.try_send(msg)
743             .map_err(|e| e.err)
744     }
745 
746     /// Polls the channel to determine if there is guaranteed capacity to send
747     /// at least one item without waiting.
748     ///
749     /// # Return value
750     ///
751     /// This method returns:
752     ///
753     /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
754     /// - `Poll::Pending` if the channel may not have
755     ///   capacity, in which case the current task is queued to be notified once
756     ///   capacity is available;
757     /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
poll_ready( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<(), SendError>>758     pub fn poll_ready(
759         &mut self,
760         cx: &mut Context<'_>,
761     ) -> Poll<Result<(), SendError>> {
762         let inner = self.0.as_mut().ok_or(SendError {
763             kind: SendErrorKind::Disconnected,
764         })?;
765         inner.poll_ready(cx)
766     }
767 
768     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool769     pub fn is_closed(&self) -> bool {
770         self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true)
771     }
772 
773     /// Closes this channel from the sender side, preventing any new messages.
close_channel(&mut self)774     pub fn close_channel(&mut self) {
775         if let Some(inner) = &mut self.0 {
776             inner.close_channel();
777         }
778     }
779 
780     /// Disconnects this sender from the channel, closing it if there are no more senders left.
disconnect(&mut self)781     pub fn disconnect(&mut self) {
782         self.0 = None;
783     }
784 
785     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool786     pub fn same_receiver(&self, other: &Self) -> bool {
787         match (&self.0, &other.0) {
788             (Some(inner), Some(other)) => inner.same_receiver(other),
789             _ => false,
790         }
791     }
792 
793     /// Returns whether the sender send to this receiver.
is_connected_to(&self, receiver: &Receiver<T>) -> bool794     pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool {
795         match (&self.0, &receiver.inner) {
796             (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
797             _ => false,
798         }
799     }
800 
801     /// Hashes the receiver into the provided hasher
hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher802     pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher {
803         use std::hash::Hash;
804 
805         let ptr = self.0.as_ref().map(|inner| inner.ptr());
806         ptr.hash(hasher);
807     }
808 }
809 
810 impl<T> UnboundedSender<T> {
811     /// Check if the channel is ready to receive a message.
poll_ready( &self, _: &mut Context<'_>, ) -> Poll<Result<(), SendError>>812     pub fn poll_ready(
813         &self,
814         _: &mut Context<'_>,
815     ) -> Poll<Result<(), SendError>> {
816         let inner = self.0.as_ref().ok_or(SendError {
817             kind: SendErrorKind::Disconnected,
818         })?;
819         inner.poll_ready_nb()
820     }
821 
822     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool823     pub fn is_closed(&self) -> bool {
824         self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true)
825     }
826 
827     /// Closes this channel from the sender side, preventing any new messages.
close_channel(&self)828     pub fn close_channel(&self) {
829         if let Some(inner) = &self.0 {
830             inner.close_channel();
831         }
832     }
833 
834     /// Disconnects this sender from the channel, closing it if there are no more senders left.
disconnect(&mut self)835     pub fn disconnect(&mut self) {
836         self.0 = None;
837     }
838 
839     // Do the send without parking current task.
do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>>840     fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> {
841         if let Some(inner) = &self.0 {
842             if inner.inc_num_messages().is_some() {
843                 inner.queue_push_and_signal(msg);
844                 return Ok(());
845             }
846         }
847 
848         Err(TrySendError {
849             err: SendError {
850                 kind: SendErrorKind::Disconnected,
851             },
852             val: msg,
853         })
854     }
855 
856     /// Send a message on the channel.
857     ///
858     /// This method should only be called after `poll_ready` has been used to
859     /// verify that the channel is ready to receive a message.
start_send(&mut self, msg: T) -> Result<(), SendError>860     pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
861         self.do_send_nb(msg)
862             .map_err(|e| e.err)
863     }
864 
865     /// Sends a message along this channel.
866     ///
867     /// This is an unbounded sender, so this function differs from `Sink::send`
868     /// by ensuring the return type reflects that the channel is always ready to
869     /// receive messages.
unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>>870     pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
871         self.do_send_nb(msg)
872     }
873 
874     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool875     pub fn same_receiver(&self, other: &Self) -> bool {
876         match (&self.0, &other.0) {
877             (Some(inner), Some(other)) => inner.same_receiver(other),
878             _ => false,
879         }
880     }
881 
882     /// Returns whether the sender send to this receiver.
is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool883     pub fn is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool {
884         match (&self.0, &receiver.inner) {
885             (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
886             _ => false,
887         }
888     }
889 
890     /// Hashes the receiver into the provided hasher
hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher891     pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher {
892         use std::hash::Hash;
893 
894         let ptr = self.0.as_ref().map(|inner| inner.ptr());
895         ptr.hash(hasher);
896     }
897 }
898 
899 impl<T> Clone for Sender<T> {
clone(&self) -> Self900     fn clone(&self) -> Self {
901         Self(self.0.clone())
902     }
903 }
904 
905 impl<T> Clone for UnboundedSender<T> {
clone(&self) -> Self906     fn clone(&self) -> Self {
907         Self(self.0.clone())
908     }
909 }
910 
911 impl<T> Clone for UnboundedSenderInner<T> {
clone(&self) -> Self912     fn clone(&self) -> Self {
913         // Since this atomic op isn't actually guarding any memory and we don't
914         // care about any orderings besides the ordering on the single atomic
915         // variable, a relaxed ordering is acceptable.
916         let mut curr = self.inner.num_senders.load(SeqCst);
917 
918         loop {
919             // If the maximum number of senders has been reached, then fail
920             if curr == MAX_BUFFER {
921                 panic!("cannot clone `Sender` -- too many outstanding senders");
922             }
923 
924             debug_assert!(curr < MAX_BUFFER);
925 
926             let next = curr + 1;
927             match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
928                 Ok(_) => {
929                     // The ABA problem doesn't matter here. We only care that the
930                     // number of senders never exceeds the maximum.
931                     return Self {
932                         inner: self.inner.clone(),
933                     };
934                 }
935                 Err(actual) => curr = actual,
936             }
937         }
938     }
939 }
940 
941 impl<T> Clone for BoundedSenderInner<T> {
clone(&self) -> Self942     fn clone(&self) -> Self {
943         // Since this atomic op isn't actually guarding any memory and we don't
944         // care about any orderings besides the ordering on the single atomic
945         // variable, a relaxed ordering is acceptable.
946         let mut curr = self.inner.num_senders.load(SeqCst);
947 
948         loop {
949             // If the maximum number of senders has been reached, then fail
950             if curr == self.inner.max_senders() {
951                 panic!("cannot clone `Sender` -- too many outstanding senders");
952             }
953 
954             debug_assert!(curr < self.inner.max_senders());
955 
956             let next = curr + 1;
957             match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
958                 Ok(_) => {
959                     // The ABA problem doesn't matter here. We only care that the
960                     // number of senders never exceeds the maximum.
961                     return Self {
962                         inner: self.inner.clone(),
963                         sender_task: Arc::new(Mutex::new(SenderTask::new())),
964                         maybe_parked: false,
965                     };
966                 }
967                 Err(actual) => curr = actual,
968             }
969         }
970     }
971 }
972 
973 impl<T> Drop for UnboundedSenderInner<T> {
drop(&mut self)974     fn drop(&mut self) {
975         // Ordering between variables don't matter here
976         let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
977 
978         if prev == 1 {
979             self.close_channel();
980         }
981     }
982 }
983 
984 impl<T> Drop for BoundedSenderInner<T> {
drop(&mut self)985     fn drop(&mut self) {
986         // Ordering between variables don't matter here
987         let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
988 
989         if prev == 1 {
990             self.close_channel();
991         }
992     }
993 }
994 
995 /*
996  *
997  * ===== impl Receiver =====
998  *
999  */
1000 
1001 impl<T> Receiver<T> {
1002     /// Closes the receiving half of a channel, without dropping it.
1003     ///
1004     /// This prevents any further messages from being sent on the channel while
1005     /// still enabling the receiver to drain messages that are buffered.
close(&mut self)1006     pub fn close(&mut self) {
1007         if let Some(inner) = &mut self.inner {
1008             inner.set_closed();
1009 
1010             // Wake up any threads waiting as they'll see that we've closed the
1011             // channel and will continue on their merry way.
1012             while let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
1013                 task.lock().unwrap().notify();
1014             }
1015         }
1016     }
1017 
1018     /// Tries to receive the next message without notifying a context if empty.
1019     ///
1020     /// It is not recommended to call this function from inside of a future,
1021     /// only when you've otherwise arranged to be notified when the channel is
1022     /// no longer empty.
1023     ///
1024     /// This function will panic if called after `try_next` or `poll_next` has
1025     /// returned `None`.
try_next(&mut self) -> Result<Option<T>, TryRecvError>1026     pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
1027         match self.next_message() {
1028             Poll::Ready(msg) => {
1029                 Ok(msg)
1030             },
1031             Poll::Pending => Err(TryRecvError { _priv: () }),
1032         }
1033     }
1034 
next_message(&mut self) -> Poll<Option<T>>1035     fn next_message(&mut self) -> Poll<Option<T>> {
1036         let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
1037         // Pop off a message
1038         match unsafe { inner.message_queue.pop_spin() } {
1039             Some(msg) => {
1040                 // If there are any parked task handles in the parked queue,
1041                 // pop one and unpark it.
1042                 self.unpark_one();
1043 
1044                 // Decrement number of messages
1045                 self.dec_num_messages();
1046 
1047                 Poll::Ready(Some(msg))
1048             }
1049             None => {
1050                 let state = decode_state(inner.state.load(SeqCst));
1051                 if state.is_closed() {
1052                     // If closed flag is set AND there are no pending messages
1053                     // it means end of stream
1054                     self.inner = None;
1055                     Poll::Ready(None)
1056                 } else {
1057                     // If queue is open, we need to return Pending
1058                     // to be woken up when new messages arrive.
1059                     // If queue is closed but num_messages is non-zero,
1060                     // it means that senders updated the state,
1061                     // but didn't put message to queue yet,
1062                     // so we need to park until sender unparks the task
1063                     // after queueing the message.
1064                     Poll::Pending
1065                 }
1066             }
1067         }
1068     }
1069 
1070     // Unpark a single task handle if there is one pending in the parked queue
unpark_one(&mut self)1071     fn unpark_one(&mut self) {
1072         if let Some(inner) = &mut self.inner {
1073             if let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
1074                 task.lock().unwrap().notify();
1075             }
1076         }
1077     }
1078 
dec_num_messages(&self)1079     fn dec_num_messages(&self) {
1080         if let Some(inner) = &self.inner {
1081             // OPEN_MASK is highest bit, so it's unaffected by subtraction
1082             // unless there's underflow, and we know there's no underflow
1083             // because number of messages at this point is always > 0.
1084             inner.state.fetch_sub(1, SeqCst);
1085         }
1086     }
1087 }
1088 
1089 // The receiver does not ever take a Pin to the inner T
1090 impl<T> Unpin for Receiver<T> {}
1091 
1092 impl<T> FusedStream for Receiver<T> {
is_terminated(&self) -> bool1093     fn is_terminated(&self) -> bool {
1094         self.inner.is_none()
1095     }
1096 }
1097 
1098 impl<T> Stream for Receiver<T> {
1099     type Item = T;
1100 
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<T>>1101     fn poll_next(
1102         mut self: Pin<&mut Self>,
1103         cx: &mut Context<'_>,
1104     ) -> Poll<Option<T>> {
1105             // Try to read a message off of the message queue.
1106         match self.next_message() {
1107             Poll::Ready(msg) => {
1108                 if msg.is_none() {
1109                     self.inner = None;
1110                 }
1111                 Poll::Ready(msg)
1112             },
1113             Poll::Pending => {
1114                 // There are no messages to read, in this case, park.
1115                 self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1116                 // Check queue again after parking to prevent race condition:
1117                 // a message could be added to the queue after previous `next_message`
1118                 // before `register` call.
1119                 self.next_message()
1120             }
1121         }
1122     }
1123 }
1124 
1125 impl<T> Drop for Receiver<T> {
drop(&mut self)1126     fn drop(&mut self) {
1127         // Drain the channel of all pending messages
1128         self.close();
1129         if self.inner.is_some() {
1130             loop {
1131                 match self.next_message() {
1132                     Poll::Ready(Some(_)) => {}
1133                     Poll::Ready(None) => break,
1134                     Poll::Pending => {
1135                         let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1136 
1137                         // If the channel is closed, then there is no need to park.
1138                         if state.is_closed() {
1139                             break;
1140                         }
1141 
1142                         // TODO: Spinning isn't ideal, it might be worth
1143                         // investigating using a condvar or some other strategy
1144                         // here. That said, if this case is hit, then another thread
1145                         // is about to push the value into the queue and this isn't
1146                         // the only spinlock in the impl right now.
1147                         thread::yield_now();
1148                     }
1149                 }
1150             }
1151         }
1152     }
1153 }
1154 
1155 impl<T> UnboundedReceiver<T> {
1156     /// Closes the receiving half of a channel, without dropping it.
1157     ///
1158     /// This prevents any further messages from being sent on the channel while
1159     /// still enabling the receiver to drain messages that are buffered.
close(&mut self)1160     pub fn close(&mut self) {
1161         if let Some(inner) = &mut self.inner {
1162             inner.set_closed();
1163         }
1164     }
1165 
1166     /// Tries to receive the next message without notifying a context if empty.
1167     ///
1168     /// It is not recommended to call this function from inside of a future,
1169     /// only when you've otherwise arranged to be notified when the channel is
1170     /// no longer empty.
1171     ///
1172     /// This function will panic if called after `try_next` or `poll_next` has
1173     /// returned `None`.
try_next(&mut self) -> Result<Option<T>, TryRecvError>1174     pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
1175         match self.next_message() {
1176             Poll::Ready(msg) => {
1177                 Ok(msg)
1178             },
1179             Poll::Pending => Err(TryRecvError { _priv: () }),
1180         }
1181     }
1182 
next_message(&mut self) -> Poll<Option<T>>1183     fn next_message(&mut self) -> Poll<Option<T>> {
1184         let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
1185         // Pop off a message
1186         match unsafe { inner.message_queue.pop_spin() } {
1187             Some(msg) => {
1188                 // Decrement number of messages
1189                 self.dec_num_messages();
1190 
1191                 Poll::Ready(Some(msg))
1192             }
1193             None => {
1194                 let state = decode_state(inner.state.load(SeqCst));
1195                 if state.is_closed() {
1196                     // If closed flag is set AND there are no pending messages
1197                     // it means end of stream
1198                     self.inner = None;
1199                     Poll::Ready(None)
1200                 } else {
1201                     // If queue is open, we need to return Pending
1202                     // to be woken up when new messages arrive.
1203                     // If queue is closed but num_messages is non-zero,
1204                     // it means that senders updated the state,
1205                     // but didn't put message to queue yet,
1206                     // so we need to park until sender unparks the task
1207                     // after queueing the message.
1208                     Poll::Pending
1209                 }
1210             }
1211         }
1212     }
1213 
dec_num_messages(&self)1214     fn dec_num_messages(&self) {
1215         if let Some(inner) = &self.inner {
1216             // OPEN_MASK is highest bit, so it's unaffected by subtraction
1217             // unless there's underflow, and we know there's no underflow
1218             // because number of messages at this point is always > 0.
1219             inner.state.fetch_sub(1, SeqCst);
1220         }
1221     }
1222 }
1223 
1224 impl<T> FusedStream for UnboundedReceiver<T> {
is_terminated(&self) -> bool1225     fn is_terminated(&self) -> bool {
1226         self.inner.is_none()
1227     }
1228 }
1229 
1230 impl<T> Stream for UnboundedReceiver<T> {
1231     type Item = T;
1232 
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<T>>1233     fn poll_next(
1234         mut self: Pin<&mut Self>,
1235         cx: &mut Context<'_>,
1236     ) -> Poll<Option<T>> {
1237         // Try to read a message off of the message queue.
1238         match self.next_message() {
1239             Poll::Ready(msg) => {
1240                 if msg.is_none() {
1241                     self.inner = None;
1242                 }
1243                 Poll::Ready(msg)
1244             },
1245             Poll::Pending => {
1246                 // There are no messages to read, in this case, park.
1247                 self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1248                 // Check queue again after parking to prevent race condition:
1249                 // a message could be added to the queue after previous `next_message`
1250                 // before `register` call.
1251                 self.next_message()
1252             }
1253         }
1254     }
1255 }
1256 
1257 impl<T> Drop for UnboundedReceiver<T> {
drop(&mut self)1258     fn drop(&mut self) {
1259         // Drain the channel of all pending messages
1260         self.close();
1261         if self.inner.is_some() {
1262             loop {
1263                 match self.next_message() {
1264                     Poll::Ready(Some(_)) => {}
1265                     Poll::Ready(None) => break,
1266                     Poll::Pending => {
1267                         let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1268 
1269                         // If the channel is closed, then there is no need to park.
1270                         if state.is_closed() {
1271                             break;
1272                         }
1273 
1274                         // TODO: Spinning isn't ideal, it might be worth
1275                         // investigating using a condvar or some other strategy
1276                         // here. That said, if this case is hit, then another thread
1277                         // is about to push the value into the queue and this isn't
1278                         // the only spinlock in the impl right now.
1279                         thread::yield_now();
1280                     }
1281                 }
1282             }
1283         }
1284     }
1285 }
1286 
1287 /*
1288  *
1289  * ===== impl Inner =====
1290  *
1291  */
1292 
1293 impl<T> UnboundedInner<T> {
1294     // Clear `open` flag in the state, keep `num_messages` intact.
set_closed(&self)1295     fn set_closed(&self) {
1296         let curr = self.state.load(SeqCst);
1297         if !decode_state(curr).is_open {
1298             return;
1299         }
1300 
1301         self.state.fetch_and(!OPEN_MASK, SeqCst);
1302     }
1303 }
1304 
1305 impl<T> BoundedInner<T> {
1306     // The return value is such that the total number of messages that can be
1307     // enqueued into the channel will never exceed MAX_CAPACITY
max_senders(&self) -> usize1308     fn max_senders(&self) -> usize {
1309         MAX_CAPACITY - self.buffer
1310     }
1311 
1312     // Clear `open` flag in the state, keep `num_messages` intact.
set_closed(&self)1313     fn set_closed(&self) {
1314         let curr = self.state.load(SeqCst);
1315         if !decode_state(curr).is_open {
1316             return;
1317         }
1318 
1319         self.state.fetch_and(!OPEN_MASK, SeqCst);
1320     }
1321 }
1322 
1323 unsafe impl<T: Send> Send for UnboundedInner<T> {}
1324 unsafe impl<T: Send> Sync for UnboundedInner<T> {}
1325 
1326 unsafe impl<T: Send> Send for BoundedInner<T> {}
1327 unsafe impl<T: Send> Sync for BoundedInner<T> {}
1328 
1329 impl State {
is_closed(&self) -> bool1330     fn is_closed(&self) -> bool {
1331         !self.is_open && self.num_messages == 0
1332     }
1333 }
1334 
1335 /*
1336  *
1337  * ===== Helpers =====
1338  *
1339  */
1340 
decode_state(num: usize) -> State1341 fn decode_state(num: usize) -> State {
1342     State {
1343         is_open: num & OPEN_MASK == OPEN_MASK,
1344         num_messages: num & MAX_CAPACITY,
1345     }
1346 }
1347 
encode_state(state: &State) -> usize1348 fn encode_state(state: &State) -> usize {
1349     let mut num = state.num_messages;
1350 
1351     if state.is_open {
1352         num |= OPEN_MASK;
1353     }
1354 
1355     num
1356 }
1357