1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3 
4 use futures::future::poll_fn;
5 use std::io;
6 use std::sync::Arc;
7 use tokio::{io::ReadBuf, net::UdpSocket};
8 
// Payload sent by most of the tests in this file.
const MSG: &[u8] = b"hello";
// Length of `MSG` in bytes, used when checking byte counts returned by sends.
const MSG_LEN: usize = MSG.len();
11 
12 #[tokio::test]
send_recv() -> std::io::Result<()>13 async fn send_recv() -> std::io::Result<()> {
14     let sender = UdpSocket::bind("127.0.0.1:0").await?;
15     let receiver = UdpSocket::bind("127.0.0.1:0").await?;
16 
17     sender.connect(receiver.local_addr()?).await?;
18     receiver.connect(sender.local_addr()?).await?;
19 
20     sender.send(MSG).await?;
21 
22     let mut recv_buf = [0u8; 32];
23     let len = receiver.recv(&mut recv_buf[..]).await?;
24 
25     assert_eq!(&recv_buf[..len], MSG);
26     Ok(())
27 }
28 
29 #[tokio::test]
send_recv_poll() -> std::io::Result<()>30 async fn send_recv_poll() -> std::io::Result<()> {
31     let sender = UdpSocket::bind("127.0.0.1:0").await?;
32     let receiver = UdpSocket::bind("127.0.0.1:0").await?;
33 
34     sender.connect(receiver.local_addr()?).await?;
35     receiver.connect(sender.local_addr()?).await?;
36 
37     poll_fn(|cx| sender.poll_send(cx, MSG)).await?;
38 
39     let mut recv_buf = [0u8; 32];
40     let mut read = ReadBuf::new(&mut recv_buf);
41     let _len = poll_fn(|cx| receiver.poll_recv(cx, &mut read)).await?;
42 
43     assert_eq!(read.filled(), MSG);
44     Ok(())
45 }
46 
47 #[tokio::test]
send_to_recv_from() -> std::io::Result<()>48 async fn send_to_recv_from() -> std::io::Result<()> {
49     let sender = UdpSocket::bind("127.0.0.1:0").await?;
50     let receiver = UdpSocket::bind("127.0.0.1:0").await?;
51 
52     let receiver_addr = receiver.local_addr()?;
53     sender.send_to(MSG, &receiver_addr).await?;
54 
55     let mut recv_buf = [0u8; 32];
56     let (len, addr) = receiver.recv_from(&mut recv_buf[..]).await?;
57 
58     assert_eq!(&recv_buf[..len], MSG);
59     assert_eq!(addr, sender.local_addr()?);
60     Ok(())
61 }
62 
63 #[tokio::test]
send_to_recv_from_poll() -> std::io::Result<()>64 async fn send_to_recv_from_poll() -> std::io::Result<()> {
65     let sender = UdpSocket::bind("127.0.0.1:0").await?;
66     let receiver = UdpSocket::bind("127.0.0.1:0").await?;
67 
68     let receiver_addr = receiver.local_addr()?;
69     poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
70 
71     let mut recv_buf = [0u8; 32];
72     let mut read = ReadBuf::new(&mut recv_buf);
73     let addr = poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;
74 
75     assert_eq!(read.filled(), MSG);
76     assert_eq!(addr, sender.local_addr()?);
77     Ok(())
78 }
79 
80 #[tokio::test]
send_to_peek_from() -> std::io::Result<()>81 async fn send_to_peek_from() -> std::io::Result<()> {
82     let sender = UdpSocket::bind("127.0.0.1:0").await?;
83     let receiver = UdpSocket::bind("127.0.0.1:0").await?;
84 
85     let receiver_addr = receiver.local_addr()?;
86     poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
87 
88     // peek
89     let mut recv_buf = [0u8; 32];
90     let (n, addr) = receiver.peek_from(&mut recv_buf).await?;
91     assert_eq!(&recv_buf[..n], MSG);
92     assert_eq!(addr, sender.local_addr()?);
93 
94     // peek
95     let mut recv_buf = [0u8; 32];
96     let (n, addr) = receiver.peek_from(&mut recv_buf).await?;
97     assert_eq!(&recv_buf[..n], MSG);
98     assert_eq!(addr, sender.local_addr()?);
99 
100     let mut recv_buf = [0u8; 32];
101     let (n, addr) = receiver.recv_from(&mut recv_buf).await?;
102     assert_eq!(&recv_buf[..n], MSG);
103     assert_eq!(addr, sender.local_addr()?);
104 
105     Ok(())
106 }
107 
108 #[tokio::test]
send_to_peek_from_poll() -> std::io::Result<()>109 async fn send_to_peek_from_poll() -> std::io::Result<()> {
110     let sender = UdpSocket::bind("127.0.0.1:0").await?;
111     let receiver = UdpSocket::bind("127.0.0.1:0").await?;
112 
113     let receiver_addr = receiver.local_addr()?;
114     poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
115 
116     let mut recv_buf = [0u8; 32];
117     let mut read = ReadBuf::new(&mut recv_buf);
118     let addr = poll_fn(|cx| receiver.poll_peek_from(cx, &mut read)).await?;
119 
120     assert_eq!(read.filled(), MSG);
121     assert_eq!(addr, sender.local_addr()?);
122 
123     let mut recv_buf = [0u8; 32];
124     let mut read = ReadBuf::new(&mut recv_buf);
125     poll_fn(|cx| receiver.poll_peek_from(cx, &mut read)).await?;
126 
127     assert_eq!(read.filled(), MSG);
128     let mut recv_buf = [0u8; 32];
129     let mut read = ReadBuf::new(&mut recv_buf);
130 
131     poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;
132     assert_eq!(read.filled(), MSG);
133     Ok(())
134 }
135 
136 #[tokio::test]
split() -> std::io::Result<()>137 async fn split() -> std::io::Result<()> {
138     let socket = UdpSocket::bind("127.0.0.1:0").await?;
139     let s = Arc::new(socket);
140     let r = s.clone();
141 
142     let addr = s.local_addr()?;
143     tokio::spawn(async move {
144         s.send_to(MSG, &addr).await.unwrap();
145     });
146     let mut recv_buf = [0u8; 32];
147     let (len, _) = r.recv_from(&mut recv_buf[..]).await?;
148     assert_eq!(&recv_buf[..len], MSG);
149     Ok(())
150 }
151 
152 #[tokio::test]
split_chan() -> std::io::Result<()>153 async fn split_chan() -> std::io::Result<()> {
154     // setup UdpSocket that will echo all sent items
155     let socket = UdpSocket::bind("127.0.0.1:0").await?;
156     let addr = socket.local_addr().unwrap();
157     let s = Arc::new(socket);
158     let r = s.clone();
159 
160     let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000);
161     tokio::spawn(async move {
162         while let Some((bytes, addr)) = rx.recv().await {
163             s.send_to(&bytes, &addr).await.unwrap();
164         }
165     });
166 
167     tokio::spawn(async move {
168         let mut buf = [0u8; 32];
169         loop {
170             let (len, addr) = r.recv_from(&mut buf).await.unwrap();
171             tx.send((buf[..len].to_vec(), addr)).await.unwrap();
172         }
173     });
174 
175     // test that we can send a value and get back some response
176     let sender = UdpSocket::bind("127.0.0.1:0").await?;
177     sender.send_to(MSG, addr).await?;
178     let mut recv_buf = [0u8; 32];
179     let (len, _) = sender.recv_from(&mut recv_buf).await?;
180     assert_eq!(&recv_buf[..len], MSG);
181     Ok(())
182 }
183 
184 #[tokio::test]
split_chan_poll() -> std::io::Result<()>185 async fn split_chan_poll() -> std::io::Result<()> {
186     // setup UdpSocket that will echo all sent items
187     let socket = UdpSocket::bind("127.0.0.1:0").await?;
188     let addr = socket.local_addr().unwrap();
189     let s = Arc::new(socket);
190     let r = s.clone();
191 
192     let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000);
193     tokio::spawn(async move {
194         while let Some((bytes, addr)) = rx.recv().await {
195             poll_fn(|cx| s.poll_send_to(cx, &bytes, addr))
196                 .await
197                 .unwrap();
198         }
199     });
200 
201     tokio::spawn(async move {
202         let mut recv_buf = [0u8; 32];
203         let mut read = ReadBuf::new(&mut recv_buf);
204         loop {
205             let addr = poll_fn(|cx| r.poll_recv_from(cx, &mut read)).await.unwrap();
206             tx.send((read.filled().to_vec(), addr)).await.unwrap();
207         }
208     });
209 
210     // test that we can send a value and get back some response
211     let sender = UdpSocket::bind("127.0.0.1:0").await?;
212     poll_fn(|cx| sender.poll_send_to(cx, MSG, addr)).await?;
213 
214     let mut recv_buf = [0u8; 32];
215     let mut read = ReadBuf::new(&mut recv_buf);
216     let _ = poll_fn(|cx| sender.poll_recv_from(cx, &mut read)).await?;
217     assert_eq!(read.filled(), MSG);
218     Ok(())
219 }
220 
// # Note
//
// This test is purposely written such that each time `sender` sends data on
// the socket, `receiver` awaits the data. On Unix, it would be okay to wait
// until the end of the test to receive all the data. On Windows, this would
// **not** be okay because its resources are completion-based (via IOCP).
// If data is sent and not yet received, attempting to send more data will
// result in `ErrorKind::WouldBlock` until the first operation completes.
229 #[tokio::test]
try_send_spawn()230 async fn try_send_spawn() {
231     const MSG2: &[u8] = b"world!";
232     const MSG2_LEN: usize = MSG2.len();
233 
234     let sender = UdpSocket::bind("127.0.0.1:0").await.unwrap();
235     let receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap();
236 
237     receiver
238         .connect(sender.local_addr().unwrap())
239         .await
240         .unwrap();
241 
242     sender.writable().await.unwrap();
243 
244     let sent = &sender
245         .try_send_to(MSG, receiver.local_addr().unwrap())
246         .unwrap();
247     assert_eq!(sent, &MSG_LEN);
248     let mut buf = [0u8; 32];
249     let mut received = receiver.recv(&mut buf[..]).await.unwrap();
250 
251     sender
252         .connect(receiver.local_addr().unwrap())
253         .await
254         .unwrap();
255     let sent = &sender.try_send(MSG2).unwrap();
256     assert_eq!(sent, &MSG2_LEN);
257     received += receiver.recv(&mut buf[..]).await.unwrap();
258 
259     std::thread::spawn(move || {
260         let sent = &sender.try_send(MSG).unwrap();
261         assert_eq!(sent, &MSG_LEN);
262     })
263     .join()
264     .unwrap();
265     received += receiver.recv(&mut buf[..]).await.unwrap();
266 
267     assert_eq!(received, MSG_LEN * 2 + MSG2_LEN);
268 }
269 
270 #[tokio::test]
try_send_recv()271 async fn try_send_recv() {
272     // Create listener
273     let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
274 
275     // Create socket pair
276     let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
277 
278     // Connect the two
279     client.connect(server.local_addr().unwrap()).await.unwrap();
280     server.connect(client.local_addr().unwrap()).await.unwrap();
281 
282     for _ in 0..5 {
283         loop {
284             client.writable().await.unwrap();
285 
286             match client.try_send(b"hello world") {
287                 Ok(n) => {
288                     assert_eq!(n, 11);
289                     break;
290                 }
291                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
292                 Err(e) => panic!("{:?}", e),
293             }
294         }
295 
296         loop {
297             server.readable().await.unwrap();
298 
299             let mut buf = [0; 512];
300 
301             match server.try_recv(&mut buf) {
302                 Ok(n) => {
303                     assert_eq!(n, 11);
304                     assert_eq!(&buf[0..11], &b"hello world"[..]);
305                     break;
306                 }
307                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
308                 Err(e) => panic!("{:?}", e),
309             }
310         }
311     }
312 }
313 
314 #[tokio::test]
try_send_to_recv_from()315 async fn try_send_to_recv_from() {
316     // Create listener
317     let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
318     let saddr = server.local_addr().unwrap();
319 
320     // Create socket pair
321     let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
322     let caddr = client.local_addr().unwrap();
323 
324     for _ in 0..5 {
325         loop {
326             client.writable().await.unwrap();
327 
328             match client.try_send_to(b"hello world", saddr) {
329                 Ok(n) => {
330                     assert_eq!(n, 11);
331                     break;
332                 }
333                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
334                 Err(e) => panic!("{:?}", e),
335             }
336         }
337 
338         loop {
339             server.readable().await.unwrap();
340 
341             let mut buf = [0; 512];
342 
343             match server.try_recv_from(&mut buf) {
344                 Ok((n, addr)) => {
345                     assert_eq!(n, 11);
346                     assert_eq!(addr, caddr);
347                     assert_eq!(&buf[0..11], &b"hello world"[..]);
348                     break;
349                 }
350                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
351                 Err(e) => panic!("{:?}", e),
352             }
353         }
354     }
355 }
356 
357 #[tokio::test]
try_recv_buf()358 async fn try_recv_buf() {
359     // Create listener
360     let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
361 
362     // Create socket pair
363     let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
364 
365     // Connect the two
366     client.connect(server.local_addr().unwrap()).await.unwrap();
367     server.connect(client.local_addr().unwrap()).await.unwrap();
368 
369     for _ in 0..5 {
370         loop {
371             client.writable().await.unwrap();
372 
373             match client.try_send(b"hello world") {
374                 Ok(n) => {
375                     assert_eq!(n, 11);
376                     break;
377                 }
378                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
379                 Err(e) => panic!("{:?}", e),
380             }
381         }
382 
383         loop {
384             server.readable().await.unwrap();
385 
386             let mut buf = Vec::with_capacity(512);
387 
388             match server.try_recv_buf(&mut buf) {
389                 Ok(n) => {
390                     assert_eq!(n, 11);
391                     assert_eq!(&buf[0..11], &b"hello world"[..]);
392                     break;
393                 }
394                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
395                 Err(e) => panic!("{:?}", e),
396             }
397         }
398     }
399 }
400 
401 #[tokio::test]
try_recv_buf_from()402 async fn try_recv_buf_from() {
403     // Create listener
404     let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
405     let saddr = server.local_addr().unwrap();
406 
407     // Create socket pair
408     let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
409     let caddr = client.local_addr().unwrap();
410 
411     for _ in 0..5 {
412         loop {
413             client.writable().await.unwrap();
414 
415             match client.try_send_to(b"hello world", saddr) {
416                 Ok(n) => {
417                     assert_eq!(n, 11);
418                     break;
419                 }
420                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
421                 Err(e) => panic!("{:?}", e),
422             }
423         }
424 
425         loop {
426             server.readable().await.unwrap();
427 
428             let mut buf = Vec::with_capacity(512);
429 
430             match server.try_recv_buf_from(&mut buf) {
431                 Ok((n, addr)) => {
432                     assert_eq!(n, 11);
433                     assert_eq!(addr, caddr);
434                     assert_eq!(&buf[0..11], &b"hello world"[..]);
435                     break;
436                 }
437                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
438                 Err(e) => panic!("{:?}", e),
439             }
440         }
441     }
442 }
443