1 #[test]
test_split()2 fn test_split() {
3     use futures::executor::block_on;
4     use futures::sink::{Sink, SinkExt};
5     use futures::stream::{self, Stream, StreamExt};
6     use futures::task::{Context, Poll};
7     use pin_project::pin_project;
8     use std::pin::Pin;
9 
10     #[pin_project]
11     struct Join<T, U> {
12         #[pin]
13         stream: T,
14         #[pin]
15         sink: U,
16     }
17 
18     impl<T: Stream, U> Stream for Join<T, U> {
19         type Item = T::Item;
20 
21         fn poll_next(
22             self: Pin<&mut Self>,
23             cx: &mut Context<'_>,
24         ) -> Poll<Option<T::Item>> {
25             self.project().stream.poll_next(cx)
26         }
27     }
28 
29     impl<T, U: Sink<Item>, Item> Sink<Item> for Join<T, U> {
30         type Error = U::Error;
31 
32         fn poll_ready(
33             self: Pin<&mut Self>,
34             cx: &mut Context<'_>,
35         ) -> Poll<Result<(), Self::Error>> {
36             self.project().sink.poll_ready(cx)
37         }
38 
39         fn start_send(
40             self: Pin<&mut Self>,
41             item: Item,
42         ) -> Result<(), Self::Error> {
43             self.project().sink.start_send(item)
44         }
45 
46         fn poll_flush(
47             self: Pin<&mut Self>,
48             cx: &mut Context<'_>,
49         ) -> Poll<Result<(), Self::Error>> {
50             self.project().sink.poll_flush(cx)
51         }
52 
53         fn poll_close(
54             self: Pin<&mut Self>,
55             cx: &mut Context<'_>,
56         ) -> Poll<Result<(), Self::Error>> {
57             self.project().sink.poll_close(cx)
58         }
59     }
60 
61     let mut dest: Vec<i32> = Vec::new();
62     {
63        let join = Join {
64             stream: stream::iter(vec![10, 20, 30]),
65             sink: &mut dest
66         };
67 
68         let (sink, stream) = join.split();
69         let join = sink.reunite(stream).expect("test_split: reunite error");
70         let (mut sink, stream) = join.split();
71         let mut stream = stream.map(Ok);
72         block_on(sink.send_all(&mut stream)).unwrap();
73     }
74     assert_eq!(dest, vec![10, 20, 30]);
75 }
76