1 use core::pin::Pin;
2 use futures_core::stream::{FusedStream, Stream, TryStream};
3 use futures_core::task::{Context, Poll};
4 #[cfg(feature = "sink")]
5 use futures_sink::Sink;
6 use pin_project_lite::pin_project;
7 
8 pin_project! {
9     /// Stream for the [`into_stream`](super::TryStreamExt::into_stream) method.
10     #[derive(Debug)]
11     #[must_use = "streams do nothing unless polled"]
12     pub struct IntoStream<St> {
13         #[pin]
14         stream: St,
15     }
16 }
17 
18 impl<St> IntoStream<St> {
19     #[inline]
new(stream: St) -> Self20     pub(super) fn new(stream: St) -> Self {
21         Self { stream }
22     }
23 
24     delegate_access_inner!(stream, St, ());
25 }
26 
27 impl<St: TryStream + FusedStream> FusedStream for IntoStream<St> {
is_terminated(&self) -> bool28     fn is_terminated(&self) -> bool {
29         self.stream.is_terminated()
30     }
31 }
32 
33 impl<St: TryStream> Stream for IntoStream<St> {
34     type Item = Result<St::Ok, St::Error>;
35 
36     #[inline]
poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>37     fn poll_next(
38         self: Pin<&mut Self>,
39         cx: &mut Context<'_>,
40     ) -> Poll<Option<Self::Item>> {
41         self.project().stream.try_poll_next(cx)
42     }
43 
size_hint(&self) -> (usize, Option<usize>)44     fn size_hint(&self) -> (usize, Option<usize>) {
45         self.stream.size_hint()
46     }
47 }
48 
49 // Forwarding impl of Sink from the underlying stream
50 #[cfg(feature = "sink")]
51 impl<S: Sink<Item>, Item> Sink<Item> for IntoStream<S> {
52     type Error = S::Error;
53 
54     delegate_sink!(stream, Item);
55 }
56