1 #[test]
test_split()2 fn test_split() {
3 use futures::executor::block_on;
4 use futures::sink::{Sink, SinkExt};
5 use futures::stream::{self, Stream, StreamExt};
6 use futures::task::{Context, Poll};
7 use pin_project::pin_project;
8 use std::pin::Pin;
9
10 #[pin_project]
11 struct Join<T, U> {
12 #[pin]
13 stream: T,
14 #[pin]
15 sink: U,
16 }
17
18 impl<T: Stream, U> Stream for Join<T, U> {
19 type Item = T::Item;
20
21 fn poll_next(
22 self: Pin<&mut Self>,
23 cx: &mut Context<'_>,
24 ) -> Poll<Option<T::Item>> {
25 self.project().stream.poll_next(cx)
26 }
27 }
28
29 impl<T, U: Sink<Item>, Item> Sink<Item> for Join<T, U> {
30 type Error = U::Error;
31
32 fn poll_ready(
33 self: Pin<&mut Self>,
34 cx: &mut Context<'_>,
35 ) -> Poll<Result<(), Self::Error>> {
36 self.project().sink.poll_ready(cx)
37 }
38
39 fn start_send(
40 self: Pin<&mut Self>,
41 item: Item,
42 ) -> Result<(), Self::Error> {
43 self.project().sink.start_send(item)
44 }
45
46 fn poll_flush(
47 self: Pin<&mut Self>,
48 cx: &mut Context<'_>,
49 ) -> Poll<Result<(), Self::Error>> {
50 self.project().sink.poll_flush(cx)
51 }
52
53 fn poll_close(
54 self: Pin<&mut Self>,
55 cx: &mut Context<'_>,
56 ) -> Poll<Result<(), Self::Error>> {
57 self.project().sink.poll_close(cx)
58 }
59 }
60
61 let mut dest: Vec<i32> = Vec::new();
62 {
63 let join = Join {
64 stream: stream::iter(vec![10, 20, 30]),
65 sink: &mut dest
66 };
67
68 let (sink, stream) = join.split();
69 let join = sink.reunite(stream).expect("test_split: reunite error");
70 let (mut sink, stream) = join.split();
71 let mut stream = stream.map(Ok);
72 block_on(sink.send_all(&mut stream)).unwrap();
73 }
74 assert_eq!(dest, vec![10, 20, 30]);
75 }
76