1 // Copyright 2019 The Marl Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "marl/conditionvariable.h"
16 #include "marl/waitgroup.h"
17 
18 #include "marl_test.h"
19 
20 #include <condition_variable>
21 
TEST_F(WithoutBoundScheduler,ConditionVariable)22 TEST_F(WithoutBoundScheduler, ConditionVariable) {
23   bool trigger[3] = {false, false, false};
24   bool signal[3] = {false, false, false};
25   marl::mutex mutex;
26   marl::ConditionVariable cv;
27 
28   std::thread thread([&] {
29     for (int i = 0; i < 3; i++) {
30       marl::lock lock(mutex);
31       cv.wait(lock, [&] {
32         EXPECT_TRUE(lock.owns_lock());
33         return trigger[i];
34       });
35       EXPECT_TRUE(lock.owns_lock());
36       signal[i] = true;
37       cv.notify_one();
38     }
39   });
40 
41   ASSERT_FALSE(signal[0]);
42   ASSERT_FALSE(signal[1]);
43   ASSERT_FALSE(signal[2]);
44 
45   for (int i = 0; i < 3; i++) {
46     {
47       marl::lock lock(mutex);
48       trigger[i] = true;
49       cv.notify_one();
50       cv.wait(lock, [&] {
51         EXPECT_TRUE(lock.owns_lock());
52         return signal[i];
53       });
54       EXPECT_TRUE(lock.owns_lock());
55     }
56 
57     ASSERT_EQ(signal[0], 0 <= i);
58     ASSERT_EQ(signal[1], 1 <= i);
59     ASSERT_EQ(signal[2], 2 <= i);
60   }
61 
62   thread.join();
63 }
64 
TEST_P(WithBoundScheduler,ConditionVariable)65 TEST_P(WithBoundScheduler, ConditionVariable) {
66   bool trigger[3] = {false, false, false};
67   bool signal[3] = {false, false, false};
68   marl::mutex mutex;
69   marl::ConditionVariable cv;
70 
71   std::thread thread([&] {
72     for (int i = 0; i < 3; i++) {
73       marl::lock lock(mutex);
74       cv.wait(lock, [&] {
75         EXPECT_TRUE(lock.owns_lock());
76         return trigger[i];
77       });
78       EXPECT_TRUE(lock.owns_lock());
79       signal[i] = true;
80       cv.notify_one();
81     }
82   });
83 
84   ASSERT_FALSE(signal[0]);
85   ASSERT_FALSE(signal[1]);
86   ASSERT_FALSE(signal[2]);
87 
88   for (int i = 0; i < 3; i++) {
89     {
90       marl::lock lock(mutex);
91       trigger[i] = true;
92       cv.notify_one();
93       cv.wait(lock, [&] {
94         EXPECT_TRUE(lock.owns_lock());
95         return signal[i];
96       });
97       EXPECT_TRUE(lock.owns_lock());
98     }
99 
100     ASSERT_EQ(signal[0], 0 <= i);
101     ASSERT_EQ(signal[1], 1 <= i);
102     ASSERT_EQ(signal[2], 2 <= i);
103   }
104 
105   thread.join();
106 }
107 
108 // ConditionVariableTimeouts spins up a whole lot of wait_fors(), unblocking
109 // some with timeouts and some with a notify, and then let's all the workers
110 // go to idle before repeating.
111 // This is testing to ensure that the scheduler handles timeouts correctly when
112 // they are early-unblocked, along with expected lock state.
TEST_P(WithBoundScheduler,ConditionVariableTimeouts)113 TEST_P(WithBoundScheduler, ConditionVariableTimeouts) {
114   for (int i = 0; i < 10; i++) {
115     marl::mutex mutex;
116     marl::ConditionVariable cv;
117     bool signaled = false;  // guarded by mutex
118     auto wg = marl::WaitGroup(100);
119     for (int j = 0; j < 100; j++) {
120       marl::schedule([=, &mutex, &cv, &signaled] {
121         {
122           marl::lock lock(mutex);
123           cv.wait_for(lock, std::chrono::milliseconds(j), [&] {
124             EXPECT_TRUE(lock.owns_lock());
125             return signaled;
126           });
127           EXPECT_TRUE(lock.owns_lock());
128         }
129         // Ensure the mutex unlock happens *before* the wg.done() call,
130         // otherwise the stack pointer may no longer be valid.
131         wg.done();
132       });
133     }
134     std::this_thread::sleep_for(std::chrono::milliseconds(50));
135     {
136       marl::lock lock(mutex);
137       signaled = true;
138       cv.notify_all();
139     }
140     wg.wait();
141   }
142 }
143