1 //! Streams
2 //!
3 //! This module contains a number of functions for working with `Streams`s
4 //! that return `Result`s, allowing for short-circuiting computations.
5 
6 #[cfg(feature = "compat")]
7 use crate::compat::Compat;
8 use core::pin::Pin;
9 use futures_core::{
10     future::{Future, TryFuture},
11     stream::TryStream,
12     task::{Context, Poll},
13 };
14 use crate::fns::{
15     InspectOkFn, inspect_ok_fn, InspectErrFn, inspect_err_fn, MapErrFn, map_err_fn, IntoFn, into_fn, MapOkFn, map_ok_fn,
16 };
17 use crate::future::assert_future;
18 use crate::stream::{Map, Inspect};
19 use crate::stream::assert_stream;
20 
21 mod and_then;
22 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
23 pub use self::and_then::AndThen;
24 
25 delegate_all!(
26     /// Stream for the [`err_into`](super::TryStreamExt::err_into) method.
27     ErrInto<St, E>(
28         MapErr<St, IntoFn<E>>
29     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| MapErr::new(x, into_fn())]
30 );
31 
32 delegate_all!(
33     /// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method.
34     InspectOk<St, F>(
35         Inspect<IntoStream<St>, InspectOkFn<F>>
36     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_ok_fn(f))]
37 );
38 
39 delegate_all!(
40     /// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method.
41     InspectErr<St, F>(
42         Inspect<IntoStream<St>, InspectErrFn<F>>
43     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_err_fn(f))]
44 );
45 
46 mod into_stream;
47 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
48 pub use self::into_stream::IntoStream;
49 
50 delegate_all!(
51     /// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method.
52     MapOk<St, F>(
53         Map<IntoStream<St>, MapOkFn<F>>
54     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_ok_fn(f))]
55 );
56 
57 delegate_all!(
58     /// Stream for the [`map_err`](super::TryStreamExt::map_err) method.
59     MapErr<St, F>(
60         Map<IntoStream<St>, MapErrFn<F>>
61     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_err_fn(f))]
62 );
63 
64 mod or_else;
65 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
66 pub use self::or_else::OrElse;
67 
68 mod try_next;
69 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
70 pub use self::try_next::TryNext;
71 
72 mod try_for_each;
73 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
74 pub use self::try_for_each::TryForEach;
75 
76 mod try_filter;
77 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
78 pub use self::try_filter::TryFilter;
79 
80 mod try_filter_map;
81 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
82 pub use self::try_filter_map::TryFilterMap;
83 
84 mod try_flatten;
85 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
86 pub use self::try_flatten::TryFlatten;
87 
88 mod try_collect;
89 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
90 pub use self::try_collect::TryCollect;
91 
92 mod try_concat;
93 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
94 pub use self::try_concat::TryConcat;
95 
96 mod try_fold;
97 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
98 pub use self::try_fold::TryFold;
99 
100 mod try_unfold;
101 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
102 pub use self::try_unfold::{try_unfold, TryUnfold};
103 
104 mod try_skip_while;
105 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
106 pub use self::try_skip_while::TrySkipWhile;
107 
108 mod try_take_while;
109 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
110 pub use self::try_take_while::TryTakeWhile;
111 
112 cfg_target_has_atomic! {
113     #[cfg(feature = "alloc")]
114     mod try_buffer_unordered;
115     #[cfg(feature = "alloc")]
116     #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
117     pub use self::try_buffer_unordered::TryBufferUnordered;
118 
119     #[cfg(feature = "alloc")]
120     mod try_buffered;
121     #[cfg(feature = "alloc")]
122     #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
123     pub use self::try_buffered::TryBuffered;
124 
125     #[cfg(feature = "alloc")]
126     mod try_for_each_concurrent;
127     #[cfg(feature = "alloc")]
128     #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
129     pub use self::try_for_each_concurrent::TryForEachConcurrent;
130 }
131 
132 #[cfg(feature = "io")]
133 #[cfg(feature = "std")]
134 mod into_async_read;
135 #[cfg(feature = "io")]
136 #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
137 #[cfg(feature = "std")]
138 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
139 pub use self::into_async_read::IntoAsyncRead;
140 
141 impl<S: ?Sized + TryStream> TryStreamExt for S {}
142 
143 /// Adapters specific to `Result`-returning streams
144 pub trait TryStreamExt: TryStream {
145     /// Wraps the current stream in a new stream which converts the error type
146     /// into the one provided.
147     ///
148     /// # Examples
149     ///
150     /// ```
151     /// # futures::executor::block_on(async {
152     /// use futures::stream::{self, TryStreamExt};
153     ///
154     /// let mut stream =
155     ///     stream::iter(vec![Ok(()), Err(5i32)])
156     ///         .err_into::<i64>();
157     ///
158     /// assert_eq!(stream.try_next().await, Ok(Some(())));
159     /// assert_eq!(stream.try_next().await, Err(5i64));
160     /// # })
161     /// ```
err_into<E>(self) -> ErrInto<Self, E> where Self: Sized, Self::Error: Into<E>,162     fn err_into<E>(self) -> ErrInto<Self, E>
163     where
164         Self: Sized,
165         Self::Error: Into<E>,
166     {
167         assert_stream::<Result<Self::Ok, E>, _>(ErrInto::new(self))
168     }
169 
170     /// Wraps the current stream in a new stream which maps the success value
171     /// using the provided closure.
172     ///
173     /// # Examples
174     ///
175     /// ```
176     /// # futures::executor::block_on(async {
177     /// use futures::stream::{self, TryStreamExt};
178     ///
179     /// let mut stream =
180     ///     stream::iter(vec![Ok(5), Err(0)])
181     ///         .map_ok(|x| x + 2);
182     ///
183     /// assert_eq!(stream.try_next().await, Ok(Some(7)));
184     /// assert_eq!(stream.try_next().await, Err(0));
185     /// # })
186     /// ```
map_ok<T, F>(self, f: F) -> MapOk<Self, F> where Self: Sized, F: FnMut(Self::Ok) -> T,187     fn map_ok<T, F>(self, f: F) -> MapOk<Self, F>
188     where
189         Self: Sized,
190         F: FnMut(Self::Ok) -> T,
191     {
192         assert_stream::<Result<T, Self::Error>, _>(MapOk::new(self, f))
193     }
194 
195     /// Wraps the current stream in a new stream which maps the error value
196     /// using the provided closure.
197     ///
198     /// # Examples
199     ///
200     /// ```
201     /// # futures::executor::block_on(async {
202     /// use futures::stream::{self, TryStreamExt};
203     ///
204     /// let mut stream =
205     ///     stream::iter(vec![Ok(5), Err(0)])
206     ///         .map_err(|x| x + 2);
207     ///
208     /// assert_eq!(stream.try_next().await, Ok(Some(5)));
209     /// assert_eq!(stream.try_next().await, Err(2));
210     /// # })
211     /// ```
map_err<E, F>(self, f: F) -> MapErr<Self, F> where Self: Sized, F: FnMut(Self::Error) -> E,212     fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
213     where
214         Self: Sized,
215         F: FnMut(Self::Error) -> E,
216     {
217         assert_stream::<Result<Self::Ok, E>, _>(MapErr::new(self, f))
218     }
219 
220     /// Chain on a computation for when a value is ready, passing the successful
221     /// results to the provided closure `f`.
222     ///
223     /// This function can be used to run a unit of work when the next successful
224     /// value on a stream is ready. The closure provided will be yielded a value
225     /// when ready, and the returned future will then be run to completion to
226     /// produce the next value on this stream.
227     ///
228     /// Any errors produced by this stream will not be passed to the closure,
229     /// and will be passed through.
230     ///
231     /// The returned value of the closure must implement the `TryFuture` trait
232     /// and can represent some more work to be done before the composed stream
233     /// is finished.
234     ///
235     /// Note that this function consumes the receiving stream and returns a
236     /// wrapped version of it.
237     ///
238     /// To process the entire stream and return a single future representing
239     /// success or error, use `try_for_each` instead.
240     ///
241     /// # Examples
242     ///
243     /// ```
244     /// use futures::channel::mpsc;
245     /// use futures::future;
246     /// use futures::stream::TryStreamExt;
247     ///
248     /// let (_tx, rx) = mpsc::channel::<Result<i32, ()>>(1);
249     ///
250     /// let rx = rx.and_then(|result| {
251     ///     future::ok(if result % 2 == 0 {
252     ///         Some(result)
253     ///     } else {
254     ///         None
255     ///     })
256     /// });
257     /// ```
and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F> where F: FnMut(Self::Ok) -> Fut, Fut: TryFuture<Error = Self::Error>, Self: Sized,258     fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
259     where
260         F: FnMut(Self::Ok) -> Fut,
261         Fut: TryFuture<Error = Self::Error>,
262         Self: Sized,
263     {
264         assert_stream::<Result<Fut::Ok, Fut::Error>, _>(AndThen::new(self, f))
265     }
266 
267     /// Chain on a computation for when an error happens, passing the
268     /// erroneous result to the provided closure `f`.
269     ///
270     /// This function can be used to run a unit of work and attempt to recover from
271     /// an error if one happens. The closure provided will be yielded an error
272     /// when one appears, and the returned future will then be run to completion
273     /// to produce the next value on this stream.
274     ///
275     /// Any successful values produced by this stream will not be passed to the
276     /// closure, and will be passed through.
277     ///
278     /// The returned value of the closure must implement the [`TryFuture`](futures_core::future::TryFuture) trait
279     /// and can represent some more work to be done before the composed stream
280     /// is finished.
281     ///
282     /// Note that this function consumes the receiving stream and returns a
283     /// wrapped version of it.
or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F> where F: FnMut(Self::Error) -> Fut, Fut: TryFuture<Ok = Self::Ok>, Self: Sized,284     fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F>
285     where
286         F: FnMut(Self::Error) -> Fut,
287         Fut: TryFuture<Ok = Self::Ok>,
288         Self: Sized,
289     {
290         assert_stream::<Result<Self::Ok, Fut::Error>, _>(OrElse::new(self, f))
291     }
292 
293     /// Do something with the success value of this stream, afterwards passing
294     /// it on.
295     ///
296     /// This is similar to the `StreamExt::inspect` method where it allows
297     /// easily inspecting the success value as it passes through the stream, for
298     /// example to debug what's going on.
inspect_ok<F>(self, f: F) -> InspectOk<Self, F> where F: FnMut(&Self::Ok), Self: Sized,299     fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F>
300     where
301         F: FnMut(&Self::Ok),
302         Self: Sized,
303     {
304         assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectOk::new(self, f))
305     }
306 
307     /// Do something with the error value of this stream, afterwards passing it on.
308     ///
309     /// This is similar to the `StreamExt::inspect` method where it allows
310     /// easily inspecting the error value as it passes through the stream, for
311     /// example to debug what's going on.
inspect_err<F>(self, f: F) -> InspectErr<Self, F> where F: FnMut(&Self::Error), Self: Sized,312     fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
313     where
314         F: FnMut(&Self::Error),
315         Self: Sized,
316     {
317         assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectErr::new(self, f))
318     }
319 
320     /// Wraps a [`TryStream`] into a type that implements
321     /// [`Stream`](futures_core::stream::Stream)
322     ///
323     /// [`TryStream`]s currently do not implement the
324     /// [`Stream`](futures_core::stream::Stream) trait because of limitations
325     /// of the compiler.
326     ///
327     /// # Examples
328     ///
329     /// ```
330     /// use futures::stream::{Stream, TryStream, TryStreamExt};
331     ///
332     /// # type T = i32;
333     /// # type E = ();
334     /// fn make_try_stream() -> impl TryStream<Ok = T, Error = E> { // ... }
335     /// # futures::stream::empty()
336     /// # }
337     /// fn take_stream(stream: impl Stream<Item = Result<T, E>>) { /* ... */ }
338     ///
339     /// take_stream(make_try_stream().into_stream());
340     /// ```
into_stream(self) -> IntoStream<Self> where Self: Sized,341     fn into_stream(self) -> IntoStream<Self>
342     where
343         Self: Sized,
344     {
345         assert_stream::<Result<Self::Ok, Self::Error>, _>(IntoStream::new(self))
346     }
347 
348     /// Creates a future that attempts to resolve the next item in the stream.
349     /// If an error is encountered before the next item, the error is returned
350     /// instead.
351     ///
352     /// This is similar to the `Stream::next` combinator, but returns a
353     /// `Result<Option<T>, E>` rather than an `Option<Result<T, E>>`, making
354     /// for easy use with the `?` operator.
355     ///
356     /// # Examples
357     ///
358     /// ```
359     /// # futures::executor::block_on(async {
360     /// use futures::stream::{self, TryStreamExt};
361     ///
362     /// let mut stream = stream::iter(vec![Ok(()), Err(())]);
363     ///
364     /// assert_eq!(stream.try_next().await, Ok(Some(())));
365     /// assert_eq!(stream.try_next().await, Err(()));
366     /// # })
367     /// ```
try_next(&mut self) -> TryNext<'_, Self> where Self: Unpin,368     fn try_next(&mut self) -> TryNext<'_, Self>
369     where
370         Self: Unpin,
371     {
372         assert_future::<Result<Option<Self::Ok>, Self::Error>, _>(TryNext::new(self))
373     }
374 
375     /// Attempts to run this stream to completion, executing the provided
376     /// asynchronous closure for each element on the stream.
377     ///
378     /// The provided closure will be called for each item this stream produces,
379     /// yielding a future. That future will then be executed to completion
380     /// before moving on to the next item.
381     ///
382     /// The returned value is a [`Future`](futures_core::future::Future) where the
383     /// [`Output`](futures_core::future::Future::Output) type is
384     /// `Result<(), Self::Error>`. If any of the intermediate
385     /// futures or the stream returns an error, this future will return
386     /// immediately with an error.
387     ///
388     /// # Examples
389     ///
390     /// ```
391     /// # futures::executor::block_on(async {
392     /// use futures::future;
393     /// use futures::stream::{self, TryStreamExt};
394     ///
395     /// let mut x = 0i32;
396     ///
397     /// {
398     ///     let fut = stream::repeat(Ok(1)).try_for_each(|item| {
399     ///         x += item;
400     ///         future::ready(if x == 3 { Err(()) } else { Ok(()) })
401     ///     });
402     ///     assert_eq!(fut.await, Err(()));
403     /// }
404     ///
405     /// assert_eq!(x, 3);
406     /// # })
407     /// ```
try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F> where F: FnMut(Self::Ok) -> Fut, Fut: TryFuture<Ok = (), Error = Self::Error>, Self: Sized,408     fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F>
409     where
410         F: FnMut(Self::Ok) -> Fut,
411         Fut: TryFuture<Ok = (), Error = Self::Error>,
412         Self: Sized,
413     {
414         assert_future::<Result<(), Self::Error>, _>(TryForEach::new(self, f))
415     }
416 
417     /// Skip elements on this stream while the provided asynchronous predicate
418     /// resolves to `true`.
419     ///
420     /// This function is similar to
421     /// [`StreamExt::skip_while`](crate::stream::StreamExt::skip_while) but exits
422     /// early if an error occurs.
423     ///
424     /// # Examples
425     ///
426     /// ```
427     /// # futures::executor::block_on(async {
428     /// use futures::future;
429     /// use futures::stream::{self, TryStreamExt};
430     ///
431     /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(3), Ok(2)]);
432     /// let stream = stream.try_skip_while(|x| future::ready(Ok(*x < 3)));
433     ///
434     /// let output: Result<Vec<i32>, i32> = stream.try_collect().await;
435     /// assert_eq!(output, Ok(vec![3, 2]));
436     /// # })
437     /// ```
try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F> where F: FnMut(&Self::Ok) -> Fut, Fut: TryFuture<Ok = bool, Error = Self::Error>, Self: Sized,438     fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F>
439     where
440         F: FnMut(&Self::Ok) -> Fut,
441         Fut: TryFuture<Ok = bool, Error = Self::Error>,
442         Self: Sized,
443     {
444         assert_stream::<Result<Self::Ok, Self::Error>, _>(TrySkipWhile::new(self, f))
445     }
446 
447     /// Take elements on this stream while the provided asynchronous predicate
448     /// resolves to `true`.
449     ///
450     /// This function is similar to
451     /// [`StreamExt::take_while`](crate::stream::StreamExt::take_while) but exits
452     /// early if an error occurs.
453     ///
454     /// # Examples
455     ///
456     /// ```
457     /// # futures::executor::block_on(async {
458     /// use futures::future;
459     /// use futures::stream::{self, TryStreamExt};
460     ///
461     /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Ok(2)]);
462     /// let stream = stream.try_take_while(|x| future::ready(Ok(*x < 3)));
463     ///
464     /// let output: Result<Vec<i32>, i32> = stream.try_collect().await;
465     /// assert_eq!(output, Ok(vec![1, 2]));
466     /// # })
467     /// ```
try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F> where F: FnMut(&Self::Ok) -> Fut, Fut: TryFuture<Ok = bool, Error = Self::Error>, Self: Sized,468     fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F>
469     where
470         F: FnMut(&Self::Ok) -> Fut,
471         Fut: TryFuture<Ok = bool, Error = Self::Error>,
472         Self: Sized,
473     {
474         assert_stream::<Result<Self::Ok, Self::Error>, _>(TryTakeWhile::new(self, f))
475     }
476 
477     /// Attempts to run this stream to completion, executing the provided asynchronous
478     /// closure for each element on the stream concurrently as elements become
479     /// available, exiting as soon as an error occurs.
480     ///
481     /// This is similar to
482     /// [`StreamExt::for_each_concurrent`](crate::stream::StreamExt::for_each_concurrent),
483     /// but will resolve to an error immediately if the underlying stream or the provided
484     /// closure return an error.
485     ///
486     /// This method is only available when the `std` or `alloc` feature of this
487     /// library is activated, and it is activated by default.
488     ///
489     /// # Examples
490     ///
491     /// ```
492     /// # futures::executor::block_on(async {
493     /// use futures::channel::oneshot;
494     /// use futures::stream::{self, StreamExt, TryStreamExt};
495     ///
496     /// let (tx1, rx1) = oneshot::channel();
497     /// let (tx2, rx2) = oneshot::channel();
498     /// let (_tx3, rx3) = oneshot::channel();
499     ///
500     /// let stream = stream::iter(vec![rx1, rx2, rx3]);
501     /// let fut = stream.map(Ok).try_for_each_concurrent(
502     ///     /* limit */ 2,
503     ///     |rx| async move {
504     ///         let res: Result<(), oneshot::Canceled> = rx.await;
505     ///         res
506     ///     }
507     /// );
508     ///
509     /// tx1.send(()).unwrap();
510     /// // Drop the second sender so that `rx2` resolves to `Canceled`.
511     /// drop(tx2);
512     ///
513     /// // The final result is an error because the second future
514     /// // resulted in an error.
515     /// assert_eq!(Err(oneshot::Canceled), fut.await);
516     /// # })
517     /// ```
518     #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
519     #[cfg(feature = "alloc")]
try_for_each_concurrent<Fut, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> TryForEachConcurrent<Self, Fut, F> where F: FnMut(Self::Ok) -> Fut, Fut: Future<Output = Result<(), Self::Error>>, Self: Sized,520     fn try_for_each_concurrent<Fut, F>(
521         self,
522         limit: impl Into<Option<usize>>,
523         f: F,
524     ) -> TryForEachConcurrent<Self, Fut, F>
525     where
526         F: FnMut(Self::Ok) -> Fut,
527         Fut: Future<Output = Result<(), Self::Error>>,
528         Self: Sized,
529     {
530         assert_future::<Result<(), Self::Error>, _>(TryForEachConcurrent::new(
531             self,
532             limit.into(),
533             f,
534         ))
535     }
536 
537     /// Attempt to transform a stream into a collection,
538     /// returning a future representing the result of that computation.
539     ///
540     /// This combinator will collect all successful results of this stream and
541     /// collect them into the specified collection type. If an error happens then all
542     /// collected elements will be dropped and the error will be returned.
543     ///
544     /// The returned future will be resolved when the stream terminates.
545     ///
546     /// # Examples
547     ///
548     /// ```
549     /// # futures::executor::block_on(async {
550     /// use futures::channel::mpsc;
551     /// use futures::stream::TryStreamExt;
552     /// use std::thread;
553     ///
554     /// let (tx, rx) = mpsc::unbounded();
555     ///
556     /// thread::spawn(move || {
557     ///     for i in 1..=5 {
558     ///         tx.unbounded_send(Ok(i)).unwrap();
559     ///     }
560     ///     tx.unbounded_send(Err(6)).unwrap();
561     /// });
562     ///
563     /// let output: Result<Vec<i32>, i32> = rx.try_collect().await;
564     /// assert_eq!(output, Err(6));
565     /// # })
566     /// ```
try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C> where Self: Sized,567     fn try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C>
568     where
569         Self: Sized,
570     {
571         assert_future::<Result<C, Self::Error>, _>(TryCollect::new(self))
572     }
573 
574     /// Attempt to filter the values produced by this stream according to the
575     /// provided asynchronous closure.
576     ///
577     /// As values of this stream are made available, the provided predicate `f`
578     /// will be run on them. If the predicate returns a `Future` which resolves
579     /// to `true`, then the stream will yield the value, but if the predicate
580     /// return a `Future` which resolves to `false`, then the value will be
581     /// discarded and the next value will be produced.
582     ///
583     /// All errors are passed through without filtering in this combinator.
584     ///
585     /// Note that this function consumes the stream passed into it and returns a
586     /// wrapped version of it, similar to the existing `filter` methods in
587     /// the standard library.
588     ///
589     /// # Examples
590     /// ```
591     /// # futures::executor::block_on(async {
592     /// use futures::future;
593     /// use futures::stream::{self, StreamExt, TryStreamExt};
594     ///
595     /// let stream = stream::iter(vec![Ok(1i32), Ok(2i32), Ok(3i32), Err("error")]);
596     /// let mut evens = stream.try_filter(|x| {
597     ///     future::ready(x % 2 == 0)
598     /// });
599     ///
600     /// assert_eq!(evens.next().await, Some(Ok(2)));
601     /// assert_eq!(evens.next().await, Some(Err("error")));
602     /// # })
603     /// ```
try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F> where Fut: Future<Output = bool>, F: FnMut(&Self::Ok) -> Fut, Self: Sized,604     fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F>
605     where
606         Fut: Future<Output = bool>,
607         F: FnMut(&Self::Ok) -> Fut,
608         Self: Sized,
609     {
610         assert_stream::<Result<Self::Ok, Self::Error>, _>(TryFilter::new(self, f))
611     }
612 
613     /// Attempt to filter the values produced by this stream while
614     /// simultaneously mapping them to a different type according to the
615     /// provided asynchronous closure.
616     ///
617     /// As values of this stream are made available, the provided function will
618     /// be run on them. If the future returned by the predicate `f` resolves to
619     /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
620     /// it resolves to [`None`] then the next value will be produced.
621     ///
622     /// All errors are passed through without filtering in this combinator.
623     ///
624     /// Note that this function consumes the stream passed into it and returns a
625     /// wrapped version of it, similar to the existing `filter_map` methods in
626     /// the standard library.
627     ///
628     /// # Examples
629     /// ```
630     /// # futures::executor::block_on(async {
631     /// use futures::stream::{self, StreamExt, TryStreamExt};
632     /// use futures::pin_mut;
633     ///
634     /// let stream = stream::iter(vec![Ok(1i32), Ok(6i32), Err("error")]);
635     /// let halves = stream.try_filter_map(|x| async move {
636     ///     let ret = if x % 2 == 0 { Some(x / 2) } else { None };
637     ///     Ok(ret)
638     /// });
639     ///
640     /// pin_mut!(halves);
641     /// assert_eq!(halves.next().await, Some(Ok(3)));
642     /// assert_eq!(halves.next().await, Some(Err("error")));
643     /// # })
644     /// ```
try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F> where Fut: TryFuture<Ok = Option<T>, Error = Self::Error>, F: FnMut(Self::Ok) -> Fut, Self: Sized,645     fn try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F>
646     where
647         Fut: TryFuture<Ok = Option<T>, Error = Self::Error>,
648         F: FnMut(Self::Ok) -> Fut,
649         Self: Sized,
650     {
651         assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f))
652     }
653 
654     /// Flattens a stream of streams into just one continuous stream.
655     ///
656     /// If this stream's elements are themselves streams then this combinator
657     /// will flatten out the entire stream to one long chain of elements. Any
658     /// errors are passed through without looking at them, but otherwise each
659     /// individual stream will get exhausted before moving on to the next.
660     ///
661     /// # Examples
662     ///
663     /// ```
664     /// # futures::executor::block_on(async {
665     /// use futures::channel::mpsc;
666     /// use futures::stream::{StreamExt, TryStreamExt};
667     /// use std::thread;
668     ///
669     /// let (tx1, rx1) = mpsc::unbounded();
670     /// let (tx2, rx2) = mpsc::unbounded();
671     /// let (tx3, rx3) = mpsc::unbounded();
672     ///
673     /// thread::spawn(move || {
674     ///     tx1.unbounded_send(Ok(1)).unwrap();
675     /// });
676     /// thread::spawn(move || {
677     ///     tx2.unbounded_send(Ok(2)).unwrap();
678     ///     tx2.unbounded_send(Err(3)).unwrap();
679     /// });
680     /// thread::spawn(move || {
681     ///     tx3.unbounded_send(Ok(rx1)).unwrap();
682     ///     tx3.unbounded_send(Ok(rx2)).unwrap();
683     ///     tx3.unbounded_send(Err(4)).unwrap();
684     /// });
685     ///
686     /// let mut stream = rx3.try_flatten();
687     /// assert_eq!(stream.next().await, Some(Ok(1)));
688     /// assert_eq!(stream.next().await, Some(Ok(2)));
689     /// assert_eq!(stream.next().await, Some(Err(3)));
690     /// # });
691     /// ```
try_flatten(self) -> TryFlatten<Self> where Self::Ok: TryStream, <Self::Ok as TryStream>::Error: From<Self::Error>, Self: Sized,692     fn try_flatten(self) -> TryFlatten<Self>
693     where
694         Self::Ok: TryStream,
695         <Self::Ok as TryStream>::Error: From<Self::Error>,
696         Self: Sized,
697     {
698         assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>(
699             TryFlatten::new(self),
700         )
701     }
702 
703     /// Attempt to execute an accumulating asynchronous computation over a
704     /// stream, collecting all the values into one final result.
705     ///
706     /// This combinator will accumulate all values returned by this stream
707     /// according to the closure provided. The initial state is also provided to
708     /// this method and then is returned again by each execution of the closure.
709     /// Once the entire stream has been exhausted the returned future will
710     /// resolve to this value.
711     ///
712     /// This method is similar to [`fold`](crate::stream::StreamExt::fold), but will
713     /// exit early if an error is encountered in either the stream or the
714     /// provided closure.
715     ///
716     /// # Examples
717     ///
718     /// ```
719     /// # futures::executor::block_on(async {
720     /// use futures::stream::{self, TryStreamExt};
721     ///
722     /// let number_stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2)]);
723     /// let sum = number_stream.try_fold(0, |acc, x| async move { Ok(acc + x) });
724     /// assert_eq!(sum.await, Ok(3));
725     ///
726     /// let number_stream_with_err = stream::iter(vec![Ok::<i32, i32>(1), Err(2), Ok(1)]);
727     /// let sum = number_stream_with_err.try_fold(0, |acc, x| async move { Ok(acc + x) });
728     /// assert_eq!(sum.await, Err(2));
729     /// # })
730     /// ```
try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F> where F: FnMut(T, Self::Ok) -> Fut, Fut: TryFuture<Ok = T, Error = Self::Error>, Self: Sized,731     fn try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F>
732     where
733         F: FnMut(T, Self::Ok) -> Fut,
734         Fut: TryFuture<Ok = T, Error = Self::Error>,
735         Self: Sized,
736     {
737         assert_future::<Result<T, Self::Error>, _>(TryFold::new(self, f, init))
738     }
739 
740     /// Attempt to concatenate all items of a stream into a single
741     /// extendable destination, returning a future representing the end result.
742     ///
743     /// This combinator will extend the first item with the contents of all
744     /// the subsequent successful results of the stream. If the stream is empty,
745     /// the default value will be returned.
746     ///
747     /// Works with all collections that implement the [`Extend`](std::iter::Extend) trait.
748     ///
749     /// This method is similar to [`concat`](crate::stream::StreamExt::concat), but will
750     /// exit early if an error is encountered in the stream.
751     ///
752     /// # Examples
753     ///
754     /// ```
755     /// # futures::executor::block_on(async {
756     /// use futures::channel::mpsc;
757     /// use futures::stream::TryStreamExt;
758     /// use std::thread;
759     ///
760     /// let (tx, rx) = mpsc::unbounded::<Result<Vec<i32>, ()>>();
761     ///
762     /// thread::spawn(move || {
763     ///     for i in (0..3).rev() {
764     ///         let n = i * 3;
765     ///         tx.unbounded_send(Ok(vec![n + 1, n + 2, n + 3])).unwrap();
766     ///     }
767     /// });
768     ///
769     /// let result = rx.try_concat().await;
770     ///
771     /// assert_eq!(result, Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3]));
772     /// # });
773     /// ```
try_concat(self) -> TryConcat<Self> where Self: Sized, Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default,774     fn try_concat(self) -> TryConcat<Self>
775     where
776         Self: Sized,
777         Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default,
778     {
779         assert_future::<Result<Self::Ok, Self::Error>, _>(TryConcat::new(self))
780     }
781 
782     /// Attempt to execute several futures from a stream concurrently (unordered).
783     ///
784     /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
785     /// that matches the stream's `Error` type.
786     ///
787     /// This adaptor will buffer up to `n` futures and then return their
788     /// outputs in the order in which they complete. If the underlying stream
789     /// returns an error, it will be immediately propagated.
790     ///
791     /// The returned stream will be a stream of results, each containing either
792     /// an error or a future's output. An error can be produced either by the
793     /// underlying stream itself or by one of the futures it yielded.
794     ///
795     /// This method is only available when the `std` or `alloc` feature of this
796     /// library is activated, and it is activated by default.
797     ///
798     /// # Examples
799     ///
800     /// Results are returned in the order of completion:
801     /// ```
802     /// # futures::executor::block_on(async {
803     /// use futures::channel::oneshot;
804     /// use futures::stream::{self, StreamExt, TryStreamExt};
805     ///
806     /// let (send_one, recv_one) = oneshot::channel();
807     /// let (send_two, recv_two) = oneshot::channel();
808     ///
809     /// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
810     ///
811     /// let mut buffered = stream_of_futures.try_buffer_unordered(10);
812     ///
813     /// send_two.send(2i32)?;
814     /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
815     ///
816     /// send_one.send(1i32)?;
817     /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
818     ///
819     /// assert_eq!(buffered.next().await, None);
820     /// # Ok::<(), i32>(()) }).unwrap();
821     /// ```
822     ///
823     /// Errors from the underlying stream itself are propagated:
824     /// ```
825     /// # futures::executor::block_on(async {
826     /// use futures::channel::mpsc;
827     /// use futures::stream::{StreamExt, TryStreamExt};
828     ///
829     /// let (sink, stream_of_futures) = mpsc::unbounded();
830     /// let mut buffered = stream_of_futures.try_buffer_unordered(10);
831     ///
832     /// sink.unbounded_send(Ok(async { Ok(7i32) }))?;
833     /// assert_eq!(buffered.next().await, Some(Ok(7i32)));
834     ///
835     /// sink.unbounded_send(Err("error in the stream"))?;
836     /// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
837     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
838     /// ```
839     #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
840     #[cfg(feature = "alloc")]
try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self> where Self::Ok: TryFuture<Error = Self::Error>, Self: Sized,841     fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self>
842     where
843         Self::Ok: TryFuture<Error = Self::Error>,
844         Self: Sized,
845     {
846         assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(
847             TryBufferUnordered::new(self, n),
848         )
849     }
850 
851     /// Attempt to execute several futures from a stream concurrently.
852     ///
853     /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
854     /// that matches the stream's `Error` type.
855     ///
856     /// This adaptor will buffer up to `n` futures and then return their
857     /// outputs in the order. If the underlying stream returns an error, it will
858     /// be immediately propagated.
859     ///
860     /// The returned stream will be a stream of results, each containing either
861     /// an error or a future's output. An error can be produced either by the
862     /// underlying stream itself or by one of the futures it yielded.
863     ///
864     /// This method is only available when the `std` or `alloc` feature of this
865     /// library is activated, and it is activated by default.
866     ///
867     /// # Examples
868     ///
869     /// Results are returned in the order of addition:
870     /// ```
871     /// # futures::executor::block_on(async {
872     /// use futures::channel::oneshot;
873     /// use futures::future::lazy;
874     /// use futures::stream::{self, StreamExt, TryStreamExt};
875     ///
876     /// let (send_one, recv_one) = oneshot::channel();
877     /// let (send_two, recv_two) = oneshot::channel();
878     ///
879     /// let mut buffered = lazy(move |cx| {
880     ///     let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
881     ///
882     ///     let mut buffered = stream_of_futures.try_buffered(10);
883     ///
884     ///     assert!(buffered.try_poll_next_unpin(cx).is_pending());
885     ///
886     ///     send_two.send(2i32)?;
887     ///     assert!(buffered.try_poll_next_unpin(cx).is_pending());
888     ///     Ok::<_, i32>(buffered)
889     /// }).await?;
890     ///
891     /// send_one.send(1i32)?;
892     /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
893     /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
894     ///
895     /// assert_eq!(buffered.next().await, None);
896     /// # Ok::<(), i32>(()) }).unwrap();
897     /// ```
898     ///
899     /// Errors from the underlying stream itself are propagated:
900     /// ```
901     /// # futures::executor::block_on(async {
902     /// use futures::channel::mpsc;
903     /// use futures::stream::{StreamExt, TryStreamExt};
904     ///
905     /// let (sink, stream_of_futures) = mpsc::unbounded();
906     /// let mut buffered = stream_of_futures.try_buffered(10);
907     ///
908     /// sink.unbounded_send(Ok(async { Ok(7i32) }))?;
909     /// assert_eq!(buffered.next().await, Some(Ok(7i32)));
910     ///
911     /// sink.unbounded_send(Err("error in the stream"))?;
912     /// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
913     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
914     /// ```
915     #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
916     #[cfg(feature = "alloc")]
try_buffered(self, n: usize) -> TryBuffered<Self> where Self::Ok: TryFuture<Error = Self::Error>, Self: Sized,917     fn try_buffered(self, n: usize) -> TryBuffered<Self>
918     where
919         Self::Ok: TryFuture<Error = Self::Error>,
920         Self: Sized,
921     {
922         assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(TryBuffered::new(self, n))
923     }
924 
925     // TODO: false positive warning from rustdoc. Verify once #43466 settles
926     //
927     /// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`]
928     /// stream types.
try_poll_next_unpin( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<Result<Self::Ok, Self::Error>>> where Self: Unpin,929     fn try_poll_next_unpin(
930         &mut self,
931         cx: &mut Context<'_>,
932     ) -> Poll<Option<Result<Self::Ok, Self::Error>>>
933     where
934         Self: Unpin,
935     {
936         Pin::new(self).try_poll_next(cx)
937     }
938 
939     /// Wraps a [`TryStream`] into a stream compatible with libraries using
940     /// futures 0.1 `Stream`. Requires the `compat` feature to be enabled.
941     /// ```
942     /// use futures::future::{FutureExt, TryFutureExt};
943     /// # let (tx, rx) = futures::channel::oneshot::channel();
944     ///
945     /// let future03 = async {
946     ///     println!("Running on the pool");
947     ///     tx.send(42).unwrap();
948     /// };
949     ///
950     /// let future01 = future03
951     ///     .unit_error() // Make it a TryFuture
952     ///     .boxed()  // Make it Unpin
953     ///     .compat();
954     ///
955     /// tokio::run(future01);
956     /// # assert_eq!(42, futures::executor::block_on(rx).unwrap());
957     /// ```
958     #[cfg(feature = "compat")]
959     #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
compat(self) -> Compat<Self> where Self: Sized + Unpin,960     fn compat(self) -> Compat<Self>
961     where
962         Self: Sized + Unpin,
963     {
964         Compat::new(self)
965     }
966 
967     /// Adapter that converts this stream into an [`AsyncRead`](crate::io::AsyncRead).
968     ///
969     /// Note that because `into_async_read` moves the stream, the [`Stream`](futures_core::stream::Stream) type must be
970     /// [`Unpin`]. If you want to use `into_async_read` with a [`!Unpin`](Unpin) stream, you'll
971     /// first have to pin the stream. This can be done by boxing the stream using [`Box::pin`]
972     /// or pinning it to the stack using the `pin_mut!` macro from the `pin_utils` crate.
973     ///
974     /// This method is only available when the `std` feature of this
975     /// library is activated, and it is activated by default.
976     ///
977     /// # Examples
978     ///
979     /// ```
980     /// # futures::executor::block_on(async {
981     /// use futures::stream::{self, TryStreamExt};
982     /// use futures::io::AsyncReadExt;
983     ///
984     /// let stream = stream::iter(vec![Ok(vec![1, 2, 3, 4, 5])]);
985     /// let mut reader = stream.into_async_read();
986     /// let mut buf = Vec::new();
987     ///
988     /// assert!(reader.read_to_end(&mut buf).await.is_ok());
989     /// assert_eq!(buf, &[1, 2, 3, 4, 5]);
990     /// # })
991     /// ```
992     #[cfg(feature = "io")]
993     #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
994     #[cfg(feature = "std")]
into_async_read(self) -> IntoAsyncRead<Self> where Self: Sized + TryStreamExt<Error = std::io::Error> + Unpin, Self::Ok: AsRef<[u8]>,995     fn into_async_read(self) -> IntoAsyncRead<Self>
996     where
997         Self: Sized + TryStreamExt<Error = std::io::Error> + Unpin,
998         Self::Ok: AsRef<[u8]>,
999     {
1000         crate::io::assert_read(IntoAsyncRead::new(self))
1001     }
1002 }
1003