1 use crate::stream::{Fuse, StreamExt};
2 use core::fmt;
3 use core::pin::Pin;
4 use futures_core::future::{FusedFuture, Future};
5 use futures_core::ready;
6 use futures_core::stream::{FusedStream, Stream};
7 use futures_core::task::{Context, Poll};
8 #[cfg(feature = "sink")]
9 use futures_sink::Sink;
10 use pin_project_lite::pin_project;
11 
12 pin_project! {
13     /// A `Stream` that implements a `peek` method.
14     ///
15     /// The `peek` method can be used to retrieve a reference
16     /// to the next `Stream::Item` if available. A subsequent
17     /// call to `poll` will return the owned item.
18     #[derive(Debug)]
19     #[must_use = "streams do nothing unless polled"]
20     pub struct Peekable<St: Stream> {
21         #[pin]
22         stream: Fuse<St>,
23         peeked: Option<St::Item>,
24     }
25 }
26 
27 impl<St: Stream> Peekable<St> {
new(stream: St) -> Self28     pub(super) fn new(stream: St) -> Self {
29         Self {
30             stream: stream.fuse(),
31             peeked: None,
32         }
33     }
34 
35     delegate_access_inner!(stream, St, (.));
36 
37     /// Produces a `Peek` future which retrieves a reference to the next item
38     /// in the stream, or `None` if the underlying stream terminates.
peek(self: Pin<&mut Self>) -> Peek<'_, St>39     pub fn peek(self: Pin<&mut Self>) -> Peek<'_, St> {
40         Peek { inner: Some(self) }
41     }
42 
43     /// Peek retrieves a reference to the next item in the stream.
44     ///
45     /// This method polls the underlying stream and return either a reference
46     /// to the next item if the stream is ready or passes through any errors.
poll_peek( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<&St::Item>>47     pub fn poll_peek(
48         self: Pin<&mut Self>,
49         cx: &mut Context<'_>,
50     ) -> Poll<Option<&St::Item>> {
51         let mut this = self.project();
52 
53         Poll::Ready(loop {
54             if this.peeked.is_some() {
55                 break this.peeked.as_ref();
56             } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
57                 *this.peeked = Some(item);
58             } else {
59                 break None;
60             }
61         })
62     }
63 }
64 
65 impl<St: Stream> FusedStream for Peekable<St> {
is_terminated(&self) -> bool66     fn is_terminated(&self) -> bool {
67         self.peeked.is_none() && self.stream.is_terminated()
68     }
69 }
70 
71 impl<S: Stream> Stream for Peekable<S> {
72     type Item = S::Item;
73 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>74     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
75         let this = self.project();
76         if let Some(item) = this.peeked.take() {
77             return Poll::Ready(Some(item));
78         }
79         this.stream.poll_next(cx)
80     }
81 
size_hint(&self) -> (usize, Option<usize>)82     fn size_hint(&self) -> (usize, Option<usize>) {
83         let peek_len = if self.peeked.is_some() { 1 } else { 0 };
84         let (lower, upper) = self.stream.size_hint();
85         let lower = lower.saturating_add(peek_len);
86         let upper = match upper {
87             Some(x) => x.checked_add(peek_len),
88             None => None,
89         };
90         (lower, upper)
91     }
92 }
93 
// Forwarding impl of Sink from the underlying stream.
//
// Only compiled when the `sink` feature is enabled. The `Sink` methods are
// produced by the `delegate_sink!` macro (defined elsewhere in the crate),
// which is given the `stream` field as the delegation target.
#[cfg(feature = "sink")]
impl<St, Item> Sink<Item> for Peekable<St>
where
    St: Sink<Item> + Stream,
{
    type Error = St::Error;

    delegate_sink!(stream, Item);
}
104 
105 pin_project! {
106     /// Future for the [`Peekable::peek()`](self::Peekable::peek) function from [`Peekable`]
107     #[must_use = "futures do nothing unless polled"]
108     pub struct Peek<'a, St: Stream> {
109         inner: Option<Pin<&'a mut Peekable<St>>>,
110     }
111 }
112 
113 impl<St> fmt::Debug for Peek<'_, St>
114 where
115     St: Stream + fmt::Debug,
116     St::Item: fmt::Debug,
117 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result118     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119         f.debug_struct("Peek")
120             .field("inner", &self.inner)
121             .finish()
122     }
123 }
124 
125 impl<St: Stream> FusedFuture for Peek<'_, St> {
is_terminated(&self) -> bool126     fn is_terminated(&self) -> bool {
127         self.inner.is_none()
128     }
129 }
130 
131 impl<'a, St> Future for Peek<'a, St>
132 where
133     St: Stream,
134 {
135     type Output = Option<&'a St::Item>;
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>136     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
137         let inner = self.project().inner;
138         if let Some(peekable) = inner {
139             ready!(peekable.as_mut().poll_peek(cx));
140 
141             inner.take().unwrap().poll_peek(cx)
142         } else {
143             panic!("Peek polled after completion")
144         }
145     }
146 }
147