1 use crate::runtime::queue;
2 use crate::runtime::task::{self, Schedule, Task};
3
4 use std::thread;
5 use std::time::Duration;
6
/// Pushes 256 tasks onto a fresh local queue and checks that none of them
/// were diverted to the inject queue.
///
/// NOTE(review): 256 is presumably the local queue's capacity — confirm
/// against the `queue` module.
#[test]
fn fits_256() {
    let (_, mut local) = queue::local();
    let inject = queue::Inject::new();

    // Tasks are produced lazily, so each one is created right before it is
    // pushed; the paired join handle is discarded immediately.
    let tasks = std::iter::repeat_with(|| task::joinable::<_, Runtime>(async {}).0).take(256);
    for task in tasks {
        local.push_back(task, &inject);
    }

    // Nothing should have spilled over.
    assert!(inject.pop().is_none());

    // Empty the local queue before it goes out of scope.
    std::iter::from_fn(|| local.pop()).for_each(drop);
}
21
/// Pushes 257 tasks onto a fresh local queue and checks that draining the
/// inject queue followed by the local queue yields exactly 257 tasks, i.e.
/// nothing is lost or duplicated when the push count goes past 256.
#[test]
fn overflow() {
    let (_, mut local) = queue::local();
    let inject = queue::Inject::new();

    for _ in 0..257 {
        let (task, _) = task::joinable::<_, Runtime>(async {});
        local.push_back(task, &inject);
    }

    // Count the inject queue first, then whatever is still held locally.
    let from_inject = std::iter::from_fn(|| inject.pop()).count();
    let from_local = std::iter::from_fn(|| local.pop()).count();

    assert_eq!(from_inject + from_local, 257);
}
44
/// Pushes four tasks onto one local queue, steals from it into a second
/// (empty) local queue, and checks how the tasks end up distributed:
/// `steal_into` hands one task back, the destination queue holds exactly
/// one, and the source queue keeps exactly two.
#[test]
fn steal_batch() {
    let (stealer, mut victim) = queue::local();
    let (_, mut thief) = queue::local();
    let inject = queue::Inject::new();

    for _ in 0..4 {
        let (task, _) = task::joinable::<_, Runtime>(async {});
        victim.push_back(task, &inject);
    }

    // The steal itself must return a task.
    assert!(stealer.steal_into(&mut thief).is_some());

    // Exactly one further task landed in the destination queue.
    assert!(thief.pop().is_some());
    assert!(thief.pop().is_none());

    // Exactly two tasks were left behind in the source queue.
    assert!(victim.pop().is_some());
    assert!(victim.pop().is_some());
    assert!(victim.pop().is_none());
}
70
/// Stress test: the main thread repeatedly pushes `NUM_PUSH` tasks onto its
/// local queue and pops up to `NUM_POP` of them, while a second thread
/// concurrently steals from that queue. At the end every pushed task must be
/// accounted for exactly once: popped locally, found in the inject queue,
/// taken by the stealer, or still sitting in the local queue.
#[test]
fn stress1() {
    const NUM_ITER: usize = 1;
    const NUM_STEAL: usize = 1_000;
    const NUM_LOCAL: usize = 1_000;
    const NUM_PUSH: usize = 500;
    const NUM_POP: usize = 250;

    for _ in 0..NUM_ITER {
        let (steal, mut local) = queue::local();
        let inject = queue::Inject::new();

        // Stealer thread: makes `NUM_STEAL` steal attempts into its own local
        // queue and returns how many tasks it obtained in total.
        let th = thread::spawn(move || {
            let (_, mut local) = queue::local();
            let mut n = 0;

            for _ in 0..NUM_STEAL {
                // A successful steal hands one task back directly...
                if steal.steal_into(&mut local).is_some() {
                    n += 1;
                }

                // ...and any others are found in the destination queue.
                while local.pop().is_some() {
                    n += 1;
                }

                thread::yield_now();
            }

            n
        });

        let mut n = 0;

        for _ in 0..NUM_LOCAL {
            for _ in 0..NUM_PUSH {
                let (task, _) = task::joinable::<_, Runtime>(async {});
                local.push_back(task, &inject);
            }

            // Pop at most `NUM_POP` tasks, stopping early once the queue is
            // empty.
            for _ in 0..NUM_POP {
                if local.pop().is_some() {
                    n += 1;
                } else {
                    break;
                }
            }
        }

        while inject.pop().is_some() {
            n += 1;
        }

        n += th.join().unwrap();

        // The pop rounds above are capped at `NUM_POP`, so tasks can still be
        // left in the local queue when the final round hits the cap before
        // the queue runs dry. Count them too; otherwise the total below
        // depends on how the steals happened to interleave.
        while local.pop().is_some() {
            n += 1;
        }

        assert_eq!(n, NUM_LOCAL * NUM_PUSH);
    }
}
128
/// Stress test: the main thread pushes `NUM_TASKS` tasks one at a time,
/// popping one locally on every 128th push and emptying the inject queue
/// after every push, while a second thread periodically steals. Once the
/// stealer is joined and both queues are drained, the number of tasks seen
/// must equal `NUM_TASKS`.
#[test]
fn stress2() {
    const NUM_ITER: usize = 1;
    const NUM_TASKS: usize = 1_000_000;
    const NUM_STEAL: usize = 1_000;

    for _ in 0..NUM_ITER {
        let (steal, mut local) = queue::local();
        let inject = queue::Inject::new();

        // Stealer thread: `NUM_STEAL` attempts, 10 microseconds apart,
        // returning the total number of tasks it obtained.
        let stealer = thread::spawn(move || {
            let (_, mut dest) = queue::local();
            let mut taken = 0;

            for _ in 0..NUM_STEAL {
                if steal.steal_into(&mut dest).is_some() {
                    taken += 1;
                }

                taken += std::iter::from_fn(|| dest.pop()).count();

                thread::sleep(Duration::from_micros(10));
            }

            taken
        });

        let mut accounted = 0;

        for idx in 0..NUM_TASKS {
            let (task, _) = task::joinable::<_, Runtime>(async {});
            local.push_back(task, &inject);

            // Only every 128th push is followed by a local pop.
            let pop_local_now = idx % 128 == 0;
            if pop_local_now && local.pop().is_some() {
                accounted += 1;
            }

            accounted += std::iter::from_fn(|| inject.pop()).count();
        }

        accounted += stealer.join().unwrap();

        // With the stealer finished, collect whatever is left in either queue.
        accounted += std::iter::from_fn(|| local.pop()).count();
        accounted += std::iter::from_fn(|| inject.pop()).count();

        assert_eq!(accounted, NUM_TASKS);
    }
}
186
187 struct Runtime;
188
189 impl Schedule for Runtime {
bind(task: Task<Self>) -> Runtime190 fn bind(task: Task<Self>) -> Runtime {
191 std::mem::forget(task);
192 Runtime
193 }
194
release(&self, _task: &Task<Self>) -> Option<Task<Self>>195 fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
196 None
197 }
198
schedule(&self, _task: task::Notified<Self>)199 fn schedule(&self, _task: task::Notified<Self>) {
200 unreachable!();
201 }
202 }
203