1 use futures_core::ready; 2 use futures_core::future::Future; 3 use futures_core::task::{Context, Poll}; 4 use futures_io::AsyncWrite; 5 use futures_io::IoSlice; 6 use std::io; 7 use std::mem; 8 use std::pin::Pin; 9 10 /// Future for the 11 /// [`write_all_vectored`](super::AsyncWriteExt::write_all_vectored) method. 12 #[derive(Debug)] 13 #[must_use = "futures do nothing unless you `.await` or poll them"] 14 pub struct WriteAllVectored<'a, W: ?Sized + Unpin> { 15 writer: &'a mut W, 16 bufs: &'a mut [IoSlice<'a>], 17 } 18 19 impl<W: ?Sized + Unpin> Unpin for WriteAllVectored<'_, W> {} 20 21 impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteAllVectored<'a, W> { new(writer: &'a mut W, bufs: &'a mut [IoSlice<'a>]) -> Self22 pub(super) fn new(writer: &'a mut W, bufs: &'a mut [IoSlice<'a>]) -> Self { 23 Self { writer, bufs: IoSlice::advance(bufs, 0) } 24 } 25 } 26 27 impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteAllVectored<'_, W> { 28 type Output = io::Result<()>; 29 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>30 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 31 let this = &mut *self; 32 while !this.bufs.is_empty() { 33 let n = ready!(Pin::new(&mut this.writer).poll_write_vectored(cx, this.bufs))?; 34 if n == 0 { 35 return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); 36 } else { 37 this.bufs = IoSlice::advance(mem::take(&mut this.bufs), n); 38 } 39 } 40 41 Poll::Ready(Ok(())) 42 } 43 } 44 45 #[cfg(test)] 46 mod tests { 47 use std::cmp::min; 48 use std::future::Future; 49 use std::io; 50 use std::pin::Pin; 51 use std::task::{Context, Poll}; 52 53 use crate::io::{AsyncWrite, AsyncWriteExt, IoSlice}; 54 use crate::task::noop_waker; 55 56 /// Create a new writer that reads from at most `n_bufs` and reads 57 /// `per_call` bytes (in total) per call to write. test_writer(n_bufs: usize, per_call: usize) -> TestWriter58 fn test_writer(n_bufs: usize, per_call: usize) -> TestWriter { 59 TestWriter { 60 n_bufs, 61 per_call, 62 written: Vec::new(), 63 } 64 } 65 66 // TODO: maybe move this the future-test crate? 67 struct TestWriter { 68 n_bufs: usize, 69 per_call: usize, 70 written: Vec<u8>, 71 } 72 73 impl AsyncWrite for TestWriter { poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>74 fn poll_write( 75 self: Pin<&mut Self>, 76 cx: &mut Context<'_>, 77 buf: &[u8], 78 ) -> Poll<io::Result<usize>> { 79 self.poll_write_vectored(cx, &[IoSlice::new(buf)]) 80 } 81 poll_write_vectored( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<io::Result<usize>>82 fn poll_write_vectored( 83 mut self: Pin<&mut Self>, 84 _cx: &mut Context<'_>, 85 bufs: &[IoSlice<'_>], 86 ) -> Poll<io::Result<usize>> { 87 let mut left = self.per_call; 88 let mut written = 0; 89 for buf in bufs.iter().take(self.n_bufs) { 90 let n = min(left, buf.len()); 91 self.written.extend_from_slice(&buf[0..n]); 92 left -= n; 93 written += n; 94 } 95 Poll::Ready(Ok(written)) 96 } 97 poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>98 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { 99 Poll::Ready(Ok(())) 100 } 101 poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>102 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { 103 Poll::Ready(Ok(())) 104 } 105 } 106 107 // TODO: maybe move this the future-test crate? 108 macro_rules! assert_poll_ok { 109 ($e:expr, $expected:expr) => { 110 let expected = $expected; 111 match $e { 112 Poll::Ready(Ok(ok)) if ok == expected => {} 113 got => panic!( 114 "unexpected result, got: {:?}, wanted: Ready(Ok({:?}))", 115 got, expected 116 ), 117 } 118 }; 119 } 120 121 #[test] test_writer_read_from_one_buf()122 fn test_writer_read_from_one_buf() { 123 let waker = noop_waker(); 124 let mut cx = Context::from_waker(&waker); 125 126 let mut dst = test_writer(1, 2); 127 let mut dst = Pin::new(&mut dst); 128 129 assert_poll_ok!(dst.as_mut().poll_write(&mut cx, &[]), 0); 130 assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, &[]), 0); 131 132 // Read at most 2 bytes. 133 assert_poll_ok!(dst.as_mut().poll_write(&mut cx, &[1, 1, 1]), 2); 134 let bufs = &[IoSlice::new(&[2, 2, 2])]; 135 assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 2); 136 137 // Only read from first buf. 138 let bufs = &[IoSlice::new(&[3]), IoSlice::new(&[4, 4])]; 139 assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 1); 140 141 assert_eq!(dst.written, &[1, 1, 2, 2, 3]); 142 } 143 144 #[test] test_writer_read_from_multiple_bufs()145 fn test_writer_read_from_multiple_bufs() { 146 let waker = noop_waker(); 147 let mut cx = Context::from_waker(&waker); 148 149 let mut dst = test_writer(3, 3); 150 let mut dst = Pin::new(&mut dst); 151 152 // Read at most 3 bytes from two buffers. 153 let bufs = &[IoSlice::new(&[1]), IoSlice::new(&[2, 2, 2])]; 154 assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 3); 155 156 // Read at most 3 bytes from three buffers. 157 let bufs = &[ 158 IoSlice::new(&[3]), 159 IoSlice::new(&[4]), 160 IoSlice::new(&[5, 5]), 161 ]; 162 assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 3); 163 164 assert_eq!(dst.written, &[1, 2, 2, 3, 4, 5]); 165 } 166 167 #[test] test_write_all_vectored()168 fn test_write_all_vectored() { 169 let waker = noop_waker(); 170 let mut cx = Context::from_waker(&waker); 171 172 #[rustfmt::skip] // Becomes unreadable otherwise. 173 let tests: Vec<(_, &'static [u8])> = vec![ 174 (vec![], &[]), 175 (vec![IoSlice::new(&[]), IoSlice::new(&[])], &[]), 176 (vec![IoSlice::new(&[1])], &[1]), 177 (vec![IoSlice::new(&[1, 2])], &[1, 2]), 178 (vec![IoSlice::new(&[1, 2, 3])], &[1, 2, 3]), 179 (vec![IoSlice::new(&[1, 2, 3, 4])], &[1, 2, 3, 4]), 180 (vec![IoSlice::new(&[1, 2, 3, 4, 5])], &[1, 2, 3, 4, 5]), 181 (vec![IoSlice::new(&[1]), IoSlice::new(&[2])], &[1, 2]), 182 (vec![IoSlice::new(&[1, 1]), IoSlice::new(&[2, 2])], &[1, 1, 2, 2]), 183 (vec![IoSlice::new(&[1, 1, 1]), IoSlice::new(&[2, 2, 2])], &[1, 1, 1, 2, 2, 2]), 184 (vec![IoSlice::new(&[1, 1, 1, 1]), IoSlice::new(&[2, 2, 2, 2])], &[1, 1, 1, 1, 2, 2, 2, 2]), 185 (vec![IoSlice::new(&[1]), IoSlice::new(&[2]), IoSlice::new(&[3])], &[1, 2, 3]), 186 (vec![IoSlice::new(&[1, 1]), IoSlice::new(&[2, 2]), IoSlice::new(&[3, 3])], &[1, 1, 2, 2, 3, 3]), 187 (vec![IoSlice::new(&[1, 1, 1]), IoSlice::new(&[2, 2, 2]), IoSlice::new(&[3, 3, 3])], &[1, 1, 1, 2, 2, 2, 3, 3, 3]), 188 ]; 189 190 for (mut input, wanted) in tests { 191 let mut dst = test_writer(2, 2); 192 { 193 let mut future = dst.write_all_vectored(&mut *input); 194 match Pin::new(&mut future).poll(&mut cx) { 195 Poll::Ready(Ok(())) => {} 196 other => panic!("unexpected result polling future: {:?}", other), 197 } 198 } 199 assert_eq!(&*dst.written, &*wanted); 200 } 201 } 202 } 203