1 #![warn(rust_2018_idioms)]
2
3 use std::sync::Arc;
4 use std::task::Poll;
5
6 use futures::future::FutureExt;
7 use futures::stream;
8 use futures::stream::StreamExt;
9
10 use tokio::sync::{Barrier, RwLock};
11 use tokio_test::task::spawn;
12 use tokio_test::{assert_pending, assert_ready};
13
#[test]
fn into_inner() {
    // Consuming the lock hands back the wrapped value directly.
    let lock = RwLock::new(42);
    assert_eq!(lock.into_inner(), 42);
}
19
20 // multiple reads should be Ready
#[test]
fn read_shared() {
    // Two concurrent readers must both acquire the lock immediately.
    let lock = RwLock::new(100);

    let mut first = spawn(lock.read());
    let _guard = assert_ready!(first.poll());

    let mut second = spawn(lock.read());
    assert_ready!(second.poll());
}
30
31 // When there is an active shared owner, exclusive access should not be possible
#[test]
fn write_shared_pending() {
    // A held read guard blocks any writer from acquiring the lock.
    let lock = RwLock::new(100);

    let mut reader = spawn(lock.read());
    let _read_guard = assert_ready!(reader.poll());

    let mut writer = spawn(lock.write());
    assert_pending!(writer.poll());
}
41
// When there is an active exclusive owner, subsequent shared access should not be possible
#[test]
fn read_exclusive_pending() {
    // A held write guard blocks any reader from acquiring the lock.
    let lock = RwLock::new(100);

    let mut writer = spawn(lock.write());
    let _write_guard = assert_ready!(writer.poll());

    let mut reader = spawn(lock.read());
    assert_pending!(reader.poll());
}
52
// If the max shared access is reached and subsequent shared access is pending
// should be made available when one of the shared accesses is dropped
#[test]
fn exhaust_reading() {
    // Saturate the reader limit, then check that a queued reader is woken
    // as soon as one of the held guards is released.
    let lock = RwLock::with_max_readers(100, 1024);
    let mut held = Vec::new();
    loop {
        let mut task = spawn(lock.read());
        match task.poll() {
            Poll::Ready(guard) => held.push(guard),
            Poll::Pending => break,
        }
    }

    let mut waiter = spawn(lock.read());
    assert_pending!(waiter.poll());

    // Releasing a single read permit must wake the queued reader.
    drop(held.pop().unwrap());
    assert!(waiter.is_woken());
    assert_ready!(waiter.poll());
}
74
75 // When there is an active exclusive owner, subsequent exclusive access should not be possible
#[test]
fn write_exclusive_pending() {
    // A held write guard blocks any other writer.
    let lock = RwLock::new(100);

    let mut first = spawn(lock.write());
    let _guard = assert_ready!(first.poll());

    let mut second = spawn(lock.write());
    assert_pending!(second.poll());
}
85
86 // When there is an active shared owner, exclusive access should be possible after shared is dropped
#[test]
fn write_shared_drop() {
    // Dropping the last read guard must wake a queued writer.
    let lock = RwLock::new(100);

    let mut reader = spawn(lock.read());
    let read_guard = assert_ready!(reader.poll());

    let mut writer = spawn(lock.write());
    assert_pending!(writer.poll());

    drop(read_guard);
    assert!(writer.is_woken());
    assert_ready!(writer.poll());
}
99
100 // when there is an active shared owner, and exclusive access is triggered,
101 // subsequent shared access should not be possible as write gathers all the available semaphore permits
#[test]
fn write_read_shared_pending() {
    // While a writer is queued behind active readers, a new reader must
    // also wait: the pending writer has claimed the remaining permits.
    let lock = RwLock::new(100);

    let mut reader_a = spawn(lock.read());
    let _guard_a = assert_ready!(reader_a.poll());

    let mut reader_b = spawn(lock.read());
    assert_ready!(reader_b.poll());

    let mut writer = spawn(lock.write());
    assert_pending!(writer.poll());

    let mut reader_c = spawn(lock.read());
    assert_pending!(reader_c.poll());
}
117
118 // when there is an active shared owner, and exclusive access is triggered,
119 // reading should be possible after pending exclusive access is dropped
#[test]
fn write_read_shared_drop_pending() {
    // Dropping a still-pending writer must let a queued reader proceed.
    let lock = RwLock::new(100);

    let mut reader_a = spawn(lock.read());
    let _guard = assert_ready!(reader_a.poll());

    let mut writer = spawn(lock.write());
    assert_pending!(writer.poll());

    let mut reader_b = spawn(lock.read());
    assert_pending!(reader_b.poll());

    drop(writer);
    assert!(reader_b.is_woken());
    assert_ready!(reader_b.poll());
}
136
137 // Acquire an RwLock nonexclusively by a single task
138 #[tokio::test]
read_uncontested()139 async fn read_uncontested() {
140 let rwlock = RwLock::new(100);
141 let result = *rwlock.read().await;
142
143 assert_eq!(result, 100);
144 }
145
146 // Acquire an uncontested RwLock in exclusive mode
147 #[tokio::test]
write_uncontested()148 async fn write_uncontested() {
149 let rwlock = RwLock::new(100);
150 let mut result = rwlock.write().await;
151 *result += 50;
152 assert_eq!(*result, 150);
153 }
154
155 // RwLocks should be acquired in the order that their Futures are waited upon.
156 #[tokio::test]
write_order()157 async fn write_order() {
158 let rwlock = RwLock::<Vec<u32>>::new(vec![]);
159 let fut2 = rwlock.write().map(|mut guard| guard.push(2));
160 let fut1 = rwlock.write().map(|mut guard| guard.push(1));
161 fut1.await;
162 fut2.await;
163
164 let g = rwlock.read().await;
165 assert_eq!(*g, vec![1, 2]);
166 }
167
168 // A single RwLock is contested by tasks in multiple threads
169 #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
multithreaded()170 async fn multithreaded() {
171 let barrier = Arc::new(Barrier::new(5));
172 let rwlock = Arc::new(RwLock::<u32>::new(0));
173 let rwclone1 = rwlock.clone();
174 let rwclone2 = rwlock.clone();
175 let rwclone3 = rwlock.clone();
176 let rwclone4 = rwlock.clone();
177
178 let b1 = barrier.clone();
179 tokio::spawn(async move {
180 stream::iter(0..1000)
181 .for_each(move |_| {
182 let rwlock = rwclone1.clone();
183 async move {
184 let mut guard = rwlock.write().await;
185 *guard += 2;
186 }
187 })
188 .await;
189 b1.wait().await;
190 });
191
192 let b2 = barrier.clone();
193 tokio::spawn(async move {
194 stream::iter(0..1000)
195 .for_each(move |_| {
196 let rwlock = rwclone2.clone();
197 async move {
198 let mut guard = rwlock.write().await;
199 *guard += 3;
200 }
201 })
202 .await;
203 b2.wait().await;
204 });
205
206 let b3 = barrier.clone();
207 tokio::spawn(async move {
208 stream::iter(0..1000)
209 .for_each(move |_| {
210 let rwlock = rwclone3.clone();
211 async move {
212 let mut guard = rwlock.write().await;
213 *guard += 5;
214 }
215 })
216 .await;
217 b3.wait().await;
218 });
219
220 let b4 = barrier.clone();
221 tokio::spawn(async move {
222 stream::iter(0..1000)
223 .for_each(move |_| {
224 let rwlock = rwclone4.clone();
225 async move {
226 let mut guard = rwlock.write().await;
227 *guard += 7;
228 }
229 })
230 .await;
231 b4.wait().await;
232 });
233
234 barrier.wait().await;
235 let g = rwlock.read().await;
236 assert_eq!(*g, 17_000);
237 }
238
239 #[tokio::test]
try_write()240 async fn try_write() {
241 let lock = RwLock::new(0);
242 let read_guard = lock.read().await;
243 assert!(lock.try_write().is_err());
244 drop(read_guard);
245 assert!(lock.try_write().is_ok());
246 }
247
#[test]
fn try_read_try_write() {
    let lock: RwLock<usize> = RwLock::new(15);

    // Shared phase: multiple readers succeed while a writer fails.
    {
        let first = lock.try_read().unwrap();
        assert_eq!(*first, 15);

        assert!(lock.try_write().is_err());

        let second = lock.try_read().unwrap();
        assert_eq!(*second, 15)
    }

    // Exclusive phase: the writer succeeds while a reader fails.
    {
        let mut writer = lock.try_write().unwrap();
        *writer = 1515;

        assert!(lock.try_read().is_err())
    }

    // All guards dropped: the written value is observable again.
    assert_eq!(*lock.try_read().unwrap(), 1515);
}
271