#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))] use crate::sync::batch_semaphore as semaphore; use std::cell::UnsafeCell; use std::error::Error; use std::fmt; use std::ops::{Deref, DerefMut}; use std::sync::Arc; /// An asynchronous `Mutex`-like type. /// /// This type acts similarly to [`std::sync::Mutex`], with two major /// differences: [`lock`] is an async method so does not block, and the lock /// guard is designed to be held across `.await` points. /// /// # Which kind of mutex should you use? /// /// Contrary to popular belief, it is ok and often preferred to use the ordinary /// [`Mutex`][std] from the standard library in asynchronous code. /// /// The feature that the async mutex offers over the blocking mutex is the /// ability to keep it locked across an `.await` point. This makes the async /// mutex more expensive than the blocking mutex, so the blocking mutex should /// be preferred in the cases where it can be used. The primary use case for the /// async mutex is to provide shared mutable access to IO resources such as a /// database connection. If the value behind the mutex is just data, it's /// usually appropriate to use a blocking mutex such as the one in the standard /// library or [`parking_lot`]. /// /// Note that, although the compiler will not prevent the std `Mutex` from holding /// its guard across `.await` points in situations where the task is not movable /// between threads, this virtually never leads to correct concurrent code in /// practice as it can easily lead to deadlocks. /// /// A common pattern is to wrap the `Arc>` in a struct that provides /// non-async methods for performing operations on the data within, and only /// lock the mutex inside these methods. The [mini-redis] example provides an /// illustration of this pattern. /// /// Additionally, when you _do_ want shared access to an IO resource, it is /// often better to spawn a task to manage the IO resource, and to use message /// passing to communicate with that task. /// /// [std]: std::sync::Mutex /// [`parking_lot`]: https://docs.rs/parking_lot /// [mini-redis]: https://github.com/tokio-rs/mini-redis/blob/master/src/db.rs /// /// # Examples: /// /// ```rust,no_run /// use tokio::sync::Mutex; /// use std::sync::Arc; /// /// #[tokio::main] /// async fn main() { /// let data1 = Arc::new(Mutex::new(0)); /// let data2 = Arc::clone(&data1); /// /// tokio::spawn(async move { /// let mut lock = data2.lock().await; /// *lock += 1; /// }); /// /// let mut lock = data1.lock().await; /// *lock += 1; /// } /// ``` /// /// /// ```rust,no_run /// use tokio::sync::Mutex; /// use std::sync::Arc; /// /// #[tokio::main] /// async fn main() { /// let count = Arc::new(Mutex::new(0)); /// /// for i in 0..5 { /// let my_count = Arc::clone(&count); /// tokio::spawn(async move { /// for j in 0..10 { /// let mut lock = my_count.lock().await; /// *lock += 1; /// println!("{} {} {}", i, j, lock); /// } /// }); /// } /// /// loop { /// if *count.lock().await >= 50 { /// break; /// } /// } /// println!("Count hit 50."); /// } /// ``` /// There are a few things of note here to pay attention to in this example. /// 1. The mutex is wrapped in an [`Arc`] to allow it to be shared across /// threads. /// 2. Each spawned task obtains a lock and releases it on every iteration. /// 3. Mutation of the data protected by the Mutex is done by de-referencing /// the obtained lock as seen on lines 12 and 19. /// /// Tokio's Mutex works in a simple FIFO (first in, first out) style where all /// calls to [`lock`] complete in the order they were performed. In that way the /// Mutex is "fair" and predictable in how it distributes the locks to inner /// data. Locks are released and reacquired after every iteration, so basically, /// each thread goes to the back of the line after it increments the value once. /// Note that there's some unpredictability to the timing between when the /// threads are started, but once they are going they alternate predictably. /// Finally, since there is only a single valid lock at any given time, there is /// no possibility of a race condition when mutating the inner value. /// /// Note that in contrast to [`std::sync::Mutex`], this implementation does not /// poison the mutex when a thread holding the [`MutexGuard`] panics. In such a /// case, the mutex will be unlocked. If the panic is caught, this might leave /// the data protected by the mutex in an inconsistent state. /// /// [`Mutex`]: struct@Mutex /// [`MutexGuard`]: struct@MutexGuard /// [`Arc`]: struct@std::sync::Arc /// [`std::sync::Mutex`]: struct@std::sync::Mutex /// [`Send`]: trait@std::marker::Send /// [`lock`]: method@Mutex::lock pub struct Mutex { s: semaphore::Semaphore, c: UnsafeCell, } /// A handle to a held `Mutex`. The guard can be held across any `.await` point /// as it is [`Send`]. /// /// As long as you have this guard, you have exclusive access to the underlying /// `T`. The guard internally borrows the `Mutex`, so the mutex will not be /// dropped while a guard exists. /// /// The lock is automatically released whenever the guard is dropped, at which /// point `lock` will succeed yet again. pub struct MutexGuard<'a, T: ?Sized> { lock: &'a Mutex, } /// An owned handle to a held `Mutex`. /// /// This guard is only available from a `Mutex` that is wrapped in an [`Arc`]. It /// is identical to `MutexGuard`, except that rather than borrowing the `Mutex`, /// it clones the `Arc`, incrementing the reference count. This means that /// unlike `MutexGuard`, it will have the `'static` lifetime. /// /// As long as you have this guard, you have exclusive access to the underlying /// `T`. The guard internally keeps a reference-counted pointer to the original /// `Mutex`, so even if the lock goes away, the guard remains valid. /// /// The lock is automatically released whenever the guard is dropped, at which /// point `lock` will succeed yet again. /// /// [`Arc`]: std::sync::Arc pub struct OwnedMutexGuard { lock: Arc>, } // As long as T: Send, it's fine to send and share Mutex between threads. // If T was not Send, sending and sharing a Mutex would be bad, since you can // access T through Mutex. unsafe impl Send for Mutex where T: ?Sized + Send {} unsafe impl Sync for Mutex where T: ?Sized + Send {} unsafe impl Sync for MutexGuard<'_, T> where T: ?Sized + Send + Sync {} unsafe impl Sync for OwnedMutexGuard where T: ?Sized + Send + Sync {} /// Error returned from the [`Mutex::try_lock`], [`RwLock::try_read`] and /// [`RwLock::try_write`] functions. /// /// `Mutex::try_lock` operation will only fail if the mutex is already locked. /// /// `RwLock::try_read` operation will only fail if the lock is currently held /// by an exclusive writer. /// /// `RwLock::try_write` operation will if lock is held by any reader or by an /// exclusive writer. /// /// [`Mutex::try_lock`]: Mutex::try_lock /// [`RwLock::try_read`]: fn@super::RwLock::try_read /// [`RwLock::try_write`]: fn@super::RwLock::try_write #[derive(Debug)] pub struct TryLockError(pub(super) ()); impl fmt::Display for TryLockError { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { write!(fmt, "operation would block") } } impl Error for TryLockError {} #[test] #[cfg(not(loom))] fn bounds() { fn check_send() {} fn check_unpin() {} // This has to take a value, since the async fn's return type is unnameable. fn check_send_sync_val(_t: T) {} fn check_send_sync() {} fn check_static() {} fn check_static_val(_t: T) {} check_send::>(); check_send::>(); check_unpin::>(); check_send_sync::>(); check_static::>(); let mutex = Mutex::new(1); check_send_sync_val(mutex.lock()); let arc_mutex = Arc::new(Mutex::new(1)); check_send_sync_val(arc_mutex.clone().lock_owned()); check_static_val(arc_mutex.lock_owned()); } impl Mutex { /// Creates a new lock in an unlocked state ready for use. /// /// # Examples /// /// ``` /// use tokio::sync::Mutex; /// /// let lock = Mutex::new(5); /// ``` pub fn new(t: T) -> Self where T: Sized, { Self { c: UnsafeCell::new(t), s: semaphore::Semaphore::new(1), } } /// Creates a new lock in an unlocked state ready for use. /// /// # Examples /// /// ``` /// use tokio::sync::Mutex; /// /// static LOCK: Mutex = Mutex::const_new(5); /// ``` #[cfg(all(feature = "parking_lot", not(all(loom, test)),))] #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] pub const fn const_new(t: T) -> Self where T: Sized, { Self { c: UnsafeCell::new(t), s: semaphore::Semaphore::const_new(1), } } /// Locks this mutex, causing the current task /// to yield until the lock has been acquired. /// When the lock has been acquired, function returns a [`MutexGuard`]. /// /// # Examples /// /// ``` /// use tokio::sync::Mutex; /// /// #[tokio::main] /// async fn main() { /// let mutex = Mutex::new(1); /// /// let mut n = mutex.lock().await; /// *n = 2; /// } /// ``` pub async fn lock(&self) -> MutexGuard<'_, T> { self.acquire().await; MutexGuard { lock: self } } /// Locks this mutex, causing the current task to yield until the lock has /// been acquired. When the lock has been acquired, this returns an /// [`OwnedMutexGuard`]. /// /// This method is identical to [`Mutex::lock`], except that the returned /// guard references the `Mutex` with an [`Arc`] rather than by borrowing /// it. Therefore, the `Mutex` must be wrapped in an `Arc` to call this /// method, and the guard will live for the `'static` lifetime, as it keeps /// the `Mutex` alive by holding an `Arc`. /// /// # Examples /// /// ``` /// use tokio::sync::Mutex; /// use std::sync::Arc; /// /// #[tokio::main] /// async fn main() { /// let mutex = Arc::new(Mutex::new(1)); /// /// let mut n = mutex.clone().lock_owned().await; /// *n = 2; /// } /// ``` /// /// [`Arc`]: std::sync::Arc pub async fn lock_owned(self: Arc) -> OwnedMutexGuard { self.acquire().await; OwnedMutexGuard { lock: self } } async fn acquire(&self) { self.s.acquire(1).await.unwrap_or_else(|_| { // The semaphore was closed. but, we never explicitly close it, and // we own it exclusively, which means that this can never happen. unreachable!() }); } /// Attempts to acquire the lock, and returns [`TryLockError`] if the /// lock is currently held somewhere else. /// /// [`TryLockError`]: TryLockError /// # Examples /// /// ``` /// use tokio::sync::Mutex; /// # async fn dox() -> Result<(), tokio::sync::TryLockError> { /// /// let mutex = Mutex::new(1); /// /// let n = mutex.try_lock()?; /// assert_eq!(*n, 1); /// # Ok(()) /// # } /// ``` pub fn try_lock(&self) -> Result, TryLockError> { match self.s.try_acquire(1) { Ok(_) => Ok(MutexGuard { lock: self }), Err(_) => Err(TryLockError(())), } } /// Returns a mutable reference to the underlying data. /// /// Since this call borrows the `Mutex` mutably, no actual locking needs to /// take place -- the mutable borrow statically guarantees no locks exist. /// /// # Examples /// /// ``` /// use tokio::sync::Mutex; /// /// fn main() { /// let mut mutex = Mutex::new(1); /// /// let n = mutex.get_mut(); /// *n = 2; /// } /// ``` pub fn get_mut(&mut self) -> &mut T { unsafe { // Safety: This is https://github.com/rust-lang/rust/pull/76936 &mut *self.c.get() } } /// Attempts to acquire the lock, and returns [`TryLockError`] if the lock /// is currently held somewhere else. /// /// This method is identical to [`Mutex::try_lock`], except that the /// returned guard references the `Mutex` with an [`Arc`] rather than by /// borrowing it. Therefore, the `Mutex` must be wrapped in an `Arc` to call /// this method, and the guard will live for the `'static` lifetime, as it /// keeps the `Mutex` alive by holding an `Arc`. /// /// [`TryLockError`]: TryLockError /// [`Arc`]: std::sync::Arc /// # Examples /// /// ``` /// use tokio::sync::Mutex; /// use std::sync::Arc; /// # async fn dox() -> Result<(), tokio::sync::TryLockError> { /// /// let mutex = Arc::new(Mutex::new(1)); /// /// let n = mutex.clone().try_lock_owned()?; /// assert_eq!(*n, 1); /// # Ok(()) /// # } pub fn try_lock_owned(self: Arc) -> Result, TryLockError> { match self.s.try_acquire(1) { Ok(_) => Ok(OwnedMutexGuard { lock: self }), Err(_) => Err(TryLockError(())), } } /// Consumes the mutex, returning the underlying data. /// # Examples /// /// ``` /// use tokio::sync::Mutex; /// /// #[tokio::main] /// async fn main() { /// let mutex = Mutex::new(1); /// /// let n = mutex.into_inner(); /// assert_eq!(n, 1); /// } /// ``` pub fn into_inner(self) -> T where T: Sized, { self.c.into_inner() } } impl From for Mutex { fn from(s: T) -> Self { Self::new(s) } } impl Default for Mutex where T: Default, { fn default() -> Self { Self::new(T::default()) } } impl std::fmt::Debug for Mutex where T: std::fmt::Debug, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let mut d = f.debug_struct("Mutex"); match self.try_lock() { Ok(inner) => d.field("data", &*inner), Err(_) => d.field("data", &format_args!("")), }; d.finish() } } // === impl MutexGuard === impl Drop for MutexGuard<'_, T> { fn drop(&mut self) { self.lock.s.release(1) } } impl Deref for MutexGuard<'_, T> { type Target = T; fn deref(&self) -> &Self::Target { unsafe { &*self.lock.c.get() } } } impl DerefMut for MutexGuard<'_, T> { fn deref_mut(&mut self) -> &mut Self::Target { unsafe { &mut *self.lock.c.get() } } } impl fmt::Debug for MutexGuard<'_, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Debug::fmt(&**self, f) } } impl fmt::Display for MutexGuard<'_, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Display::fmt(&**self, f) } } // === impl OwnedMutexGuard === impl Drop for OwnedMutexGuard { fn drop(&mut self) { self.lock.s.release(1) } } impl Deref for OwnedMutexGuard { type Target = T; fn deref(&self) -> &Self::Target { unsafe { &*self.lock.c.get() } } } impl DerefMut for OwnedMutexGuard { fn deref_mut(&mut self) -> &mut Self::Target { unsafe { &mut *self.lock.c.get() } } } impl fmt::Debug for OwnedMutexGuard { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Debug::fmt(&**self, f) } } impl fmt::Display for OwnedMutexGuard { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Display::fmt(&**self, f) } }