1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/message_loop/message_pump_libevent.h"
6 
7 #include <unistd.h>
8 
9 #include <memory>
10 #include <utility>
11 
12 #include "base/bind.h"
13 #include "base/bind_helpers.h"
14 #include "base/files/file_util.h"
15 #include "base/memory/ptr_util.h"
16 #include "base/message_loop/message_loop.h"
17 #include "base/posix/eintr_wrapper.h"
18 #include "base/run_loop.h"
19 #include "base/single_thread_task_runner.h"
20 #include "base/synchronization/waitable_event.h"
21 #include "base/synchronization/waitable_event_watcher.h"
22 #include "base/test/gtest_util.h"
23 #include "base/third_party/libevent/event.h"
24 #include "base/threading/sequenced_task_runner_handle.h"
25 #include "base/threading/thread.h"
26 #include "base/threading/thread_task_runner_handle.h"
27 #include "build/build_config.h"
28 #include "testing/gtest/include/gtest/gtest.h"
29 
30 namespace base {
31 
32 class MessagePumpLibeventTest : public testing::Test {
33  protected:
MessagePumpLibeventTest()34   MessagePumpLibeventTest()
35       : ui_loop_(new MessageLoop(MessageLoop::TYPE_UI)),
36         io_thread_("MessagePumpLibeventTestIOThread") {}
37   ~MessagePumpLibeventTest() override = default;
38 
SetUp()39   void SetUp() override {
40     Thread::Options options(MessageLoop::TYPE_IO, 0);
41     ASSERT_TRUE(io_thread_.StartWithOptions(options));
42     ASSERT_EQ(MessageLoop::TYPE_IO, io_thread_.message_loop()->type());
43     int ret = pipe(pipefds_);
44     ASSERT_EQ(0, ret);
45   }
46 
TearDown()47   void TearDown() override {
48     if (IGNORE_EINTR(close(pipefds_[0])) < 0)
49       PLOG(ERROR) << "close";
50     if (IGNORE_EINTR(close(pipefds_[1])) < 0)
51       PLOG(ERROR) << "close";
52   }
53 
WaitUntilIoThreadStarted()54   void WaitUntilIoThreadStarted() {
55     ASSERT_TRUE(io_thread_.WaitUntilThreadStarted());
56   }
57 
io_runner() const58   scoped_refptr<SingleThreadTaskRunner> io_runner() const {
59     return io_thread_.task_runner();
60   }
61 
OnLibeventNotification(MessagePumpLibevent * pump,MessagePumpLibevent::FdWatchController * controller)62   void OnLibeventNotification(
63       MessagePumpLibevent* pump,
64       MessagePumpLibevent::FdWatchController* controller) {
65     pump->OnLibeventNotification(0, EV_WRITE | EV_READ, controller);
66   }
67 
68   int pipefds_[2];
69   std::unique_ptr<MessageLoop> ui_loop_;
70 
71  private:
72   Thread io_thread_;
73 };
74 
75 namespace {
76 
77 // Concrete implementation of MessagePumpLibevent::FdWatcher that does
78 // nothing useful.
79 class StupidWatcher : public MessagePumpLibevent::FdWatcher {
80  public:
81   ~StupidWatcher() override = default;
82 
83   // base:MessagePumpLibevent::FdWatcher interface
OnFileCanReadWithoutBlocking(int fd)84   void OnFileCanReadWithoutBlocking(int fd) override {}
OnFileCanWriteWithoutBlocking(int fd)85   void OnFileCanWriteWithoutBlocking(int fd) override {}
86 };
87 
TEST_F(MessagePumpLibeventTest,QuitOutsideOfRun)88 TEST_F(MessagePumpLibeventTest, QuitOutsideOfRun) {
89   std::unique_ptr<MessagePumpLibevent> pump(new MessagePumpLibevent);
90   ASSERT_DCHECK_DEATH(pump->Quit());
91 }
92 
93 class BaseWatcher : public MessagePumpLibevent::FdWatcher {
94  public:
BaseWatcher(MessagePumpLibevent::FdWatchController * controller)95   explicit BaseWatcher(MessagePumpLibevent::FdWatchController* controller)
96       : controller_(controller) {
97     DCHECK(controller_);
98   }
99   ~BaseWatcher() override = default;
100 
101   // base:MessagePumpLibevent::FdWatcher interface
OnFileCanReadWithoutBlocking(int)102   void OnFileCanReadWithoutBlocking(int /* fd */) override { NOTREACHED(); }
103 
OnFileCanWriteWithoutBlocking(int)104   void OnFileCanWriteWithoutBlocking(int /* fd */) override { NOTREACHED(); }
105 
106  protected:
107   MessagePumpLibevent::FdWatchController* controller_;
108 };
109 
110 class DeleteWatcher : public BaseWatcher {
111  public:
DeleteWatcher(MessagePumpLibevent::FdWatchController * controller)112   explicit DeleteWatcher(MessagePumpLibevent::FdWatchController* controller)
113       : BaseWatcher(controller) {}
114 
~DeleteWatcher()115   ~DeleteWatcher() override { DCHECK(!controller_); }
116 
OnFileCanWriteWithoutBlocking(int)117   void OnFileCanWriteWithoutBlocking(int /* fd */) override {
118     DCHECK(controller_);
119     delete controller_;
120     controller_ = nullptr;
121   }
122 };
123 
TEST_F(MessagePumpLibeventTest,DeleteWatcher)124 TEST_F(MessagePumpLibeventTest, DeleteWatcher) {
125   std::unique_ptr<MessagePumpLibevent> pump(new MessagePumpLibevent);
126   MessagePumpLibevent::FdWatchController* watcher =
127       new MessagePumpLibevent::FdWatchController(FROM_HERE);
128   DeleteWatcher delegate(watcher);
129   pump->WatchFileDescriptor(pipefds_[1],
130       false, MessagePumpLibevent::WATCH_READ_WRITE, watcher, &delegate);
131 
132   // Spoof a libevent notification.
133   OnLibeventNotification(pump.get(), watcher);
134 }
135 
136 class StopWatcher : public BaseWatcher {
137  public:
StopWatcher(MessagePumpLibevent::FdWatchController * controller)138   explicit StopWatcher(MessagePumpLibevent::FdWatchController* controller)
139       : BaseWatcher(controller) {}
140 
141   ~StopWatcher() override = default;
142 
OnFileCanWriteWithoutBlocking(int)143   void OnFileCanWriteWithoutBlocking(int /* fd */) override {
144     controller_->StopWatchingFileDescriptor();
145   }
146 };
147 
TEST_F(MessagePumpLibeventTest,StopWatcher)148 TEST_F(MessagePumpLibeventTest, StopWatcher) {
149   std::unique_ptr<MessagePumpLibevent> pump(new MessagePumpLibevent);
150   MessagePumpLibevent::FdWatchController watcher(FROM_HERE);
151   StopWatcher delegate(&watcher);
152   pump->WatchFileDescriptor(pipefds_[1],
153       false, MessagePumpLibevent::WATCH_READ_WRITE, &watcher, &delegate);
154 
155   // Spoof a libevent notification.
156   OnLibeventNotification(pump.get(), &watcher);
157 }
158 
QuitMessageLoopAndStart(const Closure & quit_closure)159 void QuitMessageLoopAndStart(const Closure& quit_closure) {
160   quit_closure.Run();
161 
162   RunLoop runloop(RunLoop::Type::kNestableTasksAllowed);
163   ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE, runloop.QuitClosure());
164   runloop.Run();
165 }
166 
167 class NestedPumpWatcher : public MessagePumpLibevent::FdWatcher {
168  public:
169   NestedPumpWatcher() = default;
170   ~NestedPumpWatcher() override = default;
171 
OnFileCanReadWithoutBlocking(int)172   void OnFileCanReadWithoutBlocking(int /* fd */) override {
173     RunLoop runloop;
174     ThreadTaskRunnerHandle::Get()->PostTask(
175         FROM_HERE, BindOnce(&QuitMessageLoopAndStart, runloop.QuitClosure()));
176     runloop.Run();
177   }
178 
OnFileCanWriteWithoutBlocking(int)179   void OnFileCanWriteWithoutBlocking(int /* fd */) override {}
180 };
181 
TEST_F(MessagePumpLibeventTest,NestedPumpWatcher)182 TEST_F(MessagePumpLibeventTest, NestedPumpWatcher) {
183   std::unique_ptr<MessagePumpLibevent> pump(new MessagePumpLibevent);
184   MessagePumpLibevent::FdWatchController watcher(FROM_HERE);
185   NestedPumpWatcher delegate;
186   pump->WatchFileDescriptor(pipefds_[1],
187       false, MessagePumpLibevent::WATCH_READ, &watcher, &delegate);
188 
189   // Spoof a libevent notification.
190   OnLibeventNotification(pump.get(), &watcher);
191 }
192 
FatalClosure()193 void FatalClosure() {
194   FAIL() << "Reached fatal closure.";
195 }
196 
197 class QuitWatcher : public BaseWatcher {
198  public:
QuitWatcher(MessagePumpLibevent::FdWatchController * controller,base::Closure quit_closure)199   QuitWatcher(MessagePumpLibevent::FdWatchController* controller,
200               base::Closure quit_closure)
201       : BaseWatcher(controller), quit_closure_(std::move(quit_closure)) {}
202 
OnFileCanReadWithoutBlocking(int)203   void OnFileCanReadWithoutBlocking(int /* fd */) override {
204     // Post a fatal closure to the MessageLoop before we quit it.
205     ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE, BindOnce(&FatalClosure));
206 
207     quit_closure_.Run();
208   }
209 
210  private:
211   base::Closure quit_closure_;
212 };
213 
WriteFDWrapper(const int fd,const char * buf,int size,WaitableEvent * event)214 void WriteFDWrapper(const int fd,
215                     const char* buf,
216                     int size,
217                     WaitableEvent* event) {
218   ASSERT_TRUE(WriteFileDescriptor(fd, buf, size));
219 }
220 
221 // Tests that MessagePumpLibevent quits immediately when it is quit from
222 // libevent's event_base_loop().
TEST_F(MessagePumpLibeventTest,QuitWatcher)223 TEST_F(MessagePumpLibeventTest, QuitWatcher) {
224   // Delete the old MessageLoop so that we can manage our own one here.
225   ui_loop_.reset();
226 
227   MessagePumpLibevent* pump = new MessagePumpLibevent;  // owned by |loop|.
228   MessageLoop loop(WrapUnique(pump));
229   RunLoop run_loop;
230   MessagePumpLibevent::FdWatchController controller(FROM_HERE);
231   QuitWatcher delegate(&controller, run_loop.QuitClosure());
232   WaitableEvent event(WaitableEvent::ResetPolicy::AUTOMATIC,
233                       WaitableEvent::InitialState::NOT_SIGNALED);
234   std::unique_ptr<WaitableEventWatcher> watcher(new WaitableEventWatcher);
235 
236   // Tell the pump to watch the pipe.
237   pump->WatchFileDescriptor(pipefds_[0], false, MessagePumpLibevent::WATCH_READ,
238                             &controller, &delegate);
239 
240   // Make the IO thread wait for |event| before writing to pipefds[1].
241   const char buf = 0;
242   WaitableEventWatcher::EventCallback write_fd_task =
243       BindOnce(&WriteFDWrapper, pipefds_[1], &buf, 1);
244   io_runner()->PostTask(
245       FROM_HERE, BindOnce(IgnoreResult(&WaitableEventWatcher::StartWatching),
246                           Unretained(watcher.get()), &event,
247                           std::move(write_fd_task), io_runner()));
248 
249   // Queue |event| to signal on |loop|.
250   loop.task_runner()->PostTask(
251       FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&event)));
252 
253   // Now run the MessageLoop.
254   run_loop.Run();
255 
256   // StartWatching can move |watcher| to IO thread. Release on IO thread.
257   io_runner()->PostTask(FROM_HERE, BindOnce(&WaitableEventWatcher::StopWatching,
258                                             Owned(watcher.release())));
259 }
260 
261 }  // namespace
262 
263 }  // namespace base
264