1 // Copyright 2019 The Marl Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "marl_test.h"
16 
17 #include "marl/containers.h"
18 #include "marl/defer.h"
19 #include "marl/event.h"
20 #include "marl/waitgroup.h"
21 
22 #include <atomic>
23 
// Smoke test: a scheduler built from a default-constructed Config can be
// created and then destroyed again without bind() ever being called on it.
TEST_F(WithoutBoundScheduler, SchedulerConstructAndDestruct) {
  std::unique_ptr<marl::Scheduler> owned(
      new marl::Scheduler(marl::Scheduler::Config()));
}
28 
// Exercises the bind() / get() / unbind() round trip on the calling thread:
// while bound, get() must return the bound scheduler; once unbound, get() must
// return nullptr.
TEST_F(WithoutBoundScheduler, SchedulerBindGetUnbind) {
  std::unique_ptr<marl::Scheduler> owned(
      new marl::Scheduler(marl::Scheduler::Config()));

  owned->bind();
  auto current = marl::Scheduler::get();
  ASSERT_EQ(owned.get(), current);

  owned->unbind();
  current = marl::Scheduler::get();
  ASSERT_EQ(current, nullptr);
}
39 
// Checks that the configuration passed to the Scheduler constructor is what
// Scheduler::config() reports back (allocator and worker thread count).
TEST_F(WithoutBoundScheduler, CheckConfig) {
  marl::Scheduler::Config requested;
  requested.setAllocator(allocator).setWorkerThreadCount(10);

  std::unique_ptr<marl::Scheduler> owned(new marl::Scheduler(requested));

  auto reported = owned->config();
  ASSERT_EQ(reported.allocator, allocator);
  ASSERT_EQ(reported.workerThread.count, 10);
}
50 
// Schedules 1000 trivial tasks, then unbinds and deletes the bound scheduler,
// and verifies that every task had run by the time the destructor returned.
TEST_P(WithBoundScheduler, DestructWithPendingTasks) {
  std::atomic<int> counter = {0};
  for (int i = 0; i < 1000; i++) {
    marl::schedule([&] { counter++; });
  }

  auto scheduler = marl::Scheduler::get();
  scheduler->unbind();
  delete scheduler;

  // Snapshot the number of completed tasks as observed immediately after the
  // scheduler was destructed.
  const int completed = counter.load();

  // Rebind a new scheduler so WithBoundScheduler::TearDown() is happy.
  // This is deliberately done *before* the assertion below: ASSERT_* returns
  // from the test body on failure, which would otherwise skip the rebind and
  // leave TearDown() without a bound scheduler.
  (new marl::Scheduler(marl::Scheduler::Config()))->bind();

  // All scheduled tasks should be completed before the scheduler is destructed.
  ASSERT_EQ(completed, 1000);
}
67 
// Schedules 1000 tasks that each block on a WaitGroup, then a final task that
// releases them, and immediately unbinds and deletes the bound scheduler.
// Verifies that every blocked task had resumed and finished by the time the
// destructor returned.
TEST_P(WithBoundScheduler, DestructWithPendingFibers) {
  std::atomic<int> counter = {0};

  marl::WaitGroup wg(1);
  for (int i = 0; i < 1000; i++) {
    marl::schedule([&] {
      wg.wait();
      counter++;
    });
  }

  // Schedule a task to unblock all the tasks scheduled above.
  // We assume that some of these tasks will not finish before the scheduler
  // destruction logic kicks in.
  marl::schedule([=] {
    wg.done();  // Ready, steady, go...
  });

  auto scheduler = marl::Scheduler::get();
  scheduler->unbind();
  delete scheduler;

  // Snapshot the number of completed tasks as observed immediately after the
  // scheduler was destructed.
  const int completed = counter.load();

  // Rebind a new scheduler so WithBoundScheduler::TearDown() is happy.
  // This is deliberately done *before* the assertion below: ASSERT_* returns
  // from the test body on failure, which would otherwise skip the rebind and
  // leave TearDown() without a bound scheduler.
  (new marl::Scheduler(marl::Scheduler::Config()))->bind();

  // All scheduled tasks should be completed before the scheduler is destructed.
  ASSERT_EQ(completed, 1000);
}
96 
// Checks that the extra arguments handed to marl::schedule() reach the task
// function: the task formats them into a string which is compared against the
// expected text once the task has signalled completion.
TEST_P(WithBoundScheduler, ScheduleWithArgs) {
  marl::WaitGroup finished(1);
  std::string formatted;

  marl::schedule(
      [finished, &formatted](std::string text, int number, bool flag) {
        formatted = "s: '" + text + "', i: " + std::to_string(number) +
                    ", b: " + (flag ? "true" : "false");
        finished.done();
      },
      "a string", 42, true);

  finished.wait();
  ASSERT_EQ(formatted, "s: 'a string', i: 42, b: true");
}
110 
// Schedules 1000 tasks that each record their thread id, block on a shared
// fence, and then check that they are still running on the same thread once
// the fence is released.
TEST_P(WithBoundScheduler, FibersResumeOnSameThread) {
  marl::WaitGroup fence(1);
  marl::WaitGroup wg(1000);
  for (int i = 0; i < 1000; i++) {
    marl::schedule([=] {
      auto threadID = std::this_thread::get_id();
      fence.wait();
      // EXPECT_* rather than ASSERT_*: a fatal assertion would return from the
      // task on failure and skip wg.done(), leaving the wg.wait() below
      // blocked forever instead of reporting the failure.
      EXPECT_EQ(threadID, std::this_thread::get_id());
      wg.done();
    });
  }
  // just to try and get some tasks to yield.
  std::this_thread::sleep_for(std::chrono::milliseconds(10));
  fence.done();
  wg.wait();
}
127 
// Spawns a number of std::threads that each bind the scheduler, record their
// thread id, block on a shared fence, and then check that they are still
// running on the same thread once the fence is released.
TEST_P(WithBoundScheduler, FibersResumeOnSameStdThread) {
  auto scheduler = marl::Scheduler::get();

  // on 32-bit OSs, excessive numbers of threads can run out of address space.
  constexpr auto num_threads = sizeof(void*) > 4 ? 1000 : 100;

  marl::WaitGroup fence(1);
  marl::WaitGroup wg(num_threads);

  marl::containers::vector<std::thread, 32> threads;
  for (int i = 0; i < num_threads; i++) {
    threads.emplace_back(std::thread([=] {
      scheduler->bind();
      defer(scheduler->unbind());

      auto threadID = std::this_thread::get_id();
      fence.wait();
      // EXPECT_* rather than ASSERT_*: a fatal assertion would return from the
      // thread function on failure and skip wg.done(), leaving the wg.wait()
      // below blocked forever instead of reporting the failure.
      EXPECT_EQ(threadID, std::this_thread::get_id());
      wg.done();
    }));
  }
  // just to try and get some tasks to yield.
  std::this_thread::sleep_for(std::chrono::milliseconds(10));
  fence.done();
  wg.wait();

  for (auto& thread : threads) {
    thread.join();
  }
}
158 
// With a scheduler configured for 8 worker threads and bound to the test
// thread, schedules 10000 tasks that each record the id of the thread they ran
// on. Expects at most 8 distinct ids, none of which is the test thread's own.
TEST_F(WithoutBoundScheduler, TasksOnlyScheduledOnWorkerThreads) {
  marl::Scheduler::Config config;
  config.setWorkerThreadCount(8);

  std::unique_ptr<marl::Scheduler> owned(new marl::Scheduler(config));
  owned->bind();
  defer(owned->unbind());

  std::mutex seenMutex;  // guards seenThreads
  marl::containers::unordered_set<std::thread::id> seenThreads(allocator);
  marl::WaitGroup pending;
  for (int task = 0; task < 10000; task++) {
    pending.add(1);
    marl::schedule([&seenMutex, &seenThreads, pending] {
      defer(pending.done());
      std::lock_guard<std::mutex> guard(seenMutex);
      seenThreads.emplace(std::this_thread::get_id());
    });
  }
  pending.wait();

  ASSERT_LE(seenThreads.size(), 8U);
  ASSERT_EQ(seenThreads.count(std::this_thread::get_id()), 0U);
}
183 
184 // Test that a marl::Scheduler *with dedicated worker threads* can be used
185 // without first binding to the scheduling thread.
TEST_F(WithoutBoundScheduler,ScheduleMTWWithNoBind)186 TEST_F(WithoutBoundScheduler, ScheduleMTWWithNoBind) {
187   marl::Scheduler::Config cfg;
188   cfg.setWorkerThreadCount(8);
189   auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler(cfg));
190 
191   marl::WaitGroup wg;
192   for (int i = 0; i < 100; i++) {
193     wg.add(1);
194 
195     marl::Event event;
196     scheduler->enqueue(marl::Task([event, wg] {
197       event.wait();  // Test that tasks can wait on other tasks.
198       wg.done();
199     }));
200 
201     scheduler->enqueue(marl::Task([event, &scheduler] {
202       // Despite the main thread never binding the scheduler, the scheduler
203       // should be automatically bound to worker threads.
204       ASSERT_EQ(marl::Scheduler::get(), scheduler.get());
205 
206       event.signal();
207     }));
208   }
209 
210   // As the scheduler has not been bound to the main thread, the wait() call
211   // here will block **without** fiber yielding.
212   wg.wait();
213 }
214 
215 // Test that a marl::Scheduler *without dedicated worker threads* cannot be used
216 // without first binding to the scheduling thread.
TEST_F(WithoutBoundScheduler,ScheduleSTWWithNoBind)217 TEST_F(WithoutBoundScheduler, ScheduleSTWWithNoBind) {
218   marl::Scheduler::Config cfg;
219   auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler(cfg));
220 
221 #if MARL_DEBUG_ENABLED && GTEST_HAS_DEATH_TEST
222   EXPECT_DEATH(scheduler->enqueue(marl::Task([] {})),
223                "Did you forget to call marl::Scheduler::bind");
224 #elif !MARL_DEBUG_ENABLED
225   scheduler->enqueue(marl::Task([] { FAIL() << "Should not be called"; }));
226 #endif
227 }
228