1 /*
2  * Copyright (C) 2015 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include "fdevent.h"

#include <gtest/gtest.h>

#include <unistd.h>
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <limits>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <android-base/threads.h>

#include "adb_io.h"
#include "fdevent_test.h"
34 
35 using namespace std::chrono_literals;
36 
37 class FdHandler {
38   public:
FdHandler(int read_fd,int write_fd,bool use_new_callback)39     FdHandler(int read_fd, int write_fd, bool use_new_callback)
40         : read_fd_(read_fd), write_fd_(write_fd) {
41         if (use_new_callback) {
42             read_fde_ = fdevent_create(read_fd_, FdEventNewCallback, this);
43             write_fde_ = fdevent_create(write_fd_, FdEventNewCallback, this);
44         } else {
45             read_fde_ = fdevent_create(read_fd_, FdEventCallback, this);
46             write_fde_ = fdevent_create(write_fd_, FdEventCallback, this);
47         }
48         fdevent_add(read_fde_, FDE_READ);
49     }
50 
~FdHandler()51     ~FdHandler() {
52         fdevent_destroy(read_fde_);
53         fdevent_destroy(write_fde_);
54     }
55 
56   private:
FdEventCallback(int fd,unsigned events,void * userdata)57     static void FdEventCallback(int fd, unsigned events, void* userdata) {
58         FdHandler* handler = reinterpret_cast<FdHandler*>(userdata);
59         ASSERT_EQ(0u, (events & ~(FDE_READ | FDE_WRITE))) << "unexpected events: " << events;
60         if (events & FDE_READ) {
61             ASSERT_EQ(fd, handler->read_fd_);
62             char c;
63             ASSERT_EQ(1, adb_read(fd, &c, 1));
64             handler->queue_.push(c);
65             fdevent_add(handler->write_fde_, FDE_WRITE);
66         }
67         if (events & FDE_WRITE) {
68             ASSERT_EQ(fd, handler->write_fd_);
69             ASSERT_FALSE(handler->queue_.empty());
70             char c = handler->queue_.front();
71             handler->queue_.pop();
72             ASSERT_EQ(1, adb_write(fd, &c, 1));
73             if (handler->queue_.empty()) {
74                 fdevent_del(handler->write_fde_, FDE_WRITE);
75             }
76         }
77     }
78 
FdEventNewCallback(fdevent * fde,unsigned events,void * userdata)79     static void FdEventNewCallback(fdevent* fde, unsigned events, void* userdata) {
80         int fd = fde->fd.get();
81         FdHandler* handler = reinterpret_cast<FdHandler*>(userdata);
82         ASSERT_EQ(0u, (events & ~(FDE_READ | FDE_WRITE))) << "unexpected events: " << events;
83         if (events & FDE_READ) {
84             ASSERT_EQ(fd, handler->read_fd_);
85             char c;
86             ASSERT_EQ(1, adb_read(fd, &c, 1));
87             handler->queue_.push(c);
88             fdevent_add(handler->write_fde_, FDE_WRITE);
89         }
90         if (events & FDE_WRITE) {
91             ASSERT_EQ(fd, handler->write_fd_);
92             ASSERT_FALSE(handler->queue_.empty());
93             char c = handler->queue_.front();
94             handler->queue_.pop();
95             ASSERT_EQ(1, adb_write(fd, &c, 1));
96             if (handler->queue_.empty()) {
97                 fdevent_del(handler->write_fde_, FDE_WRITE);
98             }
99         }
100     }
101 
102   private:
103     const int read_fd_;
104     const int write_fd_;
105     fdevent* read_fde_;
106     fdevent* write_fde_;
107     std::queue<char> queue_;
108 };
109 
// Bundle of values describing a chain of fd handlers: the fd the first handler reads from,
// the fd the last handler writes to, and how many intermediate pipes connect them.
// Members carry defaults so a default-constructed ThreadArg never holds indeterminate values;
// the struct remains an aggregate, so brace initialization still works.
struct ThreadArg {
    int first_read_fd = -1;        // -1 means "no fd assigned yet".
    int last_write_fd = -1;        // -1 means "no fd assigned yet".
    size_t middle_pipe_count = 0;  // Number of pipes between the first and last fd.
};
115 
// Calls PrepareThread() immediately followed by TerminateThread(), with no work queued in
// between.
TEST_F(FdeventTest, fdevent_terminate) {
    PrepareThread();

    TerminateThread();
}
120 
TEST_F(FdeventTest,smoke)121 TEST_F(FdeventTest, smoke) {
122 #ifdef __APPLE__  // on __APPLE__, we will encounter "Too many open files" (EMFILE), so
123     // tweak the resource ceiling.
124     struct rlimit limit;
125     ASSERT_EQ(getrlimit(RLIMIT_NOFILE, &limit), 0);
126 
127     limit.rlim_cur = OPEN_MAX;
128 
129     ASSERT_EQ(setrlimit(RLIMIT_NOFILE, &limit), 0);
130 #endif
131     for (bool use_new_callback : {true, false}) {
132         fdevent_reset();
133         const size_t PIPE_COUNT = 512;
134         const size_t MESSAGE_LOOP_COUNT = 10;
135         const std::string MESSAGE = "fdevent_test";
136         int fd_pair1[2];
137         int fd_pair2[2];
138         ASSERT_EQ(0, adb_socketpair(fd_pair1));
139         ASSERT_EQ(0, adb_socketpair(fd_pair2));
140         ThreadArg thread_arg;
141         thread_arg.first_read_fd = fd_pair1[0];
142         thread_arg.last_write_fd = fd_pair2[1];
143         thread_arg.middle_pipe_count = PIPE_COUNT;
144         int writer = fd_pair1[1];
145         int reader = fd_pair2[0];
146 
147         PrepareThread();
148 
149         std::vector<std::unique_ptr<FdHandler>> fd_handlers;
150         fdevent_run_on_looper([&thread_arg, &fd_handlers, use_new_callback]() {
151             std::vector<int> read_fds;
152             std::vector<int> write_fds;
153 
154             read_fds.push_back(thread_arg.first_read_fd);
155             for (size_t i = 0; i < thread_arg.middle_pipe_count; ++i) {
156                 int fds[2];
157                 ASSERT_EQ(0, adb_socketpair(fds));
158                 read_fds.push_back(fds[0]);
159                 write_fds.push_back(fds[1]);
160             }
161             write_fds.push_back(thread_arg.last_write_fd);
162 
163             for (size_t i = 0; i < read_fds.size(); ++i) {
164                 fd_handlers.push_back(
165                         std::make_unique<FdHandler>(read_fds[i], write_fds[i], use_new_callback));
166             }
167         });
168         WaitForFdeventLoop();
169 
170         for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) {
171             std::string read_buffer = MESSAGE;
172             std::string write_buffer(MESSAGE.size(), 'a');
173             ASSERT_TRUE(WriteFdExactly(writer, read_buffer.c_str(), read_buffer.size()));
174             ASSERT_TRUE(ReadFdExactly(reader, &write_buffer[0], write_buffer.size()));
175             ASSERT_EQ(read_buffer, write_buffer);
176         }
177 
178         fdevent_run_on_looper([&fd_handlers]() { fd_handlers.clear(); });
179         WaitForFdeventLoop();
180 
181         TerminateThread();
182         ASSERT_EQ(0, adb_close(writer));
183         ASSERT_EQ(0, adb_close(reader));
184     }
185 }
186 
// Queues a large number of callbacks while the looper thread is blocked, then checks that
// every callback ran exactly once and in the order it was queued.
TEST_F(FdeventTest, run_on_looper_thread_queued) {
    constexpr int kCallbackCount = 1000000;
    std::vector<int> run_order;

    PrepareThread();

    // Block the looper thread for a long time while we queue our callbacks.
    fdevent_run_on_looper([]() {
        fdevent_check_looper();
        std::this_thread::sleep_for(1s);
    });

    for (int index = 0; index < kCallbackCount; ++index) {
        fdevent_run_on_looper([index, &run_order]() {
            fdevent_check_looper();
            run_order.push_back(index);
        });
    }

    TerminateThread();

    ASSERT_EQ(static_cast<size_t>(kCallbackCount), run_order.size());
    int expected = 0;
    for (const int actual : run_order) {
        ASSERT_EQ(expected, actual);
        ++expected;
    }
}
212 
// Queues a callback that itself queues a second callback from the looper thread, and checks
// that the second callback has run by the time TerminateThread() returns.
TEST_F(FdeventTest, run_on_looper_thread_reentrant) {
    bool inner_ran = false;

    PrepareThread();

    auto inner = [&inner_ran]() {
        fdevent_check_looper();
        inner_ran = true;
    };
    fdevent_run_on_looper([inner]() {
        fdevent_check_looper();
        // Re-enter fdevent_run_on_looper from a callback already running on the looper.
        fdevent_run_on_looper(inner);
    });

    TerminateThread();

    EXPECT_EQ(inner_ran, true);
}
230 
TEST_F(FdeventTest,timeout)231 TEST_F(FdeventTest, timeout) {
232     fdevent_reset();
233     PrepareThread();
234 
235     enum class TimeoutEvent {
236         read,
237         timeout,
238         done,
239     };
240 
241     struct TimeoutTest {
242         std::vector<std::pair<TimeoutEvent, std::chrono::steady_clock::time_point>> events;
243         fdevent* fde;
244     };
245     TimeoutTest test;
246 
247     int fds[2];
248     ASSERT_EQ(0, adb_socketpair(fds));
249     static constexpr auto delta = 100ms;
250     fdevent_run_on_looper([&]() {
251         test.fde = fdevent_create(fds[0], [](fdevent* fde, unsigned events, void* arg) {
252             auto test = static_cast<TimeoutTest*>(arg);
253             auto now = std::chrono::steady_clock::now();
254             CHECK((events & FDE_READ) ^ (events & FDE_TIMEOUT));
255             TimeoutEvent event;
256             if ((events & FDE_READ)) {
257                 char buf[2];
258                 ssize_t rc = adb_read(fde->fd.get(), buf, sizeof(buf));
259                 if (rc == 0) {
260                     event = TimeoutEvent::done;
261                 } else if (rc == 1) {
262                     event = TimeoutEvent::read;
263                 } else {
264                     abort();
265                 }
266             } else if ((events & FDE_TIMEOUT)) {
267                 event = TimeoutEvent::timeout;
268             } else {
269                 abort();
270             }
271 
272             CHECK_EQ(fde, test->fde);
273             test->events.emplace_back(event, now);
274 
275             if (event == TimeoutEvent::done) {
276                 fdevent_destroy(fde);
277             }
278         }, &test);
279         fdevent_add(test.fde, FDE_READ);
280         fdevent_set_timeout(test.fde, delta);
281     });
282 
283     ASSERT_EQ(1, adb_write(fds[1], "", 1));
284 
285     // Timeout should happen here
286     std::this_thread::sleep_for(delta);
287 
288     // and another.
289     std::this_thread::sleep_for(delta);
290 
291     // No timeout should happen here.
292     std::this_thread::sleep_for(delta / 2);
293     adb_close(fds[1]);
294 
295     TerminateThread();
296 
297     ASSERT_EQ(4ULL, test.events.size());
298     ASSERT_EQ(TimeoutEvent::read, test.events[0].first);
299     ASSERT_EQ(TimeoutEvent::timeout, test.events[1].first);
300     ASSERT_EQ(TimeoutEvent::timeout, test.events[2].first);
301     ASSERT_EQ(TimeoutEvent::done, test.events[3].first);
302 
303     std::vector<int> time_deltas;
304     for (size_t i = 0; i < test.events.size() - 1; ++i) {
305         auto before = test.events[i].second;
306         auto after = test.events[i + 1].second;
307         auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(after - before);
308         time_deltas.push_back(diff.count());
309     }
310 
311     std::vector<int> expected = {
312         delta.count(),
313         delta.count(),
314         delta.count() / 2,
315     };
316 
317     std::vector<int> diff;
318     ASSERT_EQ(time_deltas.size(), expected.size());
319     for (size_t i = 0; i < time_deltas.size(); ++i) {
320         diff.push_back(std::abs(time_deltas[i] - expected[i]));
321     }
322 
323     ASSERT_LT(diff[0], delta.count() * 0.5);
324     ASSERT_LT(diff[1], delta.count() * 0.5);
325     ASSERT_LT(diff[2], delta.count() * 0.5);
326 }
327 
TEST_F(FdeventTest,unregister_with_pending_event)328 TEST_F(FdeventTest, unregister_with_pending_event) {  // Remains broken on _WIN32
329     // since poll() (Loop()/fdevent_poll.cpp) fails with `Invalid areg` causing
330     // a hang on Windows 10.
331     // Ref: [ FAILED ] LocalSocketTest.flush_after_shutdown
332 
333     fdevent_reset();
334 
335     int fds1[2];
336     int fds2[2];
337     ASSERT_EQ(0, adb_socketpair(fds1));
338     ASSERT_EQ(0, adb_socketpair(fds2));
339 
340     struct Test {
341         fdevent* fde1;
342         fdevent* fde2;
343         bool should_not_happen;
344     };
345     Test test{};
346 
347     test.fde1 = fdevent_create(
348             fds1[0],
349             [](fdevent* fde, unsigned events, void* arg) {
350                 auto test = static_cast<Test*>(arg);
351                 // Unregister fde2 from inside the fde1 event
352                 fdevent_destroy(test->fde2);
353                 // Unregister fde1 so it doesn't get called again
354                 fdevent_destroy(test->fde1);
355             },
356             &test);
357 
358     test.fde2 = fdevent_create(
359             fds2[0],
360             [](fdevent* fde, unsigned events, void* arg) {
361                 auto test = static_cast<Test*>(arg);
362                 test->should_not_happen = true;
363             },
364             &test);
365 
366     fdevent_add(test.fde1, FDE_READ | FDE_ERROR);
367     fdevent_add(test.fde2, FDE_READ | FDE_ERROR);
368 
369     PrepareThread();
370     WaitForFdeventLoop();
371 
372     std::mutex m;
373     std::condition_variable cv;
374     bool main_thread_latch = false;
375     bool looper_thread_latch = false;
376 
377     fdevent_run_on_looper([&]() {
378         std::unique_lock lk(m);
379         // Notify the main thread that the looper is in this lambda
380         main_thread_latch = true;
381         cv.notify_one();
382         // Pause the looper to ensure both events occur in the same epoll_wait
383         cv.wait(lk, [&] { return looper_thread_latch; });
384     });
385 
386     // Wait for the looper thread to pause to ensure it is not in epoll_wait
387     {
388         std::unique_lock lk(m);
389         cv.wait(lk, [&] { return main_thread_latch; });
390     }
391 
392     // Write to one end of the sockets to trigger events on the other ends
393     adb_write(fds1[1], "a", 1);
394     adb_write(fds2[1], "a", 1);
395 
396     // Unpause the looper thread to let it loop back into epoll_wait, which should return
397     // both fde1 and fde2.
398     {
399         std::lock_guard lk(m);
400         looper_thread_latch = true;
401     }
402     cv.notify_one();
403 
404     WaitForFdeventLoop();
405     TerminateThread();
406 
407     adb_close(fds1[0]);
408     adb_close(fds1[1]);
409     adb_close(fds2[0]);
410     adb_close(fds2[1]);
411 
412     ASSERT_FALSE(test.should_not_happen);
413 }
414