1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/message_loop/message_pump_libevent.h"
6
7 #include <unistd.h>
8
9 #include <memory>
10 #include <utility>
11
12 #include "base/bind.h"
13 #include "base/bind_helpers.h"
14 #include "base/files/file_util.h"
15 #include "base/memory/ptr_util.h"
16 #include "base/message_loop/message_loop.h"
17 #include "base/posix/eintr_wrapper.h"
18 #include "base/run_loop.h"
19 #include "base/single_thread_task_runner.h"
20 #include "base/synchronization/waitable_event.h"
21 #include "base/synchronization/waitable_event_watcher.h"
22 #include "base/test/gtest_util.h"
23 #include "base/third_party/libevent/event.h"
24 #include "base/threading/sequenced_task_runner_handle.h"
25 #include "base/threading/thread.h"
26 #include "base/threading/thread_task_runner_handle.h"
27 #include "build/build_config.h"
28 #include "testing/gtest/include/gtest/gtest.h"
29
30 namespace base {
31
32 class MessagePumpLibeventTest : public testing::Test {
33 protected:
MessagePumpLibeventTest()34 MessagePumpLibeventTest()
35 : ui_loop_(new MessageLoop(MessageLoop::TYPE_UI)),
36 io_thread_("MessagePumpLibeventTestIOThread") {}
37 ~MessagePumpLibeventTest() override = default;
38
SetUp()39 void SetUp() override {
40 Thread::Options options(MessageLoop::TYPE_IO, 0);
41 ASSERT_TRUE(io_thread_.StartWithOptions(options));
42 ASSERT_EQ(MessageLoop::TYPE_IO, io_thread_.message_loop()->type());
43 int ret = pipe(pipefds_);
44 ASSERT_EQ(0, ret);
45 }
46
TearDown()47 void TearDown() override {
48 if (IGNORE_EINTR(close(pipefds_[0])) < 0)
49 PLOG(ERROR) << "close";
50 if (IGNORE_EINTR(close(pipefds_[1])) < 0)
51 PLOG(ERROR) << "close";
52 }
53
WaitUntilIoThreadStarted()54 void WaitUntilIoThreadStarted() {
55 ASSERT_TRUE(io_thread_.WaitUntilThreadStarted());
56 }
57
io_runner() const58 scoped_refptr<SingleThreadTaskRunner> io_runner() const {
59 return io_thread_.task_runner();
60 }
61
OnLibeventNotification(MessagePumpLibevent * pump,MessagePumpLibevent::FdWatchController * controller)62 void OnLibeventNotification(
63 MessagePumpLibevent* pump,
64 MessagePumpLibevent::FdWatchController* controller) {
65 pump->OnLibeventNotification(0, EV_WRITE | EV_READ, controller);
66 }
67
68 int pipefds_[2];
69 std::unique_ptr<MessageLoop> ui_loop_;
70
71 private:
72 Thread io_thread_;
73 };
74
75 namespace {
76
77 // Concrete implementation of MessagePumpLibevent::FdWatcher that does
78 // nothing useful.
79 class StupidWatcher : public MessagePumpLibevent::FdWatcher {
80 public:
81 ~StupidWatcher() override = default;
82
83 // base:MessagePumpLibevent::FdWatcher interface
OnFileCanReadWithoutBlocking(int fd)84 void OnFileCanReadWithoutBlocking(int fd) override {}
OnFileCanWriteWithoutBlocking(int fd)85 void OnFileCanWriteWithoutBlocking(int fd) override {}
86 };
87
// Calling Quit() on a freshly constructed pump is expected to trip a DCHECK.
TEST_F(MessagePumpLibeventTest, QuitOutsideOfRun) {
  auto pump = WrapUnique(new MessagePumpLibevent);
  ASSERT_DCHECK_DEATH(pump->Quit());
}
92
93 class BaseWatcher : public MessagePumpLibevent::FdWatcher {
94 public:
BaseWatcher(MessagePumpLibevent::FdWatchController * controller)95 explicit BaseWatcher(MessagePumpLibevent::FdWatchController* controller)
96 : controller_(controller) {
97 DCHECK(controller_);
98 }
99 ~BaseWatcher() override = default;
100
101 // base:MessagePumpLibevent::FdWatcher interface
OnFileCanReadWithoutBlocking(int)102 void OnFileCanReadWithoutBlocking(int /* fd */) override { NOTREACHED(); }
103
OnFileCanWriteWithoutBlocking(int)104 void OnFileCanWriteWithoutBlocking(int /* fd */) override { NOTREACHED(); }
105
106 protected:
107 MessagePumpLibevent::FdWatchController* controller_;
108 };
109
110 class DeleteWatcher : public BaseWatcher {
111 public:
DeleteWatcher(MessagePumpLibevent::FdWatchController * controller)112 explicit DeleteWatcher(MessagePumpLibevent::FdWatchController* controller)
113 : BaseWatcher(controller) {}
114
~DeleteWatcher()115 ~DeleteWatcher() override { DCHECK(!controller_); }
116
OnFileCanWriteWithoutBlocking(int)117 void OnFileCanWriteWithoutBlocking(int /* fd */) override {
118 DCHECK(controller_);
119 delete controller_;
120 controller_ = nullptr;
121 }
122 };
123
// Spoofs a notification whose write callback deletes the controller being
// notified.
TEST_F(MessagePumpLibeventTest, DeleteWatcher) {
  auto pump = WrapUnique(new MessagePumpLibevent);

  // Heap-allocated on purpose: |deleting_watcher| deletes it from its write
  // callback.
  auto* controller = new MessagePumpLibevent::FdWatchController(FROM_HERE);
  DeleteWatcher deleting_watcher(controller);
  pump->WatchFileDescriptor(pipefds_[1], false,
                            MessagePumpLibevent::WATCH_READ_WRITE, controller,
                            &deleting_watcher);

  // Spoof a libevent notification.
  OnLibeventNotification(pump.get(), controller);
}
135
136 class StopWatcher : public BaseWatcher {
137 public:
StopWatcher(MessagePumpLibevent::FdWatchController * controller)138 explicit StopWatcher(MessagePumpLibevent::FdWatchController* controller)
139 : BaseWatcher(controller) {}
140
141 ~StopWatcher() override = default;
142
OnFileCanWriteWithoutBlocking(int)143 void OnFileCanWriteWithoutBlocking(int /* fd */) override {
144 controller_->StopWatchingFileDescriptor();
145 }
146 };
147
// Spoofs a notification whose write callback stops the watch on the
// controller being notified.
TEST_F(MessagePumpLibeventTest, StopWatcher) {
  auto pump = WrapUnique(new MessagePumpLibevent);
  MessagePumpLibevent::FdWatchController controller(FROM_HERE);
  StopWatcher stopping_watcher(&controller);
  pump->WatchFileDescriptor(pipefds_[1], false,
                            MessagePumpLibevent::WATCH_READ_WRITE, &controller,
                            &stopping_watcher);

  // Spoof a libevent notification.
  OnLibeventNotification(pump.get(), &controller);
}
158
QuitMessageLoopAndStart(const Closure & quit_closure)159 void QuitMessageLoopAndStart(const Closure& quit_closure) {
160 quit_closure.Run();
161
162 RunLoop runloop(RunLoop::Type::kNestableTasksAllowed);
163 ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE, runloop.QuitClosure());
164 runloop.Run();
165 }
166
167 class NestedPumpWatcher : public MessagePumpLibevent::FdWatcher {
168 public:
169 NestedPumpWatcher() = default;
170 ~NestedPumpWatcher() override = default;
171
OnFileCanReadWithoutBlocking(int)172 void OnFileCanReadWithoutBlocking(int /* fd */) override {
173 RunLoop runloop;
174 ThreadTaskRunnerHandle::Get()->PostTask(
175 FROM_HERE, BindOnce(&QuitMessageLoopAndStart, runloop.QuitClosure()));
176 runloop.Run();
177 }
178
OnFileCanWriteWithoutBlocking(int)179 void OnFileCanWriteWithoutBlocking(int /* fd */) override {}
180 };
181
// Spoofs a notification whose read callback runs nested RunLoops.
TEST_F(MessagePumpLibeventTest, NestedPumpWatcher) {
  auto pump = WrapUnique(new MessagePumpLibevent);
  MessagePumpLibevent::FdWatchController controller(FROM_HERE);
  NestedPumpWatcher nesting_watcher;
  pump->WatchFileDescriptor(pipefds_[1], false,
                            MessagePumpLibevent::WATCH_READ, &controller,
                            &nesting_watcher);

  // Spoof a libevent notification.
  OnLibeventNotification(pump.get(), &controller);
}
192
FatalClosure()193 void FatalClosure() {
194 FAIL() << "Reached fatal closure.";
195 }
196
197 class QuitWatcher : public BaseWatcher {
198 public:
QuitWatcher(MessagePumpLibevent::FdWatchController * controller,base::Closure quit_closure)199 QuitWatcher(MessagePumpLibevent::FdWatchController* controller,
200 base::Closure quit_closure)
201 : BaseWatcher(controller), quit_closure_(std::move(quit_closure)) {}
202
OnFileCanReadWithoutBlocking(int)203 void OnFileCanReadWithoutBlocking(int /* fd */) override {
204 // Post a fatal closure to the MessageLoop before we quit it.
205 ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE, BindOnce(&FatalClosure));
206
207 quit_closure_.Run();
208 }
209
210 private:
211 base::Closure quit_closure_;
212 };
213
WriteFDWrapper(const int fd,const char * buf,int size,WaitableEvent * event)214 void WriteFDWrapper(const int fd,
215 const char* buf,
216 int size,
217 WaitableEvent* event) {
218 ASSERT_TRUE(WriteFileDescriptor(fd, buf, size));
219 }
220
221 // Tests that MessagePumpLibevent quits immediately when it is quit from
222 // libevent's event_base_loop().
TEST_F(MessagePumpLibeventTest,QuitWatcher)223 TEST_F(MessagePumpLibeventTest, QuitWatcher) {
224 // Delete the old MessageLoop so that we can manage our own one here.
225 ui_loop_.reset();
226
227 MessagePumpLibevent* pump = new MessagePumpLibevent; // owned by |loop|.
228 MessageLoop loop(WrapUnique(pump));
229 RunLoop run_loop;
230 MessagePumpLibevent::FdWatchController controller(FROM_HERE);
231 QuitWatcher delegate(&controller, run_loop.QuitClosure());
232 WaitableEvent event(WaitableEvent::ResetPolicy::AUTOMATIC,
233 WaitableEvent::InitialState::NOT_SIGNALED);
234 std::unique_ptr<WaitableEventWatcher> watcher(new WaitableEventWatcher);
235
236 // Tell the pump to watch the pipe.
237 pump->WatchFileDescriptor(pipefds_[0], false, MessagePumpLibevent::WATCH_READ,
238 &controller, &delegate);
239
240 // Make the IO thread wait for |event| before writing to pipefds[1].
241 const char buf = 0;
242 WaitableEventWatcher::EventCallback write_fd_task =
243 BindOnce(&WriteFDWrapper, pipefds_[1], &buf, 1);
244 io_runner()->PostTask(
245 FROM_HERE, BindOnce(IgnoreResult(&WaitableEventWatcher::StartWatching),
246 Unretained(watcher.get()), &event,
247 std::move(write_fd_task), io_runner()));
248
249 // Queue |event| to signal on |loop|.
250 loop.task_runner()->PostTask(
251 FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&event)));
252
253 // Now run the MessageLoop.
254 run_loop.Run();
255
256 // StartWatching can move |watcher| to IO thread. Release on IO thread.
257 io_runner()->PostTask(FROM_HERE, BindOnce(&WaitableEventWatcher::StopWatching,
258 Owned(watcher.release())));
259 }
260
261 } // namespace
262
263 } // namespace base
264