1 use crate::io::util::DEFAULT_BUF_SIZE;
2 use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};
3 
4 use pin_project_lite::pin_project;
5 use std::io;
6 use std::pin::Pin;
7 use std::task::{Context, Poll};
8 use std::{cmp, fmt};
9 
10 pin_project! {
11     /// The `BufReader` struct adds buffering to any reader.
12     ///
13     /// It can be excessively inefficient to work directly with a [`AsyncRead`]
14     /// instance. A `BufReader` performs large, infrequent reads on the underlying
15     /// [`AsyncRead`] and maintains an in-memory buffer of the results.
16     ///
17     /// `BufReader` can improve the speed of programs that make *small* and
18     /// *repeated* read calls to the same file or network socket. It does not
19     /// help when reading very large amounts at once, or reading just one or a few
20     /// times. It also provides no advantage when reading from a source that is
21     /// already in memory, like a `Vec<u8>`.
22     ///
23     /// When the `BufReader` is dropped, the contents of its buffer will be
24     /// discarded. Creating multiple instances of a `BufReader` on the same
25     /// stream can cause data loss.
26     #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
27     pub struct BufReader<R> {
28         #[pin]
29         pub(super) inner: R,
30         pub(super) buf: Box<[u8]>,
31         pub(super) pos: usize,
32         pub(super) cap: usize,
33     }
34 }
35 
36 impl<R: AsyncRead> BufReader<R> {
37     /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
38     /// but may change in the future.
new(inner: R) -> Self39     pub fn new(inner: R) -> Self {
40         Self::with_capacity(DEFAULT_BUF_SIZE, inner)
41     }
42 
43     /// Creates a new `BufReader` with the specified buffer capacity.
with_capacity(capacity: usize, inner: R) -> Self44     pub fn with_capacity(capacity: usize, inner: R) -> Self {
45         let buffer = vec![0; capacity];
46         Self {
47             inner,
48             buf: buffer.into_boxed_slice(),
49             pos: 0,
50             cap: 0,
51         }
52     }
53 
54     /// Gets a reference to the underlying reader.
55     ///
56     /// It is inadvisable to directly read from the underlying reader.
get_ref(&self) -> &R57     pub fn get_ref(&self) -> &R {
58         &self.inner
59     }
60 
61     /// Gets a mutable reference to the underlying reader.
62     ///
63     /// It is inadvisable to directly read from the underlying reader.
get_mut(&mut self) -> &mut R64     pub fn get_mut(&mut self) -> &mut R {
65         &mut self.inner
66     }
67 
68     /// Gets a pinned mutable reference to the underlying reader.
69     ///
70     /// It is inadvisable to directly read from the underlying reader.
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R>71     pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
72         self.project().inner
73     }
74 
75     /// Consumes this `BufReader`, returning the underlying reader.
76     ///
77     /// Note that any leftover data in the internal buffer is lost.
into_inner(self) -> R78     pub fn into_inner(self) -> R {
79         self.inner
80     }
81 
82     /// Returns a reference to the internally buffered data.
83     ///
84     /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
buffer(&self) -> &[u8]85     pub fn buffer(&self) -> &[u8] {
86         &self.buf[self.pos..self.cap]
87     }
88 
89     /// Invalidates all data in the internal buffer.
90     #[inline]
discard_buffer(self: Pin<&mut Self>)91     fn discard_buffer(self: Pin<&mut Self>) {
92         let me = self.project();
93         *me.pos = 0;
94         *me.cap = 0;
95     }
96 }
97 
98 impl<R: AsyncRead> AsyncRead for BufReader<R> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>99     fn poll_read(
100         mut self: Pin<&mut Self>,
101         cx: &mut Context<'_>,
102         buf: &mut ReadBuf<'_>,
103     ) -> Poll<io::Result<()>> {
104         // If we don't have any buffered data and we're doing a massive read
105         // (larger than our internal buffer), bypass our internal buffer
106         // entirely.
107         if self.pos == self.cap && buf.remaining() >= self.buf.len() {
108             let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
109             self.discard_buffer();
110             return Poll::Ready(res);
111         }
112         let rem = ready!(self.as_mut().poll_fill_buf(cx))?;
113         let amt = std::cmp::min(rem.len(), buf.remaining());
114         buf.put_slice(&rem[..amt]);
115         self.consume(amt);
116         Poll::Ready(Ok(()))
117     }
118 }
119 
120 impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>121     fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
122         let me = self.project();
123 
124         // If we've reached the end of our internal buffer then we need to fetch
125         // some more data from the underlying reader.
126         // Branch using `>=` instead of the more correct `==`
127         // to tell the compiler that the pos..cap slice is always valid.
128         if *me.pos >= *me.cap {
129             debug_assert!(*me.pos == *me.cap);
130             let mut buf = ReadBuf::new(me.buf);
131             ready!(me.inner.poll_read(cx, &mut buf))?;
132             *me.cap = buf.filled().len();
133             *me.pos = 0;
134         }
135         Poll::Ready(Ok(&me.buf[*me.pos..*me.cap]))
136     }
137 
consume(self: Pin<&mut Self>, amt: usize)138     fn consume(self: Pin<&mut Self>, amt: usize) {
139         let me = self.project();
140         *me.pos = cmp::min(*me.pos + amt, *me.cap);
141     }
142 }
143 
144 impl<R: AsyncRead + AsyncWrite> AsyncWrite for BufReader<R> {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>145     fn poll_write(
146         self: Pin<&mut Self>,
147         cx: &mut Context<'_>,
148         buf: &[u8],
149     ) -> Poll<io::Result<usize>> {
150         self.get_pin_mut().poll_write(cx, buf)
151     }
152 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>153     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
154         self.get_pin_mut().poll_flush(cx)
155     }
156 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>157     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
158         self.get_pin_mut().poll_shutdown(cx)
159     }
160 }
161 
162 impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result163     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
164         f.debug_struct("BufReader")
165             .field("reader", &self.inner)
166             .field(
167                 "buffer",
168                 &format_args!("{}/{}", self.cap - self.pos, self.buf.len()),
169             )
170             .finish()
171     }
172 }
173 
174 #[cfg(test)]
175 mod tests {
176     use super::*;
177 
178     #[test]
assert_unpin()179     fn assert_unpin() {
180         crate::is_unpin::<BufReader<()>>();
181     }
182 }
183