#![cfg_attr(loom, allow(dead_code, unreachable_pub, unused_imports))]

//! Synchronization primitives for use in asynchronous contexts.
//!
//! Tokio programs tend to be organized as a set of [tasks] where each task
//! operates independently and may be executed on separate physical threads. The
//! synchronization primitives provided in this module permit these independent
//! tasks to communicate with each other.
//!
//! [tasks]: crate::task
//!
//! # Message passing
//!
//! The most common form of synchronization in a Tokio program is message
//! passing. Two tasks operate independently and send messages to each other to
//! synchronize. Doing so has the advantage of avoiding shared state.
//!
//! Message passing is implemented using channels. A channel supports sending
//! messages from one or more producer tasks to one or more consumer tasks.
//! There are a few flavors of channels provided by Tokio. Each channel flavor
//! supports different message passing patterns. When a channel supports
//! multiple producers, many separate tasks may **send** messages. When a
//! channel supports multiple consumers, many separate tasks may **receive**
//! messages.
//!
//! Tokio provides several channel flavors because different message passing
//! patterns are best handled with different implementations.
//!
//! ## `oneshot` channel
//!
//! The [`oneshot` channel][oneshot] supports sending a **single** value from a
//! single producer to a single consumer. This channel is usually used to send
//! the result of a computation to a waiter.
//!
//! **Example:** using a [`oneshot` channel][oneshot] to receive the result of a
//! computation.
//!
//! ```
//! use tokio::sync::oneshot;
//!
//! async fn some_computation() -> String {
//!     "represents the result of the computation".to_string()
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let (tx, rx) = oneshot::channel();
//!
//!     tokio::spawn(async move {
//!         let res = some_computation().await;
//!         tx.send(res).unwrap();
//!     });
//!
//!     // Do other work while the computation is happening in the background
//!
//!     // Wait for the computation result
//!     let res = rx.await.unwrap();
//! }
//! ```
//!
//! Note that if the task produces a computation result as its final
//! action before terminating, the [`JoinHandle`] can be used to
//! receive that value instead of allocating resources for the
//! `oneshot` channel. Awaiting a [`JoinHandle`] returns a `Result`. If
//! the task panics, the `JoinHandle` yields `Err` with the panic
//! cause.
//!
//! **Example:**
//!
//! ```
//! async fn some_computation() -> String {
//!     "the result of the computation".to_string()
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let join_handle = tokio::spawn(async move {
//!         some_computation().await
//!     });
//!
//!     // Do other work while the computation is happening in the background
//!
//!     // Wait for the computation result
//!     let res = join_handle.await.unwrap();
//! }
//! ```
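//!
//! The panic case can be sketched as follows. This is a minimal illustration,
//! and the helper name `join_panicking_task` is hypothetical. The spawned task
//! panics, and awaiting its `JoinHandle` yields an `Err` whose `is_panic`
//! method returns `true`.
//!
//! ```
//! use tokio::task::JoinHandle;
//!
//! // Hypothetical helper: spawns a task that panics and reports whether the
//! // `JoinHandle` surfaced the panic as an error.
//! async fn join_panicking_task() -> bool {
//!     let handle: JoinHandle<()> = tokio::spawn(async {
//!         panic!("computation failed");
//!     });
//!
//!     match handle.await {
//!         Ok(()) => false,
//!         // `JoinError::is_panic` distinguishes a panic from cancellation.
//!         Err(err) => err.is_panic(),
//!     }
//! }
//! ```
//!
//! The panic stays contained in the spawned task, so the awaiting task can
//! decide how to react to the error.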
//!
//! [oneshot]: oneshot
//! [`JoinHandle`]: crate::task::JoinHandle
//!
//! ## `mpsc` channel
//!
//! The [`mpsc` channel][mpsc] supports sending **many** values from **many**
//! producers to a single consumer. This channel is often used to send work to a
//! task or to receive the result of many computations.
//!
//! **Example:** using an `mpsc` channel to incrementally stream the results of
//! a series of computations.
//!
//! ```
//! use tokio::sync::mpsc;
//!
//! async fn some_computation(input: u32) -> String {
//!     format!("the result of computation {}", input)
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let (tx, mut rx) = mpsc::channel(100);
//!
//!     tokio::spawn(async move {
//!         for i in 0..10 {
//!             let res = some_computation(i).await;
//!             tx.send(res).await.unwrap();
//!         }
//!     });
//!
//!     while let Some(res) = rx.recv().await {
//!         println!("got = {}", res);
//!     }
//! }
//! ```
//!
//! The argument to `mpsc::channel` is the channel capacity. This is the maximum
//! number of values that can be stored in the channel pending receipt at any
//! given time. Properly setting this value is key to implementing robust
//! programs, as the channel capacity plays a critical part in handling back
//! pressure.
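//!
//! As a minimal sketch of how the capacity bounds the channel, the helper
//! below queues values with `try_send`, which returns an error instead of
//! waiting when no slot is free. The helper name `fill_channel` and its
//! parameters are hypothetical and chosen for illustration only.
//!
//! ```
//! use tokio::sync::mpsc;
//! use tokio::sync::mpsc::error::TrySendError;
//!
//! // Hypothetical helper: tries to queue `attempts` values and returns how
//! // many were accepted before the channel reported that it was full.
//! fn fill_channel(capacity: usize, attempts: u32) -> u32 {
//!     // `_rx` is kept alive so the channel stays open, but never drained.
//!     let (tx, _rx) = mpsc::channel::<u32>(capacity);
//!     let mut accepted = 0;
//!
//!     for i in 0..attempts {
//!         match tx.try_send(i) {
//!             Ok(()) => accepted += 1,
//!             // No free slot: `send(..).await` would wait here instead.
//!             Err(TrySendError::Full(_)) => break,
//!             // Not expected while `_rx` is alive.
//!             Err(TrySendError::Closed(_)) => break,
//!         }
//!     }
//!
//!     accepted
//! }
//! ```
//!
//! With a capacity of 2 and nothing receiving, `fill_channel(2, 5)` accepts
//! only the first two values. A producer using `send(..).await` would instead
//! wait at that point until the consumer frees a slot.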
//!
//! A common concurrency pattern for resource management is to spawn a task
//! dedicated to managing that resource and to use message passing between
//! other tasks to interact with the resource. The resource may be anything
//! that cannot be used concurrently, such as a socket or program state.
//! For example, if multiple tasks need to send data over a single socket, spawn
//! a task to manage the socket and use a channel to synchronize.
//!
//! **Example:** sending data from many tasks over a single socket using message
//! passing.
//!
//! ```no_run
//! use tokio::io::{self, AsyncWriteExt};
//! use tokio::net::TcpStream;
//! use tokio::sync::mpsc;
//!
//! #[tokio::main]
//! async fn main() -> io::Result<()> {
//!     let mut socket = TcpStream::connect("www.example.com:1234").await?;
//!     let (tx, mut rx) = mpsc::channel(100);
//!
//!     for _ in 0..10 {
//!         // Each task needs its own `tx` handle. This is done by cloning the
//!         // original handle.
//!         let tx = tx.clone();
//!
//!         tokio::spawn(async move {
//!             tx.send(&b"data to write"[..]).await.unwrap();
//!         });
//!     }
//!
//!     // The `rx` half of the channel returns `None` once **all** `tx` clones
//!     // drop. To ensure `None` is returned, drop the handle owned by the
//!     // current task. If this `tx` handle is not dropped, there will always
//!     // be a single outstanding `tx` handle.
//!     drop(tx);
//!
//!     while let Some(res) = rx.recv().await {
//!         socket.write_all(res).await?;
//!     }
//!
//!     Ok(())
//! }
//! ```
//!
//! The [`mpsc`][mpsc] and [`oneshot`][oneshot] channels can be combined to
//! provide a request / response type synchronization pattern with a shared
//! resource. A task is spawned to synchronize a resource and waits on commands
//! received on an [`mpsc`][mpsc] channel. Each command includes a
//! [`oneshot`][oneshot] `Sender` on which the result of the command is sent.
//!
//! **Example:** use a task to synchronize a `u64` counter. Each task sends a
//! "fetch and increment" command. The counter value **before** the increment is
//! sent over the provided `oneshot` channel.
//!
//! ```
//! use tokio::sync::{oneshot, mpsc};
//! use Command::Increment;
//!
//! enum Command {
//!     Increment,
//!     // Other commands can be added here
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let (cmd_tx, mut cmd_rx) = mpsc::channel::<(Command, oneshot::Sender<u64>)>(100);
//!
//!     // Spawn a task to manage the counter
//!     tokio::spawn(async move {
//!         let mut counter: u64 = 0;
//!
//!         while let Some((cmd, response)) = cmd_rx.recv().await {
//!             match cmd {
//!                 Increment => {
//!                     let prev = counter;
//!                     counter += 1;
//!                     response.send(prev).unwrap();
//!                 }
//!             }
//!         }
//!     });
//!
//!     let mut join_handles = vec![];
//!
//!     // Spawn tasks that will send the increment command.
//!     for _ in 0..10 {
//!         let cmd_tx = cmd_tx.clone();
//!
//!         join_handles.push(tokio::spawn(async move {
//!             let (resp_tx, resp_rx) = oneshot::channel();
//!
//!             cmd_tx.send((Increment, resp_tx)).await.ok().unwrap();
//!             let res = resp_rx.await.unwrap();
//!
//!             println!("previous value = {}", res);
//!         }));
//!     }
//!
//!     // Wait for all tasks to complete
//!     for join_handle in join_handles.drain(..) {
//!         join_handle.await.unwrap();
//!     }
//! }
//! ```
//!
//! [mpsc]: mpsc
//!
//! ## `broadcast` channel
//!
//! The [`broadcast` channel] supports sending **many** values from
//! **many** producers to **many** consumers. Each consumer will receive
//! **each** value. This channel can be used to implement "fan out" style
//! patterns common with pub / sub or "chat" systems.
//!
//! This channel tends to be used less often than `oneshot` and `mpsc` but still
//! has its use cases.
//!
//! **Example:** basic usage.
//!
//! ```
//! use tokio::sync::broadcast;
//!
//! #[tokio::main]
//! async fn main() {
//!     let (tx, mut rx1) = broadcast::channel(16);
//!     let mut rx2 = tx.subscribe();
//!
//!     tokio::spawn(async move {
//!         assert_eq!(rx1.recv().await.unwrap(), 10);
//!         assert_eq!(rx1.recv().await.unwrap(), 20);
//!     });
//!
//!     tokio::spawn(async move {
//!         assert_eq!(rx2.recv().await.unwrap(), 10);
//!         assert_eq!(rx2.recv().await.unwrap(), 20);
//!     });
//!
//!     tx.send(10).unwrap();
//!     tx.send(20).unwrap();
//! }
//! ```
//!
//! [`broadcast` channel]: crate::sync::broadcast
//!
//! ## `watch` channel
//!
//! The [`watch` channel] supports sending **many** values from a **single**
//! producer to **many** consumers. However, only the **most recent** value is
//! stored in the channel. Consumers are notified when a new value is sent, but
//! there is no guarantee that consumers will see **all** values.
//!
//! The [`watch` channel] is similar to a [`broadcast` channel] with capacity 1.
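//!
//! The "most recent value" behavior can be sketched as follows. This is a
//! minimal illustration, and the helper name `latest_after_sends` is
//! hypothetical. It sends several values before the receiver looks at the
//! channel, then returns what the receiver observes.
//!
//! ```
//! use tokio::sync::watch;
//!
//! // Hypothetical helper: returns whether the receiver saw a pending change,
//! // together with the value it read.
//! fn latest_after_sends() -> (bool, u32) {
//!     let (tx, mut rx) = watch::channel(0u32);
//!
//!     // Three updates are sent, but the channel keeps only the last one.
//!     for value in [1, 2, 3] {
//!         tx.send(value).unwrap();
//!     }
//!
//!     // `has_changed` reports that an unseen value is waiting.
//!     let changed = rx.has_changed().unwrap();
//!     // `borrow_and_update` reads the value and marks it as seen.
//!     let latest = *rx.borrow_and_update();
//!
//!     (changed, latest)
//! }
//! ```
//!
//! The receiver observes only `3`. The intermediate values `1` and `2` were
//! overwritten before it read the channel.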
//!
//! Use cases for the [`watch` channel] include broadcasting configuration
//! changes or signalling program state changes, such as transitioning to
//! shutdown.
//!
//! **Example:** use a [`watch` channel] to notify tasks of configuration
//! changes. In this example, a configuration file is checked periodically. When
//! the file changes, the configuration changes are signalled to consumers.
//!
//! ```
//! use tokio::sync::watch;
//! use tokio::time::{self, Duration, Instant};
//!
//! use std::io;
//!
//! #[derive(Debug, Clone, Eq, PartialEq)]
//! struct Config {
//!     timeout: Duration,
//! }
//!
//! impl Config {
//!     async fn load_from_file() -> io::Result<Config> {
//!         // file loading and deserialization logic here
//! # Ok(Config { timeout: Duration::from_secs(1) })
//!     }
//! }
//!
//! async fn my_async_operation() {
//!     // Do something here
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     // Load initial configuration value
//!     let mut config = Config::load_from_file().await.unwrap();
//!
//!     // Create the watch channel, initialized with the loaded configuration
//!     let (tx, rx) = watch::channel(config.clone());
//!
//!     // Spawn a task to monitor the file.
//!     tokio::spawn(async move {
//!         loop {
//!             // Wait 10 seconds between checks
//!             time::sleep(Duration::from_secs(10)).await;
//!
//!             // Load the configuration file
//!             let new_config = Config::load_from_file().await.unwrap();
//!
//!             // If the configuration changed, send the new config value
//!             // on the watch channel.
//!             if new_config != config {
//!                 tx.send(new_config.clone()).unwrap();
//!                 config = new_config;
//!             }
//!         }
//!     });
//!
//!     let mut handles = vec![];
//!
//!     // Spawn tasks that run the async operation for at most `timeout`. If
//!     // the timeout elapses, restart the operation.
//!     //
//!     // Each task simultaneously watches the `Config` for changes. When the
//!     // timeout duration changes, the timeout is updated without restarting
//!     // the in-flight operation.
//!     for _ in 0..5 {
//!         // Clone a config watch handle for use in this task
//!         let mut rx = rx.clone();
//!
//!         let handle = tokio::spawn(async move {
//!             // Start the initial operation and pin the future to the stack.
//!             // Pinning to the stack is required to resume the operation
//!             // across multiple calls to `select!`
//!             let op = my_async_operation();
//!             tokio::pin!(op);
//!
//!             // Get the initial config value
//!             let mut conf = rx.borrow().clone();
//!
//!             let mut op_start = Instant::now();
//!             let sleep = time::sleep_until(op_start + conf.timeout);
//!             tokio::pin!(sleep);
//!
//!             loop {
//!                 tokio::select! {
//!                     _ = &mut sleep => {
//!                         // The timeout elapsed. Restart the operation
//!                         op.set(my_async_operation());
//!
//!                         // Track the new start time
//!                         op_start = Instant::now();
//!
//!                         // Restart the timeout
//!                         sleep.set(time::sleep_until(op_start + conf.timeout));
//!                     }
//!                     _ = rx.changed() => {
//!                         conf = rx.borrow().clone();
//!
//!                         // The configuration has been updated. Update the
//!                         // `sleep` using the new `timeout` value.
//!                         sleep.as_mut().reset(op_start + conf.timeout);
//!                     }
//!                     _ = &mut op => {
//!                         // The operation completed!
//!                         return
//!                     }
//!                 }
//!             }
//!         });
//!
//!         handles.push(handle);
//!     }
//!
//!     for handle in handles.drain(..) {
//!         handle.await.unwrap();
//!     }
//! }
//! ```
//!
//! [`watch` channel]: mod@crate::sync::watch
//! [`broadcast` channel]: mod@crate::sync::broadcast
//!
//! # State synchronization
//!
//! The remaining synchronization primitives focus on synchronizing state.
//! These are asynchronous equivalents to versions provided by `std`. They
//! operate similarly to their `std` counterparts but wait asynchronously
//! instead of blocking the thread.
//!
//! * [`Barrier`](Barrier) Ensures multiple tasks will wait for each other to
//!   reach a point in the program, before continuing execution all together.
//!
//! * [`Mutex`](Mutex) Mutual exclusion mechanism, which ensures that at most
//!   one task at a time is able to access some data.
//!
//! * [`Notify`](Notify) Basic task notification. `Notify` supports notifying a
//!   receiving task without sending data. In this case, the task wakes up and
//!   resumes processing.
//!
//! * [`RwLock`](RwLock) Provides a mutual exclusion mechanism which allows
//!   multiple readers at the same time, while allowing only one writer at a
//!   time. In some cases, this can be more efficient than a mutex.
//!
//! * [`Semaphore`](Semaphore) Limits the amount of concurrency. A semaphore
//!   holds a number of permits, which tasks may request in order to enter a
//!   critical section. Semaphores are useful for implementing limiting or
//!   bounding of any kind.
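//!
//! As a minimal sketch of state synchronization, the helper below shares a
//! counter between tasks using `Mutex`. The helper name `count_with_mutex` is
//! hypothetical and the counter stands in for any shared state.
//!
//! ```
//! use std::sync::Arc;
//! use tokio::sync::Mutex;
//!
//! // Hypothetical helper: spawns `tasks` tasks that each increment a shared
//! // counter once, then returns the final count.
//! async fn count_with_mutex(tasks: u32) -> u32 {
//!     let counter = Arc::new(Mutex::new(0u32));
//!     let mut handles = Vec::new();
//!
//!     for _ in 0..tasks {
//!         let counter = Arc::clone(&counter);
//!         handles.push(tokio::spawn(async move {
//!             // `lock` waits asynchronously; the thread is not blocked.
//!             let mut guard = counter.lock().await;
//!             *guard += 1;
//!         }));
//!     }
//!
//!     // Wait for every task to finish before reading the result.
//!     for handle in handles {
//!         handle.await.unwrap();
//!     }
//!
//!     let total = *counter.lock().await;
//!     total
//! }
//! ```
//!
//! Because each increment happens while the lock is held, the returned count
//! equals the number of spawned tasks.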

cfg_sync! {
    mod barrier;
    pub use barrier::{Barrier, BarrierWaitResult};

    pub mod broadcast;

    pub mod mpsc;

    mod mutex;
    pub use mutex::{Mutex, MutexGuard, TryLockError, OwnedMutexGuard};

    pub(crate) mod notify;
    pub use notify::Notify;

    pub mod oneshot;

    pub(crate) mod batch_semaphore;
    pub use batch_semaphore::{AcquireError, TryAcquireError};

    mod semaphore;
    pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit};

    mod rwlock;
    pub use rwlock::RwLock;
    pub use rwlock::owned_read_guard::OwnedRwLockReadGuard;
    pub use rwlock::owned_write_guard::OwnedRwLockWriteGuard;
    pub use rwlock::owned_write_guard_mapped::OwnedRwLockMappedWriteGuard;
    pub use rwlock::read_guard::RwLockReadGuard;
    pub use rwlock::write_guard::RwLockWriteGuard;
    pub use rwlock::write_guard_mapped::RwLockMappedWriteGuard;

    mod task;
    pub(crate) use task::AtomicWaker;

    mod once_cell;
    pub use self::once_cell::{OnceCell, SetError};

    pub mod watch;
}

cfg_not_sync! {
    cfg_fs! {
        pub(crate) mod batch_semaphore;
        mod mutex;
        pub(crate) use mutex::Mutex;
    }

    #[cfg(any(feature = "rt", feature = "signal", all(unix, feature = "process")))]
    pub(crate) mod notify;

    #[cfg(any(feature = "rt", all(windows, feature = "process")))]
    pub(crate) mod oneshot;

    cfg_atomic_waker_impl! {
        mod task;
        pub(crate) use task::AtomicWaker;
    }

    #[cfg(any(feature = "signal", all(unix, feature = "process")))]
    pub(crate) mod watch;
}

/// Unit tests
#[cfg(test)]
mod tests;