1 use crate::stream::{Fuse, StreamExt}; 2 use core::fmt; 3 use core::pin::Pin; 4 use futures_core::future::{FusedFuture, Future}; 5 use futures_core::ready; 6 use futures_core::stream::{FusedStream, Stream}; 7 use futures_core::task::{Context, Poll}; 8 #[cfg(feature = "sink")] 9 use futures_sink::Sink; 10 use pin_project_lite::pin_project; 11 12 pin_project! { 13 /// A `Stream` that implements a `peek` method. 14 /// 15 /// The `peek` method can be used to retrieve a reference 16 /// to the next `Stream::Item` if available. A subsequent 17 /// call to `poll` will return the owned item. 18 #[derive(Debug)] 19 #[must_use = "streams do nothing unless polled"] 20 pub struct Peekable<St: Stream> { 21 #[pin] 22 stream: Fuse<St>, 23 peeked: Option<St::Item>, 24 } 25 } 26 27 impl<St: Stream> Peekable<St> { new(stream: St) -> Self28 pub(super) fn new(stream: St) -> Self { 29 Self { 30 stream: stream.fuse(), 31 peeked: None, 32 } 33 } 34 35 delegate_access_inner!(stream, St, (.)); 36 37 /// Produces a `Peek` future which retrieves a reference to the next item 38 /// in the stream, or `None` if the underlying stream terminates. peek(self: Pin<&mut Self>) -> Peek<'_, St>39 pub fn peek(self: Pin<&mut Self>) -> Peek<'_, St> { 40 Peek { inner: Some(self) } 41 } 42 43 /// Peek retrieves a reference to the next item in the stream. 44 /// 45 /// This method polls the underlying stream and return either a reference 46 /// to the next item if the stream is ready or passes through any errors. poll_peek( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<&St::Item>>47 pub fn poll_peek( 48 self: Pin<&mut Self>, 49 cx: &mut Context<'_>, 50 ) -> Poll<Option<&St::Item>> { 51 let mut this = self.project(); 52 53 Poll::Ready(loop { 54 if this.peeked.is_some() { 55 break this.peeked.as_ref(); 56 } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { 57 *this.peeked = Some(item); 58 } else { 59 break None; 60 } 61 }) 62 } 63 } 64 65 impl<St: Stream> FusedStream for Peekable<St> { is_terminated(&self) -> bool66 fn is_terminated(&self) -> bool { 67 self.peeked.is_none() && self.stream.is_terminated() 68 } 69 } 70 71 impl<S: Stream> Stream for Peekable<S> { 72 type Item = S::Item; 73 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>74 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 75 let this = self.project(); 76 if let Some(item) = this.peeked.take() { 77 return Poll::Ready(Some(item)); 78 } 79 this.stream.poll_next(cx) 80 } 81 size_hint(&self) -> (usize, Option<usize>)82 fn size_hint(&self) -> (usize, Option<usize>) { 83 let peek_len = if self.peeked.is_some() { 1 } else { 0 }; 84 let (lower, upper) = self.stream.size_hint(); 85 let lower = lower.saturating_add(peek_len); 86 let upper = match upper { 87 Some(x) => x.checked_add(peek_len), 88 None => None, 89 }; 90 (lower, upper) 91 } 92 } 93 94 // Forwarding impl of Sink from the underlying stream 95 #[cfg(feature = "sink")] 96 impl<S, Item> Sink<Item> for Peekable<S> 97 where 98 S: Sink<Item> + Stream, 99 { 100 type Error = S::Error; 101 102 delegate_sink!(stream, Item); 103 } 104 105 pin_project! { 106 /// Future for the [`Peekable::peek()`](self::Peekable::peek) function from [`Peekable`] 107 #[must_use = "futures do nothing unless polled"] 108 pub struct Peek<'a, St: Stream> { 109 inner: Option<Pin<&'a mut Peekable<St>>>, 110 } 111 } 112 113 impl<St> fmt::Debug for Peek<'_, St> 114 where 115 St: Stream + fmt::Debug, 116 St::Item: fmt::Debug, 117 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result118 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 119 f.debug_struct("Peek") 120 .field("inner", &self.inner) 121 .finish() 122 } 123 } 124 125 impl<St: Stream> FusedFuture for Peek<'_, St> { is_terminated(&self) -> bool126 fn is_terminated(&self) -> bool { 127 self.inner.is_none() 128 } 129 } 130 131 impl<'a, St> Future for Peek<'a, St> 132 where 133 St: Stream, 134 { 135 type Output = Option<&'a St::Item>; poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>136 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 137 let inner = self.project().inner; 138 if let Some(peekable) = inner { 139 ready!(peekable.as_mut().poll_peek(cx)); 140 141 inner.take().unwrap().poll_peek(cx) 142 } else { 143 panic!("Peek polled after completion") 144 } 145 } 146 } 147