1 #![cfg(feature = "tokio")]
2 
3 extern crate flate2;
4 extern crate futures;
5 extern crate rand;
6 extern crate tokio_io;
7 extern crate tokio_tcp;
8 extern crate tokio_threadpool;
9 
10 use std::io::{Read, Write};
11 use std::iter;
12 use std::net::{Shutdown, TcpListener};
13 use std::thread;
14 
15 use flate2::read;
16 use flate2::write;
17 use flate2::Compression;
18 use futures::Future;
19 use rand::{thread_rng, Rng};
20 use tokio_io::io::{copy, shutdown};
21 use tokio_io::AsyncRead;
22 use tokio_tcp::TcpStream;
23 
24 #[test]
tcp_stream_echo_pattern()25 fn tcp_stream_echo_pattern() {
26     const N: u8 = 16;
27     const M: usize = 16 * 1024;
28 
29     let listener = TcpListener::bind("127.0.0.1:0").unwrap();
30     let addr = listener.local_addr().unwrap();
31     let t = thread::spawn(move || {
32         let a = listener.accept().unwrap().0;
33         let b = a.try_clone().unwrap();
34 
35         let t = thread::spawn(move || {
36             let mut b = read::DeflateDecoder::new(b);
37             let mut buf = [0; M];
38             for i in 0..N {
39                 b.read_exact(&mut buf).unwrap();
40                 for byte in buf.iter() {
41                     assert_eq!(*byte, i);
42                 }
43             }
44 
45             assert_eq!(b.read(&mut buf).unwrap(), 0);
46         });
47 
48         let mut a = write::ZlibEncoder::new(a, Compression::default());
49         for i in 0..N {
50             let buf = [i; M];
51             a.write_all(&buf).unwrap();
52         }
53         a.finish().unwrap().shutdown(Shutdown::Write).unwrap();
54 
55         t.join().unwrap();
56     });
57 
58     let stream = TcpStream::connect(&addr);
59     let copy = stream
60         .and_then(|s| {
61             let (a, b) = s.split();
62             let a = read::ZlibDecoder::new(a);
63             let b = write::DeflateEncoder::new(b, Compression::default());
64             copy(a, b)
65         })
66         .then(|result| {
67             let (amt, _a, b) = result.unwrap();
68             assert_eq!(amt, (N as u64) * (M as u64));
69             shutdown(b).map(|_| ())
70         })
71         .map_err(|err| panic!("{}", err));
72 
73     let threadpool = tokio_threadpool::Builder::new().build();
74     threadpool.spawn(copy);
75     threadpool.shutdown().wait().unwrap();
76     t.join().unwrap();
77 }
78 
79 #[test]
echo_random()80 fn echo_random() {
81     let v = iter::repeat(())
82         .take(1024 * 1024)
83         .map(|()| thread_rng().gen::<u8>())
84         .collect::<Vec<_>>();
85     let listener = TcpListener::bind("127.0.0.1:0").unwrap();
86     let addr = listener.local_addr().unwrap();
87     let v2 = v.clone();
88     let t = thread::spawn(move || {
89         let a = listener.accept().unwrap().0;
90         let b = a.try_clone().unwrap();
91 
92         let mut v3 = v2.clone();
93         let t = thread::spawn(move || {
94             let mut b = read::DeflateDecoder::new(b);
95             let mut buf = [0; 1024];
96             while v3.len() > 0 {
97                 let n = b.read(&mut buf).unwrap();
98                 for (actual, expected) in buf[..n].iter().zip(&v3) {
99                     assert_eq!(*actual, *expected);
100                 }
101                 v3.drain(..n);
102             }
103 
104             assert_eq!(b.read(&mut buf).unwrap(), 0);
105         });
106 
107         let mut a = write::ZlibEncoder::new(a, Compression::default());
108         a.write_all(&v2).unwrap();
109         a.finish().unwrap().shutdown(Shutdown::Write).unwrap();
110 
111         t.join().unwrap();
112     });
113 
114     let stream = TcpStream::connect(&addr);
115     let copy = stream
116         .and_then(|s| {
117             let (a, b) = s.split();
118             let a = read::ZlibDecoder::new(a);
119             let b = write::DeflateEncoder::new(b, Compression::default());
120             copy(a, b)
121         })
122         .then(move |result| {
123             let (amt, _a, b) = result.unwrap();
124             assert_eq!(amt, v.len() as u64);
125             shutdown(b).map(|_| ())
126         })
127         .map_err(|err| panic!("{}", err));
128 
129     let threadpool = tokio_threadpool::Builder::new().build();
130     threadpool.spawn(copy);
131     threadpool.shutdown().wait().unwrap();
132     t.join().unwrap();
133 }
134