1 use crate::stream::TryStreamExt; 2 use core::pin::Pin; 3 use futures_core::ready; 4 use futures_core::stream::TryStream; 5 use futures_core::task::{Context, Poll}; 6 use futures_io::{AsyncRead, AsyncWrite, AsyncBufRead}; 7 use std::cmp; 8 use std::io::{Error, Result}; 9 10 /// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method. 11 #[derive(Debug)] 12 #[must_use = "readers do nothing unless polled"] 13 #[cfg_attr(docsrs, doc(cfg(feature = "io")))] 14 pub struct IntoAsyncRead<St> 15 where 16 St: TryStream<Error = Error> + Unpin, 17 St::Ok: AsRef<[u8]>, 18 { 19 stream: St, 20 state: ReadState<St::Ok>, 21 } 22 23 impl<St> Unpin for IntoAsyncRead<St> 24 where 25 St: TryStream<Error = Error> + Unpin, 26 St::Ok: AsRef<[u8]>, 27 { 28 } 29 30 #[derive(Debug)] 31 enum ReadState<T: AsRef<[u8]>> { 32 Ready { chunk: T, chunk_start: usize }, 33 PendingChunk, 34 Eof, 35 } 36 37 impl<St> IntoAsyncRead<St> 38 where 39 St: TryStream<Error = Error> + Unpin, 40 St::Ok: AsRef<[u8]>, 41 { new(stream: St) -> Self42 pub(super) fn new(stream: St) -> Self { 43 Self { 44 stream, 45 state: ReadState::PendingChunk, 46 } 47 } 48 } 49 50 impl<St> AsyncRead for IntoAsyncRead<St> 51 where 52 St: TryStream<Error = Error> + Unpin, 53 St::Ok: AsRef<[u8]>, 54 { poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize>>55 fn poll_read( 56 mut self: Pin<&mut Self>, 57 cx: &mut Context<'_>, 58 buf: &mut [u8], 59 ) -> Poll<Result<usize>> { 60 loop { 61 match &mut self.state { 62 ReadState::Ready { chunk, chunk_start } => { 63 let chunk = chunk.as_ref(); 64 let len = cmp::min(buf.len(), chunk.len() - *chunk_start); 65 66 buf[..len].copy_from_slice( 67 &chunk[*chunk_start..*chunk_start + len], 68 ); 69 *chunk_start += len; 70 71 if chunk.len() == *chunk_start { 72 self.state = ReadState::PendingChunk; 73 } 74 75 return Poll::Ready(Ok(len)); 76 } 77 ReadState::PendingChunk => { 78 match ready!(self.stream.try_poll_next_unpin(cx)) { 79 Some(Ok(chunk)) => { 80 if !chunk.as_ref().is_empty() { 81 self.state = ReadState::Ready { 82 chunk, 83 chunk_start: 0, 84 }; 85 } 86 } 87 Some(Err(err)) => { 88 self.state = ReadState::Eof; 89 return Poll::Ready(Err(err)); 90 } 91 None => { 92 self.state = ReadState::Eof; 93 return Poll::Ready(Ok(0)); 94 } 95 } 96 } 97 ReadState::Eof => { 98 return Poll::Ready(Ok(0)); 99 } 100 } 101 } 102 } 103 } 104 105 impl<St> AsyncWrite for IntoAsyncRead<St> 106 where 107 St: TryStream<Error = Error> + AsyncWrite + Unpin, 108 St::Ok: AsRef<[u8]>, 109 { poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8] ) -> Poll<Result<usize>>110 fn poll_write( 111 mut self: Pin<&mut Self>, 112 cx: &mut Context<'_>, 113 buf: &[u8] 114 ) -> Poll<Result<usize>> { 115 Pin::new( &mut self.stream ).poll_write( cx, buf ) 116 } 117 poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<()>>118 fn poll_flush( 119 mut self: Pin<&mut Self>, 120 cx: &mut Context<'_> 121 ) -> Poll<Result<()>> { 122 Pin::new( &mut self.stream ).poll_flush( cx ) 123 } 124 poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<()>>125 fn poll_close( 126 mut self: Pin<&mut Self>, 127 cx: &mut Context<'_> 128 ) -> Poll<Result<()>> { 129 Pin::new( &mut self.stream ).poll_close( cx ) 130 } 131 } 132 133 impl<St> AsyncBufRead for IntoAsyncRead<St> 134 where 135 St: TryStream<Error = Error> + Unpin, 136 St::Ok: AsRef<[u8]>, 137 { poll_fill_buf( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<&[u8]>>138 fn poll_fill_buf( 139 mut self: Pin<&mut Self>, 140 cx: &mut Context<'_>, 141 ) -> Poll<Result<&[u8]>> { 142 while let ReadState::PendingChunk = self.state { 143 match ready!(self.stream.try_poll_next_unpin(cx)) { 144 Some(Ok(chunk)) => { 145 if !chunk.as_ref().is_empty() { 146 self.state = ReadState::Ready { 147 chunk, 148 chunk_start: 0, 149 }; 150 } 151 } 152 Some(Err(err)) => { 153 self.state = ReadState::Eof; 154 return Poll::Ready(Err(err)); 155 } 156 None => { 157 self.state = ReadState::Eof; 158 return Poll::Ready(Ok(&[])); 159 } 160 } 161 } 162 163 if let ReadState::Ready { ref chunk, chunk_start } = self.into_ref().get_ref().state { 164 let chunk = chunk.as_ref(); 165 return Poll::Ready(Ok(&chunk[chunk_start..])); 166 } 167 168 // To get to this point we must be in ReadState::Eof 169 Poll::Ready(Ok(&[])) 170 } 171 consume( mut self: Pin<&mut Self>, amount: usize, )172 fn consume( 173 mut self: Pin<&mut Self>, 174 amount: usize, 175 ) { 176 // https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295 177 if amount == 0 { return } 178 if let ReadState::Ready { chunk, chunk_start } = &mut self.state { 179 *chunk_start += amount; 180 debug_assert!(*chunk_start <= chunk.as_ref().len()); 181 if *chunk_start >= chunk.as_ref().len() { 182 self.state = ReadState::PendingChunk; 183 } 184 } else { 185 debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk"); 186 } 187 } 188 } 189