1 use crate::io::util::DEFAULT_BUF_SIZE; 2 use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; 3 4 use pin_project_lite::pin_project; 5 use std::io; 6 use std::pin::Pin; 7 use std::task::{Context, Poll}; 8 use std::{cmp, fmt}; 9 10 pin_project! { 11 /// The `BufReader` struct adds buffering to any reader. 12 /// 13 /// It can be excessively inefficient to work directly with a [`AsyncRead`] 14 /// instance. A `BufReader` performs large, infrequent reads on the underlying 15 /// [`AsyncRead`] and maintains an in-memory buffer of the results. 16 /// 17 /// `BufReader` can improve the speed of programs that make *small* and 18 /// *repeated* read calls to the same file or network socket. It does not 19 /// help when reading very large amounts at once, or reading just one or a few 20 /// times. It also provides no advantage when reading from a source that is 21 /// already in memory, like a `Vec<u8>`. 22 /// 23 /// When the `BufReader` is dropped, the contents of its buffer will be 24 /// discarded. Creating multiple instances of a `BufReader` on the same 25 /// stream can cause data loss. 26 #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] 27 pub struct BufReader<R> { 28 #[pin] 29 pub(super) inner: R, 30 pub(super) buf: Box<[u8]>, 31 pub(super) pos: usize, 32 pub(super) cap: usize, 33 } 34 } 35 36 impl<R: AsyncRead> BufReader<R> { 37 /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB, 38 /// but may change in the future. new(inner: R) -> Self39 pub fn new(inner: R) -> Self { 40 Self::with_capacity(DEFAULT_BUF_SIZE, inner) 41 } 42 43 /// Creates a new `BufReader` with the specified buffer capacity. with_capacity(capacity: usize, inner: R) -> Self44 pub fn with_capacity(capacity: usize, inner: R) -> Self { 45 let buffer = vec![0; capacity]; 46 Self { 47 inner, 48 buf: buffer.into_boxed_slice(), 49 pos: 0, 50 cap: 0, 51 } 52 } 53 54 /// Gets a reference to the underlying reader. 55 /// 56 /// It is inadvisable to directly read from the underlying reader. get_ref(&self) -> &R57 pub fn get_ref(&self) -> &R { 58 &self.inner 59 } 60 61 /// Gets a mutable reference to the underlying reader. 62 /// 63 /// It is inadvisable to directly read from the underlying reader. get_mut(&mut self) -> &mut R64 pub fn get_mut(&mut self) -> &mut R { 65 &mut self.inner 66 } 67 68 /// Gets a pinned mutable reference to the underlying reader. 69 /// 70 /// It is inadvisable to directly read from the underlying reader. get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R>71 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { 72 self.project().inner 73 } 74 75 /// Consumes this `BufReader`, returning the underlying reader. 76 /// 77 /// Note that any leftover data in the internal buffer is lost. into_inner(self) -> R78 pub fn into_inner(self) -> R { 79 self.inner 80 } 81 82 /// Returns a reference to the internally buffered data. 83 /// 84 /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty. buffer(&self) -> &[u8]85 pub fn buffer(&self) -> &[u8] { 86 &self.buf[self.pos..self.cap] 87 } 88 89 /// Invalidates all data in the internal buffer. 90 #[inline] discard_buffer(self: Pin<&mut Self>)91 fn discard_buffer(self: Pin<&mut Self>) { 92 let me = self.project(); 93 *me.pos = 0; 94 *me.cap = 0; 95 } 96 } 97 98 impl<R: AsyncRead> AsyncRead for BufReader<R> { poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>99 fn poll_read( 100 mut self: Pin<&mut Self>, 101 cx: &mut Context<'_>, 102 buf: &mut ReadBuf<'_>, 103 ) -> Poll<io::Result<()>> { 104 // If we don't have any buffered data and we're doing a massive read 105 // (larger than our internal buffer), bypass our internal buffer 106 // entirely. 107 if self.pos == self.cap && buf.remaining() >= self.buf.len() { 108 let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf)); 109 self.discard_buffer(); 110 return Poll::Ready(res); 111 } 112 let rem = ready!(self.as_mut().poll_fill_buf(cx))?; 113 let amt = std::cmp::min(rem.len(), buf.remaining()); 114 buf.put_slice(&rem[..amt]); 115 self.consume(amt); 116 Poll::Ready(Ok(())) 117 } 118 } 119 120 impl<R: AsyncRead> AsyncBufRead for BufReader<R> { poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>121 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { 122 let me = self.project(); 123 124 // If we've reached the end of our internal buffer then we need to fetch 125 // some more data from the underlying reader. 126 // Branch using `>=` instead of the more correct `==` 127 // to tell the compiler that the pos..cap slice is always valid. 128 if *me.pos >= *me.cap { 129 debug_assert!(*me.pos == *me.cap); 130 let mut buf = ReadBuf::new(me.buf); 131 ready!(me.inner.poll_read(cx, &mut buf))?; 132 *me.cap = buf.filled().len(); 133 *me.pos = 0; 134 } 135 Poll::Ready(Ok(&me.buf[*me.pos..*me.cap])) 136 } 137 consume(self: Pin<&mut Self>, amt: usize)138 fn consume(self: Pin<&mut Self>, amt: usize) { 139 let me = self.project(); 140 *me.pos = cmp::min(*me.pos + amt, *me.cap); 141 } 142 } 143 144 impl<R: AsyncRead + AsyncWrite> AsyncWrite for BufReader<R> { poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>145 fn poll_write( 146 self: Pin<&mut Self>, 147 cx: &mut Context<'_>, 148 buf: &[u8], 149 ) -> Poll<io::Result<usize>> { 150 self.get_pin_mut().poll_write(cx, buf) 151 } 152 poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>153 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 154 self.get_pin_mut().poll_flush(cx) 155 } 156 poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>157 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 158 self.get_pin_mut().poll_shutdown(cx) 159 } 160 } 161 162 impl<R: fmt::Debug> fmt::Debug for BufReader<R> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result163 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 164 f.debug_struct("BufReader") 165 .field("reader", &self.inner) 166 .field( 167 "buffer", 168 &format_args!("{}/{}", self.cap - self.pos, self.buf.len()), 169 ) 170 .finish() 171 } 172 } 173 174 #[cfg(test)] 175 mod tests { 176 use super::*; 177 178 #[test] assert_unpin()179 fn assert_unpin() { 180 crate::is_unpin::<BufReader<()>>(); 181 } 182 } 183