1 use core::fmt::{Debug, Formatter, Result as FmtResult};
2 use core::pin::Pin;
3 use futures_core::task::{Context, Poll};
4 use futures_sink::Sink;
5 use pin_project_lite::pin_project;
6 
7 pin_project! {
8     /// Sink that clones incoming items and forwards them to two sinks at the same time.
9     ///
10     /// Backpressure from any downstream sink propagates up, which means that this sink
11     /// can only process items as fast as its _slowest_ downstream sink.
12     #[must_use = "sinks do nothing unless polled"]
13     pub struct Fanout<Si1, Si2> {
14         #[pin]
15         sink1: Si1,
16         #[pin]
17         sink2: Si2
18     }
19 }
20 
21 impl<Si1, Si2> Fanout<Si1, Si2> {
new(sink1: Si1, sink2: Si2) -> Self22     pub(super) fn new(sink1: Si1, sink2: Si2) -> Self {
23         Self { sink1, sink2 }
24     }
25 
26     /// Get a shared reference to the inner sinks.
get_ref(&self) -> (&Si1, &Si2)27     pub fn get_ref(&self) -> (&Si1, &Si2) {
28         (&self.sink1, &self.sink2)
29     }
30 
31     /// Get a mutable reference to the inner sinks.
get_mut(&mut self) -> (&mut Si1, &mut Si2)32     pub fn get_mut(&mut self) -> (&mut Si1, &mut Si2) {
33         (&mut self.sink1, &mut self.sink2)
34     }
35 
36     /// Get a pinned mutable reference to the inner sinks.
get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut Si1>, Pin<&mut Si2>)37     pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut Si1>, Pin<&mut Si2>) {
38         let this = self.project();
39         (this.sink1, this.sink2)
40     }
41 
42     /// Consumes this combinator, returning the underlying sinks.
43     ///
44     /// Note that this may discard intermediate state of this combinator,
45     /// so care should be taken to avoid losing resources when this is called.
into_inner(self) -> (Si1, Si2)46     pub fn into_inner(self) -> (Si1, Si2) {
47         (self.sink1, self.sink2)
48     }
49 }
50 
51 impl<Si1: Debug, Si2: Debug> Debug for Fanout<Si1, Si2> {
fmt(&self, f: &mut Formatter<'_>) -> FmtResult52     fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
53         f.debug_struct("Fanout")
54             .field("sink1", &self.sink1)
55             .field("sink2", &self.sink2)
56             .finish()
57     }
58 }
59 
60 impl<Si1, Si2, Item> Sink<Item> for Fanout<Si1, Si2>
61     where Si1: Sink<Item>,
62           Item: Clone,
63           Si2: Sink<Item, Error=Si1::Error>
64 {
65     type Error = Si1::Error;
66 
poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>67     fn poll_ready(
68         self: Pin<&mut Self>,
69         cx: &mut Context<'_>,
70     ) -> Poll<Result<(), Self::Error>> {
71         let this = self.project();
72 
73         let sink1_ready = this.sink1.poll_ready(cx)?.is_ready();
74         let sink2_ready = this.sink2.poll_ready(cx)?.is_ready();
75         let ready = sink1_ready && sink2_ready;
76         if ready { Poll::Ready(Ok(())) } else { Poll::Pending }
77     }
78 
start_send( self: Pin<&mut Self>, item: Item, ) -> Result<(), Self::Error>79     fn start_send(
80         self: Pin<&mut Self>,
81         item: Item,
82     ) -> Result<(), Self::Error> {
83         let this = self.project();
84 
85         this.sink1.start_send(item.clone())?;
86         this.sink2.start_send(item)?;
87         Ok(())
88     }
89 
poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>90     fn poll_flush(
91         self: Pin<&mut Self>,
92         cx: &mut Context<'_>,
93     ) -> Poll<Result<(), Self::Error>> {
94         let this = self.project();
95 
96         let sink1_ready = this.sink1.poll_flush(cx)?.is_ready();
97         let sink2_ready = this.sink2.poll_flush(cx)?.is_ready();
98         let ready = sink1_ready && sink2_ready;
99         if ready { Poll::Ready(Ok(())) } else { Poll::Pending }
100     }
101 
poll_close( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>102     fn poll_close(
103         self: Pin<&mut Self>,
104         cx: &mut Context<'_>,
105     ) -> Poll<Result<(), Self::Error>> {
106         let this = self.project();
107 
108         let sink1_ready = this.sink1.poll_close(cx)?.is_ready();
109         let sink2_ready = this.sink2.poll_close(cx)?.is_ready();
110         let ready = sink1_ready && sink2_ready;
111         if ready { Poll::Ready(Ok(())) } else { Poll::Pending }
112     }
113 }
114