1 use crate::sync::batch_semaphore::*;
2
3 use futures::future::poll_fn;
4 use loom::future::block_on;
5 use loom::sync::atomic::AtomicUsize;
6 use loom::thread;
7 use std::future::Future;
8 use std::pin::Pin;
9 use std::sync::atomic::Ordering::SeqCst;
10 use std::sync::Arc;
11 use std::task::Poll::Ready;
12 use std::task::{Context, Poll};
13
14 #[test]
basic_usage()15 fn basic_usage() {
16 const NUM: usize = 2;
17
18 struct Shared {
19 semaphore: Semaphore,
20 active: AtomicUsize,
21 }
22
23 async fn actor(shared: Arc<Shared>) {
24 shared.semaphore.acquire(1).await.unwrap();
25 let actual = shared.active.fetch_add(1, SeqCst);
26 assert!(actual <= NUM - 1);
27
28 let actual = shared.active.fetch_sub(1, SeqCst);
29 assert!(actual <= NUM);
30 shared.semaphore.release(1);
31 }
32
33 loom::model(|| {
34 let shared = Arc::new(Shared {
35 semaphore: Semaphore::new(NUM),
36 active: AtomicUsize::new(0),
37 });
38
39 for _ in 0..NUM {
40 let shared = shared.clone();
41
42 thread::spawn(move || {
43 block_on(actor(shared));
44 });
45 }
46
47 block_on(actor(shared));
48 });
49 }
50
51 #[test]
release()52 fn release() {
53 loom::model(|| {
54 let semaphore = Arc::new(Semaphore::new(1));
55
56 {
57 let semaphore = semaphore.clone();
58 thread::spawn(move || {
59 block_on(semaphore.acquire(1)).unwrap();
60 semaphore.release(1);
61 });
62 }
63
64 block_on(semaphore.acquire(1)).unwrap();
65
66 semaphore.release(1);
67 });
68 }
69
70 #[test]
basic_closing()71 fn basic_closing() {
72 const NUM: usize = 2;
73
74 loom::model(|| {
75 let semaphore = Arc::new(Semaphore::new(1));
76
77 for _ in 0..NUM {
78 let semaphore = semaphore.clone();
79
80 thread::spawn(move || {
81 for _ in 0..2 {
82 block_on(semaphore.acquire(1)).map_err(|_| ())?;
83
84 semaphore.release(1);
85 }
86
87 Ok::<(), ()>(())
88 });
89 }
90
91 semaphore.close();
92 });
93 }
94
95 #[test]
concurrent_close()96 fn concurrent_close() {
97 const NUM: usize = 3;
98
99 loom::model(|| {
100 let semaphore = Arc::new(Semaphore::new(1));
101
102 for _ in 0..NUM {
103 let semaphore = semaphore.clone();
104
105 thread::spawn(move || {
106 block_on(semaphore.acquire(1)).map_err(|_| ())?;
107 semaphore.release(1);
108 semaphore.close();
109
110 Ok::<(), ()>(())
111 });
112 }
113 });
114 }
115
116 #[test]
concurrent_cancel()117 fn concurrent_cancel() {
118 async fn poll_and_cancel(semaphore: Arc<Semaphore>) {
119 let mut acquire1 = Some(semaphore.acquire(1));
120 let mut acquire2 = Some(semaphore.acquire(1));
121 poll_fn(|cx| {
122 // poll the acquire future once, and then immediately throw
123 // it away. this simulates a situation where a future is
124 // polled and then cancelled, such as by a timeout.
125 if let Some(acquire) = acquire1.take() {
126 pin!(acquire);
127 let _ = acquire.poll(cx);
128 }
129 if let Some(acquire) = acquire2.take() {
130 pin!(acquire);
131 let _ = acquire.poll(cx);
132 }
133 Poll::Ready(())
134 })
135 .await
136 }
137
138 loom::model(|| {
139 let semaphore = Arc::new(Semaphore::new(0));
140 let t1 = {
141 let semaphore = semaphore.clone();
142 thread::spawn(move || block_on(poll_and_cancel(semaphore)))
143 };
144 let t2 = {
145 let semaphore = semaphore.clone();
146 thread::spawn(move || block_on(poll_and_cancel(semaphore)))
147 };
148 let t3 = {
149 let semaphore = semaphore.clone();
150 thread::spawn(move || block_on(poll_and_cancel(semaphore)))
151 };
152
153 t1.join().unwrap();
154 semaphore.release(10);
155 t2.join().unwrap();
156 t3.join().unwrap();
157 });
158 }
159
160 #[test]
batch()161 fn batch() {
162 let mut b = loom::model::Builder::new();
163 b.preemption_bound = Some(1);
164
165 b.check(|| {
166 let semaphore = Arc::new(Semaphore::new(10));
167 let active = Arc::new(AtomicUsize::new(0));
168 let mut ths = vec![];
169
170 for _ in 0..2 {
171 let semaphore = semaphore.clone();
172 let active = active.clone();
173
174 ths.push(thread::spawn(move || {
175 for n in &[4, 10, 8] {
176 block_on(semaphore.acquire(*n)).unwrap();
177
178 active.fetch_add(*n as usize, SeqCst);
179
180 let num_active = active.load(SeqCst);
181 assert!(num_active <= 10);
182
183 thread::yield_now();
184
185 active.fetch_sub(*n as usize, SeqCst);
186
187 semaphore.release(*n as usize);
188 }
189 }));
190 }
191
192 for th in ths.into_iter() {
193 th.join().unwrap();
194 }
195
196 assert_eq!(10, semaphore.available_permits());
197 });
198 }
199
200 #[test]
release_during_acquire()201 fn release_during_acquire() {
202 loom::model(|| {
203 let semaphore = Arc::new(Semaphore::new(10));
204 semaphore
205 .try_acquire(8)
206 .expect("try_acquire should succeed; semaphore uncontended");
207 let semaphore2 = semaphore.clone();
208 let thread = thread::spawn(move || block_on(semaphore2.acquire(4)).unwrap());
209
210 semaphore.release(8);
211 thread.join().unwrap();
212 semaphore.release(4);
213 assert_eq!(10, semaphore.available_permits());
214 })
215 }
216