1 use crate::stream::TryStreamExt;
2 use core::pin::Pin;
3 use futures_core::ready;
4 use futures_core::stream::TryStream;
5 use futures_core::task::{Context, Poll};
6 use futures_io::{AsyncRead, AsyncWrite, AsyncBufRead};
7 use std::cmp;
8 use std::io::{Error, Result};
9 
10 /// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
11 #[derive(Debug)]
12 #[must_use = "readers do nothing unless polled"]
13 #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
14 pub struct IntoAsyncRead<St>
15 where
16     St: TryStream<Error = Error> + Unpin,
17     St::Ok: AsRef<[u8]>,
18 {
19     stream: St,
20     state: ReadState<St::Ok>,
21 }
22 
23 impl<St> Unpin for IntoAsyncRead<St>
24 where
25     St: TryStream<Error = Error> + Unpin,
26     St::Ok: AsRef<[u8]>,
27 {
28 }
29 
/// Progress of the reader through the chunks produced by the stream.
#[derive(Debug)]
enum ReadState<T>
where
    T: AsRef<[u8]>,
{
    /// A chunk is buffered; bytes before `chunk_start` have already been
    /// handed out, bytes from `chunk_start` onwards are still unread.
    Ready { chunk: T, chunk_start: usize },
    /// No chunk is buffered; the next read has to poll the stream for one.
    PendingChunk,
    /// The stream finished or reported an error; reads yield no more data.
    Eof,
}
36 
37 impl<St> IntoAsyncRead<St>
38 where
39     St: TryStream<Error = Error> + Unpin,
40     St::Ok: AsRef<[u8]>,
41 {
new(stream: St) -> Self42     pub(super) fn new(stream: St) -> Self {
43         Self {
44             stream,
45             state: ReadState::PendingChunk,
46         }
47     }
48 }
49 
50 impl<St> AsyncRead for IntoAsyncRead<St>
51 where
52     St: TryStream<Error = Error> + Unpin,
53     St::Ok: AsRef<[u8]>,
54 {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize>>55     fn poll_read(
56         mut self: Pin<&mut Self>,
57         cx: &mut Context<'_>,
58         buf: &mut [u8],
59     ) -> Poll<Result<usize>> {
60         loop {
61             match &mut self.state {
62                 ReadState::Ready { chunk, chunk_start } => {
63                     let chunk = chunk.as_ref();
64                     let len = cmp::min(buf.len(), chunk.len() - *chunk_start);
65 
66                     buf[..len].copy_from_slice(
67                         &chunk[*chunk_start..*chunk_start + len],
68                     );
69                     *chunk_start += len;
70 
71                     if chunk.len() == *chunk_start {
72                         self.state = ReadState::PendingChunk;
73                     }
74 
75                     return Poll::Ready(Ok(len));
76                 }
77                 ReadState::PendingChunk => {
78                     match ready!(self.stream.try_poll_next_unpin(cx)) {
79                         Some(Ok(chunk)) => {
80                             if !chunk.as_ref().is_empty() {
81                                 self.state = ReadState::Ready {
82                                     chunk,
83                                     chunk_start: 0,
84                                 };
85                             }
86                         }
87                         Some(Err(err)) => {
88                             self.state = ReadState::Eof;
89                             return Poll::Ready(Err(err));
90                         }
91                         None => {
92                             self.state = ReadState::Eof;
93                             return Poll::Ready(Ok(0));
94                         }
95                     }
96                 }
97                 ReadState::Eof => {
98                     return Poll::Ready(Ok(0));
99                 }
100             }
101         }
102     }
103 }
104 
105 impl<St> AsyncWrite for IntoAsyncRead<St>
106 where
107     St: TryStream<Error = Error> + AsyncWrite + Unpin,
108     St::Ok: AsRef<[u8]>,
109 {
poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8] ) -> Poll<Result<usize>>110     fn poll_write(
111         mut self: Pin<&mut Self>,
112         cx: &mut Context<'_>,
113         buf: &[u8]
114     ) -> Poll<Result<usize>> {
115         Pin::new( &mut self.stream ).poll_write( cx, buf )
116     }
117 
poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<()>>118     fn poll_flush(
119         mut self: Pin<&mut Self>,
120         cx: &mut Context<'_>
121     ) -> Poll<Result<()>> {
122         Pin::new( &mut self.stream ).poll_flush( cx )
123     }
124 
poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<()>>125     fn poll_close(
126         mut self: Pin<&mut Self>,
127         cx: &mut Context<'_>
128     ) -> Poll<Result<()>> {
129         Pin::new( &mut self.stream ).poll_close( cx )
130     }
131 }
132 
133 impl<St> AsyncBufRead for IntoAsyncRead<St>
134 where
135     St: TryStream<Error = Error> + Unpin,
136     St::Ok: AsRef<[u8]>,
137 {
poll_fill_buf( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<&[u8]>>138     fn poll_fill_buf(
139         mut self: Pin<&mut Self>,
140         cx: &mut Context<'_>,
141     ) -> Poll<Result<&[u8]>> {
142         while let ReadState::PendingChunk = self.state {
143             match ready!(self.stream.try_poll_next_unpin(cx)) {
144                 Some(Ok(chunk)) => {
145                     if !chunk.as_ref().is_empty() {
146                         self.state = ReadState::Ready {
147                             chunk,
148                             chunk_start: 0,
149                         };
150                     }
151                 }
152                 Some(Err(err)) => {
153                     self.state = ReadState::Eof;
154                     return Poll::Ready(Err(err));
155                 }
156                 None => {
157                     self.state = ReadState::Eof;
158                     return Poll::Ready(Ok(&[]));
159                 }
160             }
161         }
162 
163         if let ReadState::Ready { ref chunk, chunk_start } = self.into_ref().get_ref().state {
164             let chunk = chunk.as_ref();
165             return Poll::Ready(Ok(&chunk[chunk_start..]));
166         }
167 
168         // To get to this point we must be in ReadState::Eof
169         Poll::Ready(Ok(&[]))
170     }
171 
consume( mut self: Pin<&mut Self>, amount: usize, )172     fn consume(
173         mut self: Pin<&mut Self>,
174         amount: usize,
175     ) {
176          // https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295
177         if amount == 0 { return }
178         if let ReadState::Ready { chunk, chunk_start } = &mut self.state {
179             *chunk_start += amount;
180             debug_assert!(*chunk_start <= chunk.as_ref().len());
181             if *chunk_start >= chunk.as_ref().len() {
182                 self.state = ReadState::PendingChunk;
183             }
184         } else {
185             debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk");
186         }
187     }
188 }
189