1 #![cfg(feature = "tokio")]
2
3 extern crate flate2;
4 extern crate futures;
5 extern crate rand;
6 extern crate tokio_io;
7 extern crate tokio_tcp;
8 extern crate tokio_threadpool;
9
10 use std::io::{Read, Write};
11 use std::iter;
12 use std::net::{Shutdown, TcpListener};
13 use std::thread;
14
15 use flate2::read;
16 use flate2::write;
17 use flate2::Compression;
18 use futures::Future;
19 use rand::{thread_rng, Rng};
20 use tokio_io::io::{copy, shutdown};
21 use tokio_io::AsyncRead;
22 use tokio_tcp::TcpStream;
23
/// Round-trips a fixed byte pattern through a tokio-driven, recompressing echo client.
///
/// A blocking server thread sends `N` blocks of `M` bytes (block `i` consists
/// solely of the byte `i`) as a zlib stream. The tokio client decodes that
/// zlib stream and re-encodes it as raw deflate onto the same connection,
/// where a second blocking thread decodes it again and checks every byte.
#[test]
fn tcp_stream_echo_pattern() {
    const N: u8 = 16;
    const M: usize = 16 * 1024;
    // Number of decompressed bytes the client is expected to relay. Kept as a
    // constant so the spawned future does not have to capture anything.
    const TOTAL: u64 = (N as u64) * (M as u64);

    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();

    let server = thread::spawn(move || {
        let (outgoing, _peer) = listener.accept().unwrap();
        let incoming = outgoing.try_clone().unwrap();

        // The echoed data is checked on its own thread while this thread writes.
        let verifier = thread::spawn(move || {
            let mut decoder = read::DeflateDecoder::new(incoming);
            let mut block = [0u8; M];
            for expected in 0..N {
                decoder.read_exact(&mut block).unwrap();
                for &actual in block.iter() {
                    assert_eq!(actual, expected);
                }
            }

            // Nothing may follow the final block.
            let trailing = decoder.read(&mut block).unwrap();
            assert_eq!(trailing, 0);
        });

        let mut encoder = write::ZlibEncoder::new(outgoing, Compression::default());
        for fill in 0..N {
            encoder.write_all(&[fill; M]).unwrap();
        }
        // Complete the zlib stream, then close only the write half of the socket.
        let socket = encoder.finish().unwrap();
        socket.shutdown(Shutdown::Write).unwrap();

        verifier.join().unwrap();
    });

    // Client: zlib-decode whatever arrives and send it back deflate-encoded.
    let echo = TcpStream::connect(&addr)
        .and_then(|socket| {
            let (rx, tx) = socket.split();
            let inflated = read::ZlibDecoder::new(rx);
            let deflated = write::DeflateEncoder::new(tx, Compression::default());
            copy(inflated, deflated)
        })
        .then(|outcome| {
            let (copied, _reader, writer) = outcome.unwrap();
            assert_eq!(copied, TOTAL);
            shutdown(writer).map(|_| ())
        })
        .map_err(|err| panic!("{}", err));

    let pool = tokio_threadpool::Builder::new().build();
    pool.spawn(echo);
    pool.shutdown().wait().unwrap();
    server.join().unwrap();
}
78
/// Round-trips 1 MiB of random bytes through a tokio-driven, recompressing echo client.
///
/// A blocking server thread sends the data as a zlib stream. The tokio client
/// decodes that zlib stream and re-encodes it as raw deflate onto the same
/// connection, where a second blocking thread decodes it and compares it
/// byte-for-byte with the data that was sent.
#[test]
fn echo_random() {
    // Fetch the thread-local generator once instead of once per byte.
    let mut rng = thread_rng();
    let v = iter::repeat(())
        .take(1024 * 1024)
        .map(|()| rng.gen::<u8>())
        .collect::<Vec<_>>();
    // The client-side check only needs the length, so capture that rather
    // than moving a second copy of the payload into the future.
    let total = v.len() as u64;

    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();
    let t = thread::spawn(move || {
        let a = listener.accept().unwrap().0;
        let b = a.try_clone().unwrap();

        // The verifier thread needs its own copy: `v` is still being written
        // by this thread while the echoed data is checked.
        let reference = v.clone();
        let t = thread::spawn(move || {
            let mut b = read::DeflateDecoder::new(b);
            let mut buf = [0u8; 1024];
            // Slice cursor over the bytes that have not been seen yet.
            let mut remaining = &reference[..];
            while !remaining.is_empty() {
                let n = b.read(&mut buf).unwrap();
                // A zero-length read means EOF; without this check a
                // truncated echo would make the loop spin forever.
                assert!(
                    n > 0,
                    "echoed stream ended with {} bytes still missing",
                    remaining.len()
                );
                assert!(
                    n <= remaining.len(),
                    "received {} bytes but only {} were still expected",
                    n,
                    remaining.len()
                );
                let (head, tail) = remaining.split_at(n);
                for (actual, expected) in buf[..n].iter().zip(head) {
                    assert_eq!(*actual, *expected);
                }
                remaining = tail;
            }

            // Nothing may follow the expected payload.
            assert_eq!(b.read(&mut buf).unwrap(), 0);
        });

        let mut a = write::ZlibEncoder::new(a, Compression::default());
        a.write_all(&v).unwrap();
        // Complete the zlib stream, then close only the write half of the socket.
        a.finish().unwrap().shutdown(Shutdown::Write).unwrap();

        t.join().unwrap();
    });

    // Client: zlib-decode whatever arrives and send it back deflate-encoded.
    let stream = TcpStream::connect(&addr);
    let copy = stream
        .and_then(|s| {
            let (a, b) = s.split();
            let a = read::ZlibDecoder::new(a);
            let b = write::DeflateEncoder::new(b, Compression::default());
            copy(a, b)
        })
        .then(move |result| {
            let (amt, _a, b) = result.unwrap();
            assert_eq!(amt, total);
            shutdown(b).map(|_| ())
        })
        .map_err(|err| panic!("{}", err));

    let threadpool = tokio_threadpool::Builder::new().build();
    threadpool.spawn(copy);
    threadpool.shutdown().wait().unwrap();
    t.join().unwrap();
}
134