1 //! The motivation for SharedMutex is to guard a resource without having to
2 //! extend its lifetime using an Rc<> (and potentially create reference cycles)
3 
4 use std::{future::Future, rc::Rc, sync::Arc};
5 
6 use tokio::sync::{Mutex, OwnedMutexGuard, Semaphore, TryLockError};
7 
8 /// A mutex wrapping some contents of type T. When the mutex is dropped,
9 /// T will be dropped.
10 ///
11 /// The lifetime of T will be extended only if a client currently holds
12 /// exclusive access to it, in which case once that client drops its
13 /// MutexGuard, then T will be dropped.
14 pub struct SharedMutex<T> {
15     lock: Arc<Mutex<T>>,
16     on_death: Rc<Semaphore>,
17 }
18 
19 impl<T> SharedMutex<T> {
20     /// Constructor
new(t: T) -> Self21     pub fn new(t: T) -> Self {
22         Self { lock: Arc::new(Mutex::new(t)), on_death: Rc::new(Semaphore::new(0)) }
23     }
24 
25     /// Acquire exclusive access to T, or None if SharedMutex<T> is dropped
26     /// while waiting to acquire. Unlike Mutex::lock, this method produces a
27     /// future with 'static lifetime, so it can be awaited even if the
28     /// SharedMutex<> itself is dropped.
29     ///
30     /// NOTE: if the lifetime of T is extended by the holder of the lock when
31     /// the SharedMutex<> itself is dropped, all waiters will still
32     /// instantly return None (rather than waiting for the lock to be
33     /// released).
lock(&self) -> impl Future<Output = Option<OwnedMutexGuard<T>>>34     pub fn lock(&self) -> impl Future<Output = Option<OwnedMutexGuard<T>>> {
35         let mutex = self.lock.clone();
36         let on_death = self.on_death.clone();
37 
38         async move {
39             tokio::select! {
40             biased;
41               permit = on_death.acquire() => {
42                 drop(permit);
43                 None
44               },
45               acquired = mutex.lock_owned() => {
46                 Some(acquired)
47               },
48             }
49         }
50     }
51 
52     /// Synchronously acquire the lock. This similarly exhibits the
53     /// lifetime-extension behavior of Self#lock().
try_lock(&self) -> Result<OwnedMutexGuard<T>, TryLockError>54     pub fn try_lock(&self) -> Result<OwnedMutexGuard<T>, TryLockError> {
55         self.lock.clone().try_lock_owned()
56     }
57 }
58 
59 impl<T> Drop for SharedMutex<T> {
drop(&mut self)60     fn drop(&mut self) {
61         // mark dead, so all waiters instantly return
62         // no one can start to wait after drop
63         self.on_death.add_permits(Arc::strong_count(&self.lock) + 1);
64     }
65 }
66