1 use core::pin::Pin; 2 use futures_core::stream::{FusedStream, Stream, TryStream}; 3 use futures_core::task::{Context, Poll}; 4 #[cfg(feature = "sink")] 5 use futures_sink::Sink; 6 use pin_project_lite::pin_project; 7 8 pin_project! { 9 /// Stream for the [`into_stream`](super::TryStreamExt::into_stream) method. 10 #[derive(Debug)] 11 #[must_use = "streams do nothing unless polled"] 12 pub struct IntoStream<St> { 13 #[pin] 14 stream: St, 15 } 16 } 17 18 impl<St> IntoStream<St> { 19 #[inline] new(stream: St) -> Self20 pub(super) fn new(stream: St) -> Self { 21 Self { stream } 22 } 23 24 delegate_access_inner!(stream, St, ()); 25 } 26 27 impl<St: TryStream + FusedStream> FusedStream for IntoStream<St> { is_terminated(&self) -> bool28 fn is_terminated(&self) -> bool { 29 self.stream.is_terminated() 30 } 31 } 32 33 impl<St: TryStream> Stream for IntoStream<St> { 34 type Item = Result<St::Ok, St::Error>; 35 36 #[inline] poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>37 fn poll_next( 38 self: Pin<&mut Self>, 39 cx: &mut Context<'_>, 40 ) -> Poll<Option<Self::Item>> { 41 self.project().stream.try_poll_next(cx) 42 } 43 size_hint(&self) -> (usize, Option<usize>)44 fn size_hint(&self) -> (usize, Option<usize>) { 45 self.stream.size_hint() 46 } 47 } 48 49 // Forwarding impl of Sink from the underlying stream 50 #[cfg(feature = "sink")] 51 impl<S: Sink<Item>, Item> Sink<Item> for IntoStream<S> { 52 type Error = S::Error; 53 54 delegate_sink!(stream, Item); 55 } 56