• Home
  • History
  • Annotate
Name Date Size #Lines LOC

..--

examples/23-Nov-2023-2218

src/23-Nov-2023-369166

tests/23-Nov-2023-418346

.cargo_vcs_info.jsonD23-Nov-202374 65

Android.bpD23-Nov-20231 KiB3934

Cargo.lockD23-Nov-202313.7 KiB553492

Cargo.tomlD23-Nov-20231.2 KiB4135

Cargo.toml.origD23-Nov-2023824 3128

LICENSED23-Nov-20232.1 KiB5244

METADATAD23-Nov-2023404 2019

MODULE_LICENSE_MITD23-Nov-20230

OWNERSD23-Nov-202340 21

README.mdD23-Nov-20234.5 KiB174133

README.tplD23-Nov-2023325 148

README.md

1# Asynchronous streams for Rust
2
3Asynchronous stream of elements.
4
5Provides two macros, `stream!` and `try_stream!`, allowing the caller to
6define asynchronous streams of elements. These are implemented using `async`
7& `await` notation. The `stream!` macro works without unstable features.
8
9The `stream!` macro returns an anonymous type implementing the [`Stream`]
10trait. The `Item` associated type is the type of the values yielded from the
11stream. The `try_stream!` also returns an anonymous type implementing the
12[`Stream`] trait, but the `Item` associated type is `Result<T, Error>`. The
13`try_stream!` macro supports using `?` notiation as part of the
14implementation.
15
16## Usage
17
18A basic stream yielding numbers. Values are yielded using the `yield`
19keyword. The stream block must return `()`.
20
21```rust
22use async_stream::stream;
23
24use futures_util::pin_mut;
25use futures_util::stream::StreamExt;
26
27#[tokio::main]
28async fn main() {
29    let s = stream! {
30        for i in 0..3 {
31            yield i;
32        }
33    };
34
35    pin_mut!(s); // needed for iteration
36
37    while let Some(value) = s.next().await {
38        println!("got {}", value);
39    }
40}
41```
42
43Streams may be returned by using `impl Stream<Item = T>`:
44
45```rust
46use async_stream::stream;
47
48use futures_core::stream::Stream;
49use futures_util::pin_mut;
50use futures_util::stream::StreamExt;
51
52fn zero_to_three() -> impl Stream<Item = u32> {
53    stream! {
54        for i in 0..3 {
55            yield i;
56        }
57    }
58}
59
60#[tokio::main]
61async fn main() {
62    let s = zero_to_three();
63    pin_mut!(s); // needed for iteration
64
65    while let Some(value) = s.next().await {
66        println!("got {}", value);
67    }
68}
69```
70
71Streams may be implemented in terms of other streams:
72
73```rust
74use async_stream::stream;
75
76use futures_core::stream::Stream;
77use futures_util::pin_mut;
78use futures_util::stream::StreamExt;
79
80fn zero_to_three() -> impl Stream<Item = u32> {
81    stream! {
82        for i in 0..3 {
83            yield i;
84        }
85    }
86}
87
88fn double<S: Stream<Item = u32>>(input: S)
89    -> impl Stream<Item = u32>
90{
91    stream! {
92        pin_mut!(input);
93        while let Some(value) = input.next().await {
94            yield value * 2;
95        }
96    }
97}
98
99#[tokio::main]
100async fn main() {
101    let s = double(zero_to_three());
102    pin_mut!(s); // needed for iteration
103
104    while let Some(value) = s.next().await {
105        println!("got {}", value);
106    }
107}
108```
109
110Rust try notation (`?`) can be used with the `try_stream!` macro. The `Item`
111of the returned stream is `Result` with `Ok` being the value yielded and
112`Err` the error type returned by `?`.
113
114```rust
115use tokio::net::{TcpListener, TcpStream};
116
117use async_stream::try_stream;
118use futures_core::stream::Stream;
119
120use std::io;
121use std::net::SocketAddr;
122
123fn bind_and_accept(addr: SocketAddr)
124    -> impl Stream<Item = io::Result<TcpStream>>
125{
126    try_stream! {
127        let mut listener = TcpListener::bind(&addr)?;
128
129        loop {
130            let (stream, addr) = listener.accept().await?;
131            println!("received on {:?}", addr);
132            yield stream;
133        }
134    }
135}
136```
137
138## Implementation
139
140The `stream!` and `try_stream!` macros are implemented using proc macros.
141Given that proc macros in expression position are not supported on stable
142rust, a hack similar to the one provided by the [`proc-macro-hack`] crate is
143used. The macro searches the syntax tree for instances of `sender.send($expr)` and
144transforms them into `sender.send($expr).await`.
145
146The stream uses a lightweight sender to send values from the stream
147implementation to the caller. When entering the stream, an `Option<T>` is
148stored on the stack. A pointer to the cell is stored in a thread local and
149`poll` is called on the async block. When `poll` returns.
150`sender.send(value)` stores the value that cell and yields back to the
151caller.
152
153## Limitations
154
155`async-stream` suffers from the same limitations as the [`proc-macro-hack`]
156crate. Primarily, nesting support must be implemented using a `TT-muncher`.
157If large `stream!` blocks are used, the caller will be required to add
158`#![recursion_limit = "..."]` to their crate.
159
160A `stream!` macro may only contain up to 64 macro invocations.
161
162[`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html
163[`proc-macro-hack`]: https://github.com/dtolnay/proc-macro-hack/
164
165## License
166
167This project is licensed under the [MIT license](LICENSE).
168
169### Contribution
170
171Unless you explicitly state otherwise, any contribution intentionally submitted
172for inclusion in `async-stream` by you, shall be licensed as MIT, without any
173additional terms or conditions.
174

README.tpl

1# Asynchronous streams for Rust
2
3{{readme}}
4
5## License
6
7This project is licensed under the [MIT license](LICENSE).
8
9### Contribution
10
11Unless you explicitly state otherwise, any contribution intentionally submitted
12for inclusion in `async-stream` by you, shall be licensed as MIT, without any
13additional terms or conditions.
14