1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3
4 use futures::future::poll_fn;
5 use std::io;
6 use std::sync::Arc;
7 use tokio::{io::ReadBuf, net::UdpSocket};
8
/// Payload sent by the tests in this file that exchange a single short datagram.
const MSG: &[u8] = b"hello";
/// Length of [`MSG`] in bytes, compared against the counts returned by the `try_send*` calls.
const MSG_LEN: usize = MSG.len();
11
12 #[tokio::test]
send_recv() -> std::io::Result<()>13 async fn send_recv() -> std::io::Result<()> {
14 let sender = UdpSocket::bind("127.0.0.1:0").await?;
15 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
16
17 sender.connect(receiver.local_addr()?).await?;
18 receiver.connect(sender.local_addr()?).await?;
19
20 sender.send(MSG).await?;
21
22 let mut recv_buf = [0u8; 32];
23 let len = receiver.recv(&mut recv_buf[..]).await?;
24
25 assert_eq!(&recv_buf[..len], MSG);
26 Ok(())
27 }
28
29 #[tokio::test]
send_recv_poll() -> std::io::Result<()>30 async fn send_recv_poll() -> std::io::Result<()> {
31 let sender = UdpSocket::bind("127.0.0.1:0").await?;
32 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
33
34 sender.connect(receiver.local_addr()?).await?;
35 receiver.connect(sender.local_addr()?).await?;
36
37 poll_fn(|cx| sender.poll_send(cx, MSG)).await?;
38
39 let mut recv_buf = [0u8; 32];
40 let mut read = ReadBuf::new(&mut recv_buf);
41 let _len = poll_fn(|cx| receiver.poll_recv(cx, &mut read)).await?;
42
43 assert_eq!(read.filled(), MSG);
44 Ok(())
45 }
46
47 #[tokio::test]
send_to_recv_from() -> std::io::Result<()>48 async fn send_to_recv_from() -> std::io::Result<()> {
49 let sender = UdpSocket::bind("127.0.0.1:0").await?;
50 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
51
52 let receiver_addr = receiver.local_addr()?;
53 sender.send_to(MSG, &receiver_addr).await?;
54
55 let mut recv_buf = [0u8; 32];
56 let (len, addr) = receiver.recv_from(&mut recv_buf[..]).await?;
57
58 assert_eq!(&recv_buf[..len], MSG);
59 assert_eq!(addr, sender.local_addr()?);
60 Ok(())
61 }
62
63 #[tokio::test]
send_to_recv_from_poll() -> std::io::Result<()>64 async fn send_to_recv_from_poll() -> std::io::Result<()> {
65 let sender = UdpSocket::bind("127.0.0.1:0").await?;
66 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
67
68 let receiver_addr = receiver.local_addr()?;
69 poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
70
71 let mut recv_buf = [0u8; 32];
72 let mut read = ReadBuf::new(&mut recv_buf);
73 let addr = poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;
74
75 assert_eq!(read.filled(), MSG);
76 assert_eq!(addr, sender.local_addr()?);
77 Ok(())
78 }
79
80 #[tokio::test]
send_to_peek_from() -> std::io::Result<()>81 async fn send_to_peek_from() -> std::io::Result<()> {
82 let sender = UdpSocket::bind("127.0.0.1:0").await?;
83 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
84
85 let receiver_addr = receiver.local_addr()?;
86 poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
87
88 // peek
89 let mut recv_buf = [0u8; 32];
90 let (n, addr) = receiver.peek_from(&mut recv_buf).await?;
91 assert_eq!(&recv_buf[..n], MSG);
92 assert_eq!(addr, sender.local_addr()?);
93
94 // peek
95 let mut recv_buf = [0u8; 32];
96 let (n, addr) = receiver.peek_from(&mut recv_buf).await?;
97 assert_eq!(&recv_buf[..n], MSG);
98 assert_eq!(addr, sender.local_addr()?);
99
100 let mut recv_buf = [0u8; 32];
101 let (n, addr) = receiver.recv_from(&mut recv_buf).await?;
102 assert_eq!(&recv_buf[..n], MSG);
103 assert_eq!(addr, sender.local_addr()?);
104
105 Ok(())
106 }
107
108 #[tokio::test]
send_to_peek_from_poll() -> std::io::Result<()>109 async fn send_to_peek_from_poll() -> std::io::Result<()> {
110 let sender = UdpSocket::bind("127.0.0.1:0").await?;
111 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
112
113 let receiver_addr = receiver.local_addr()?;
114 poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
115
116 let mut recv_buf = [0u8; 32];
117 let mut read = ReadBuf::new(&mut recv_buf);
118 let addr = poll_fn(|cx| receiver.poll_peek_from(cx, &mut read)).await?;
119
120 assert_eq!(read.filled(), MSG);
121 assert_eq!(addr, sender.local_addr()?);
122
123 let mut recv_buf = [0u8; 32];
124 let mut read = ReadBuf::new(&mut recv_buf);
125 poll_fn(|cx| receiver.poll_peek_from(cx, &mut read)).await?;
126
127 assert_eq!(read.filled(), MSG);
128 let mut recv_buf = [0u8; 32];
129 let mut read = ReadBuf::new(&mut recv_buf);
130
131 poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;
132 assert_eq!(read.filled(), MSG);
133 Ok(())
134 }
135
136 #[tokio::test]
split() -> std::io::Result<()>137 async fn split() -> std::io::Result<()> {
138 let socket = UdpSocket::bind("127.0.0.1:0").await?;
139 let s = Arc::new(socket);
140 let r = s.clone();
141
142 let addr = s.local_addr()?;
143 tokio::spawn(async move {
144 s.send_to(MSG, &addr).await.unwrap();
145 });
146 let mut recv_buf = [0u8; 32];
147 let (len, _) = r.recv_from(&mut recv_buf[..]).await?;
148 assert_eq!(&recv_buf[..len], MSG);
149 Ok(())
150 }
151
152 #[tokio::test]
split_chan() -> std::io::Result<()>153 async fn split_chan() -> std::io::Result<()> {
154 // setup UdpSocket that will echo all sent items
155 let socket = UdpSocket::bind("127.0.0.1:0").await?;
156 let addr = socket.local_addr().unwrap();
157 let s = Arc::new(socket);
158 let r = s.clone();
159
160 let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000);
161 tokio::spawn(async move {
162 while let Some((bytes, addr)) = rx.recv().await {
163 s.send_to(&bytes, &addr).await.unwrap();
164 }
165 });
166
167 tokio::spawn(async move {
168 let mut buf = [0u8; 32];
169 loop {
170 let (len, addr) = r.recv_from(&mut buf).await.unwrap();
171 tx.send((buf[..len].to_vec(), addr)).await.unwrap();
172 }
173 });
174
175 // test that we can send a value and get back some response
176 let sender = UdpSocket::bind("127.0.0.1:0").await?;
177 sender.send_to(MSG, addr).await?;
178 let mut recv_buf = [0u8; 32];
179 let (len, _) = sender.recv_from(&mut recv_buf).await?;
180 assert_eq!(&recv_buf[..len], MSG);
181 Ok(())
182 }
183
184 #[tokio::test]
split_chan_poll() -> std::io::Result<()>185 async fn split_chan_poll() -> std::io::Result<()> {
186 // setup UdpSocket that will echo all sent items
187 let socket = UdpSocket::bind("127.0.0.1:0").await?;
188 let addr = socket.local_addr().unwrap();
189 let s = Arc::new(socket);
190 let r = s.clone();
191
192 let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000);
193 tokio::spawn(async move {
194 while let Some((bytes, addr)) = rx.recv().await {
195 poll_fn(|cx| s.poll_send_to(cx, &bytes, addr))
196 .await
197 .unwrap();
198 }
199 });
200
201 tokio::spawn(async move {
202 let mut recv_buf = [0u8; 32];
203 let mut read = ReadBuf::new(&mut recv_buf);
204 loop {
205 let addr = poll_fn(|cx| r.poll_recv_from(cx, &mut read)).await.unwrap();
206 tx.send((read.filled().to_vec(), addr)).await.unwrap();
207 }
208 });
209
210 // test that we can send a value and get back some response
211 let sender = UdpSocket::bind("127.0.0.1:0").await?;
212 poll_fn(|cx| sender.poll_send_to(cx, MSG, addr)).await?;
213
214 let mut recv_buf = [0u8; 32];
215 let mut read = ReadBuf::new(&mut recv_buf);
216 let _ = poll_fn(|cx| sender.poll_recv_from(cx, &mut read)).await?;
217 assert_eq!(read.filled(), MSG);
218 Ok(())
219 }
220
// # Note
//
// This test is purposely written such that each time `sender` sends data on
// the socket, `receiver` awaits the data. On Unix, it would be okay waiting
// until the end of the test to receive all the data. On Windows, this would
// **not** be okay because its resources are completion based (via IOCP).
// If data is sent and not yet received, attempting to send more data will
// result in `ErrorKind::WouldBlock` until the first operation completes.
229 #[tokio::test]
try_send_spawn()230 async fn try_send_spawn() {
231 const MSG2: &[u8] = b"world!";
232 const MSG2_LEN: usize = MSG2.len();
233
234 let sender = UdpSocket::bind("127.0.0.1:0").await.unwrap();
235 let receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap();
236
237 receiver
238 .connect(sender.local_addr().unwrap())
239 .await
240 .unwrap();
241
242 sender.writable().await.unwrap();
243
244 let sent = &sender
245 .try_send_to(MSG, receiver.local_addr().unwrap())
246 .unwrap();
247 assert_eq!(sent, &MSG_LEN);
248 let mut buf = [0u8; 32];
249 let mut received = receiver.recv(&mut buf[..]).await.unwrap();
250
251 sender
252 .connect(receiver.local_addr().unwrap())
253 .await
254 .unwrap();
255 let sent = &sender.try_send(MSG2).unwrap();
256 assert_eq!(sent, &MSG2_LEN);
257 received += receiver.recv(&mut buf[..]).await.unwrap();
258
259 std::thread::spawn(move || {
260 let sent = &sender.try_send(MSG).unwrap();
261 assert_eq!(sent, &MSG_LEN);
262 })
263 .join()
264 .unwrap();
265 received += receiver.recv(&mut buf[..]).await.unwrap();
266
267 assert_eq!(received, MSG_LEN * 2 + MSG2_LEN);
268 }
269
270 #[tokio::test]
try_send_recv()271 async fn try_send_recv() {
272 // Create listener
273 let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
274
275 // Create socket pair
276 let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
277
278 // Connect the two
279 client.connect(server.local_addr().unwrap()).await.unwrap();
280 server.connect(client.local_addr().unwrap()).await.unwrap();
281
282 for _ in 0..5 {
283 loop {
284 client.writable().await.unwrap();
285
286 match client.try_send(b"hello world") {
287 Ok(n) => {
288 assert_eq!(n, 11);
289 break;
290 }
291 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
292 Err(e) => panic!("{:?}", e),
293 }
294 }
295
296 loop {
297 server.readable().await.unwrap();
298
299 let mut buf = [0; 512];
300
301 match server.try_recv(&mut buf) {
302 Ok(n) => {
303 assert_eq!(n, 11);
304 assert_eq!(&buf[0..11], &b"hello world"[..]);
305 break;
306 }
307 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
308 Err(e) => panic!("{:?}", e),
309 }
310 }
311 }
312 }
313
314 #[tokio::test]
try_send_to_recv_from()315 async fn try_send_to_recv_from() {
316 // Create listener
317 let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
318 let saddr = server.local_addr().unwrap();
319
320 // Create socket pair
321 let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
322 let caddr = client.local_addr().unwrap();
323
324 for _ in 0..5 {
325 loop {
326 client.writable().await.unwrap();
327
328 match client.try_send_to(b"hello world", saddr) {
329 Ok(n) => {
330 assert_eq!(n, 11);
331 break;
332 }
333 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
334 Err(e) => panic!("{:?}", e),
335 }
336 }
337
338 loop {
339 server.readable().await.unwrap();
340
341 let mut buf = [0; 512];
342
343 match server.try_recv_from(&mut buf) {
344 Ok((n, addr)) => {
345 assert_eq!(n, 11);
346 assert_eq!(addr, caddr);
347 assert_eq!(&buf[0..11], &b"hello world"[..]);
348 break;
349 }
350 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
351 Err(e) => panic!("{:?}", e),
352 }
353 }
354 }
355 }
356
357 #[tokio::test]
try_recv_buf()358 async fn try_recv_buf() {
359 // Create listener
360 let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
361
362 // Create socket pair
363 let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
364
365 // Connect the two
366 client.connect(server.local_addr().unwrap()).await.unwrap();
367 server.connect(client.local_addr().unwrap()).await.unwrap();
368
369 for _ in 0..5 {
370 loop {
371 client.writable().await.unwrap();
372
373 match client.try_send(b"hello world") {
374 Ok(n) => {
375 assert_eq!(n, 11);
376 break;
377 }
378 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
379 Err(e) => panic!("{:?}", e),
380 }
381 }
382
383 loop {
384 server.readable().await.unwrap();
385
386 let mut buf = Vec::with_capacity(512);
387
388 match server.try_recv_buf(&mut buf) {
389 Ok(n) => {
390 assert_eq!(n, 11);
391 assert_eq!(&buf[0..11], &b"hello world"[..]);
392 break;
393 }
394 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
395 Err(e) => panic!("{:?}", e),
396 }
397 }
398 }
399 }
400
401 #[tokio::test]
try_recv_buf_from()402 async fn try_recv_buf_from() {
403 // Create listener
404 let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
405 let saddr = server.local_addr().unwrap();
406
407 // Create socket pair
408 let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
409 let caddr = client.local_addr().unwrap();
410
411 for _ in 0..5 {
412 loop {
413 client.writable().await.unwrap();
414
415 match client.try_send_to(b"hello world", saddr) {
416 Ok(n) => {
417 assert_eq!(n, 11);
418 break;
419 }
420 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
421 Err(e) => panic!("{:?}", e),
422 }
423 }
424
425 loop {
426 server.readable().await.unwrap();
427
428 let mut buf = Vec::with_capacity(512);
429
430 match server.try_recv_buf_from(&mut buf) {
431 Ok((n, addr)) => {
432 assert_eq!(n, 11);
433 assert_eq!(addr, caddr);
434 assert_eq!(&buf[0..11], &b"hello world"[..]);
435 break;
436 }
437 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
438 Err(e) => panic!("{:?}", e),
439 }
440 }
441 }
442 }
443