1 // Copyright 2017 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::cell::{Cell, Ref, RefCell};
6 use std::cmp::min;
7 use std::fs::File;
8 use std::i32;
9 use std::i64;
10 use std::marker::PhantomData;
11 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
12 use std::ptr::null_mut;
13 use std::slice;
14 use std::thread;
15 use std::time::Duration;
16 
17 use libc::{
18     c_int, epoll_create1, epoll_ctl, epoll_event, epoll_wait, EPOLLHUP, EPOLLIN, EPOLLOUT,
19     EPOLL_CLOEXEC, EPOLL_CTL_ADD, EPOLL_CTL_DEL, EPOLL_CTL_MOD,
20 };
21 
22 use crate::{errno_result, Result};
23 
24 const POLL_CONTEXT_MAX_EVENTS: usize = 16;
25 
/// EpollEvents wraps raw epoll_events, it should only be used with EpollContext.
// The RefCell provides interior mutability: `EpollContext::wait` fills the buffer through a
// shared reference, and the returned `PollEvents` later holds an immutable `Ref` borrow of it.
pub struct EpollEvents(RefCell<[epoll_event; POLL_CONTEXT_MAX_EVENTS]>);
28 
29 impl EpollEvents {
new() -> EpollEvents30     pub fn new() -> EpollEvents {
31         EpollEvents(RefCell::new(
32             [epoll_event { events: 0, u64: 0 }; POLL_CONTEXT_MAX_EVENTS],
33         ))
34     }
35 }
36 
37 impl Default for EpollEvents {
default() -> EpollEvents38     fn default() -> EpollEvents {
39         Self::new()
40     }
41 }
42 
43 /// Trait for a token that can be associated with an `fd` in a `PollContext`.
44 ///
/// Simple enums that have no variant data or only primitive variant data can use the `#[derive(PollToken)]`
46 /// custom derive to implement this trait. See
47 /// [poll_token_derive::poll_token](../poll_token_derive/fn.poll_token.html) for details.
pub trait PollToken {
    /// Converts this token into a u64 that can be turned back into a token via `from_raw_token`.
    ///
    /// The raw value is stored in the kernel's `epoll_event` data field by
    /// `EpollContext::add_fd_with_events` and recovered when an event fires.
    fn as_raw_token(&self) -> u64;

    /// Converts a raw token as returned from `as_raw_token` back into a token.
    ///
    /// It is invalid to give a raw token that was not returned via `as_raw_token` from the same
    /// `Self`. The implementation can expect that this will never happen as a result of its usage
    /// in `PollContext`.
    fn from_raw_token(data: u64) -> Self;
}
59 
60 impl PollToken for usize {
as_raw_token(&self) -> u6461     fn as_raw_token(&self) -> u64 {
62         *self as u64
63     }
64 
from_raw_token(data: u64) -> Self65     fn from_raw_token(data: u64) -> Self {
66         data as Self
67     }
68 }
69 
70 impl PollToken for u64 {
as_raw_token(&self) -> u6471     fn as_raw_token(&self) -> u64 {
72         *self as u64
73     }
74 
from_raw_token(data: u64) -> Self75     fn from_raw_token(data: u64) -> Self {
76         data as Self
77     }
78 }
79 
80 impl PollToken for u32 {
as_raw_token(&self) -> u6481     fn as_raw_token(&self) -> u64 {
82         u64::from(*self)
83     }
84 
from_raw_token(data: u64) -> Self85     fn from_raw_token(data: u64) -> Self {
86         data as Self
87     }
88 }
89 
90 impl PollToken for u16 {
as_raw_token(&self) -> u6491     fn as_raw_token(&self) -> u64 {
92         u64::from(*self)
93     }
94 
from_raw_token(data: u64) -> Self95     fn from_raw_token(data: u64) -> Self {
96         data as Self
97     }
98 }
99 
100 impl PollToken for u8 {
as_raw_token(&self) -> u64101     fn as_raw_token(&self) -> u64 {
102         u64::from(*self)
103     }
104 
from_raw_token(data: u64) -> Self105     fn from_raw_token(data: u64) -> Self {
106         data as Self
107     }
108 }
109 
110 impl PollToken for () {
as_raw_token(&self) -> u64111     fn as_raw_token(&self) -> u64 {
112         0
113     }
114 
from_raw_token(_data: u64) -> Self115     fn from_raw_token(_data: u64) -> Self {}
116 }
117 
/// An event returned by `PollContext::wait`.
pub struct PollEvent<'a, T> {
    // Borrowed from the `EpollEvents` buffer that `epoll_wait` filled in.
    event: &'a epoll_event,
    token: PhantomData<T>, // Needed to satisfy usage of T
}
123 
124 impl<'a, T: PollToken> PollEvent<'a, T> {
125     /// Gets the token associated in `PollContext::add` with this event.
token(&self) -> T126     pub fn token(&self) -> T {
127         T::from_raw_token(self.event.u64)
128     }
129 
130     /// True if the `fd` associated with this token in `PollContext::add` is readable.
readable(&self) -> bool131     pub fn readable(&self) -> bool {
132         self.event.events & (EPOLLIN as u32) != 0
133     }
134 
135     /// True if the `fd` associated with this token in `PollContext::add` is writable.
writable(&self) -> bool136     pub fn writable(&self) -> bool {
137         self.event.events & (EPOLLOUT as u32) != 0
138     }
139 
140     /// True if the `fd` associated with this token in `PollContext::add` has been hungup on.
hungup(&self) -> bool141     pub fn hungup(&self) -> bool {
142         self.event.events & (EPOLLHUP as u32) != 0
143     }
144 }
145 
/// An iterator over some (sub)set of events returned by `PollContext::wait`.
pub struct PollEventIter<'a, I, T>
where
    I: Iterator<Item = &'a epoll_event>,
{
    // Only events whose raw flags intersect this mask are yielded.
    mask: u32,
    iter: I,
    tokens: PhantomData<[T]>, // Needed to satisfy usage of T
}
155 
156 impl<'a, I, T> Iterator for PollEventIter<'a, I, T>
157 where
158     I: Iterator<Item = &'a epoll_event>,
159     T: PollToken,
160 {
161     type Item = PollEvent<'a, T>;
next(&mut self) -> Option<Self::Item>162     fn next(&mut self) -> Option<Self::Item> {
163         let mask = self.mask;
164         self.iter
165             .find(|event| (event.events & mask) != 0)
166             .map(|event| PollEvent {
167                 event,
168                 token: PhantomData,
169             })
170     }
171 }
172 
/// The list of event returned by `PollContext::wait`.
pub struct PollEvents<'a, T> {
    // Number of entries the kernel filled in; only `events[..count]` is meaningful.
    count: usize,
    // Borrow of the shared `EpollEvents` buffer; holding this prevents the buffer from being
    // mutably borrowed by a subsequent `wait` until this is dropped.
    events: Ref<'a, [epoll_event; POLL_CONTEXT_MAX_EVENTS]>,
    tokens: PhantomData<[T]>, // Needed to satisfy usage of T
}
179 
180 impl<'a, T: PollToken> PollEvents<'a, T> {
181     /// Copies the events to an owned structure so the reference to this (and by extension
182     /// `PollContext`) can be dropped.
to_owned(&self) -> PollEventsOwned<T>183     pub fn to_owned(&self) -> PollEventsOwned<T> {
184         PollEventsOwned {
185             count: self.count,
186             events: RefCell::new(*self.events),
187             tokens: PhantomData,
188         }
189     }
190 
191     /// Iterates over each event.
iter(&self) -> PollEventIter<slice::Iter<epoll_event>, T>192     pub fn iter(&self) -> PollEventIter<slice::Iter<epoll_event>, T> {
193         PollEventIter {
194             mask: 0xffff_ffff,
195             iter: self.events[..self.count].iter(),
196             tokens: PhantomData,
197         }
198     }
199 
200     /// Iterates over each readable event.
iter_readable(&self) -> PollEventIter<slice::Iter<epoll_event>, T>201     pub fn iter_readable(&self) -> PollEventIter<slice::Iter<epoll_event>, T> {
202         PollEventIter {
203             mask: EPOLLIN as u32,
204             iter: self.events[..self.count].iter(),
205             tokens: PhantomData,
206         }
207     }
208 
209     /// Iterates over each writable event.
iter_writable(&self) -> PollEventIter<slice::Iter<epoll_event>, T>210     pub fn iter_writable(&self) -> PollEventIter<slice::Iter<epoll_event>, T> {
211         PollEventIter {
212             mask: EPOLLOUT as u32,
213             iter: self.events[..self.count].iter(),
214             tokens: PhantomData,
215         }
216     }
217 
218     /// Iterates over each hungup event.
iter_hungup(&self) -> PollEventIter<slice::Iter<epoll_event>, T>219     pub fn iter_hungup(&self) -> PollEventIter<slice::Iter<epoll_event>, T> {
220         PollEventIter {
221             mask: EPOLLHUP as u32,
222             iter: self.events[..self.count].iter(),
223             tokens: PhantomData,
224         }
225     }
226 }
227 
impl<'a, T: PollToken> IntoIterator for &'a PollEvents<'_, T> {
    type Item = PollEvent<'a, T>;
    type IntoIter = PollEventIter<'a, slice::Iter<'a, epoll_event>, T>;

    /// Enables `for event in &poll_events { ... }`; equivalent to `PollEvents::iter`.
    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}
236 
/// A deep copy of the event records from `PollEvents`.
pub struct PollEventsOwned<T> {
    // Number of valid entries; mirrors `PollEvents::count` at the time of the copy.
    count: usize,
    // Owned copy of the event array, wrapped so `as_ref` can hand out a `Ref` borrow.
    events: RefCell<[epoll_event; POLL_CONTEXT_MAX_EVENTS]>,
    tokens: PhantomData<T>, // Needed to satisfy usage of T
}
243 
244 impl<T: PollToken> PollEventsOwned<T> {
245     /// Takes a reference to the events so that they can be iterated via methods in `PollEvents`.
as_ref(&self) -> PollEvents<T>246     pub fn as_ref(&self) -> PollEvents<T> {
247         PollEvents {
248             count: self.count,
249             events: self.events.borrow(),
250             tokens: PhantomData,
251         }
252     }
253 }
254 
/// Watching events taken by PollContext.
// Newtype over the raw `epoll_event.events` bit mask (EPOLLIN, EPOLLOUT, ...).
pub struct WatchingEvents(u32);
257 
258 impl WatchingEvents {
259     /// Returns empty Events.
260     #[inline(always)]
empty() -> WatchingEvents261     pub fn empty() -> WatchingEvents {
262         WatchingEvents(0)
263     }
264 
265     /// Build Events from raw epoll events (defined in epoll_ctl(2)).
266     #[inline(always)]
new(raw: u32) -> WatchingEvents267     pub fn new(raw: u32) -> WatchingEvents {
268         WatchingEvents(raw)
269     }
270 
271     /// Set read events.
272     #[inline(always)]
set_read(self) -> WatchingEvents273     pub fn set_read(self) -> WatchingEvents {
274         WatchingEvents(self.0 | EPOLLIN as u32)
275     }
276 
277     /// Set write events.
278     #[inline(always)]
set_write(self) -> WatchingEvents279     pub fn set_write(self) -> WatchingEvents {
280         WatchingEvents(self.0 | EPOLLOUT as u32)
281     }
282 
283     /// Get the underlying epoll events.
get_raw(&self) -> u32284     pub fn get_raw(&self) -> u32 {
285         self.0
286     }
287 }
288 
/// EpollContext wraps linux epoll. It provides similar interface to PollContext.
/// It is thread safe while PollContext is not. It requires user to pass in a reference of
/// EpollEvents while PollContext does not. Always use PollContext if you don't need to access the
/// same epoll from different threads.
pub struct EpollContext<T> {
    // Owned epoll file descriptor; closed automatically when the `File` is dropped.
    epoll_ctx: File,
    // Needed to satisfy usage of T
    tokens: PhantomData<[T]>,
}
298 
299 impl<T: PollToken> EpollContext<T> {
300     /// Creates a new `EpollContext`.
new() -> Result<EpollContext<T>>301     pub fn new() -> Result<EpollContext<T>> {
302         // Safe because we check the return value.
303         let epoll_fd = unsafe { epoll_create1(EPOLL_CLOEXEC) };
304         if epoll_fd < 0 {
305             return errno_result();
306         }
307         Ok(EpollContext {
308             epoll_ctx: unsafe { File::from_raw_fd(epoll_fd) },
309             tokens: PhantomData,
310         })
311     }
312 
313     /// Creates a new `EpollContext` and adds the slice of `fd` and `token` tuples to the new
314     /// context.
315     ///
316     /// This is equivalent to calling `new` followed by `add_many`. If there is an error, this will
317     /// return the error instead of the new context.
build_with(fd_tokens: &[(&dyn AsRawFd, T)]) -> Result<EpollContext<T>>318     pub fn build_with(fd_tokens: &[(&dyn AsRawFd, T)]) -> Result<EpollContext<T>> {
319         let ctx = EpollContext::new()?;
320         ctx.add_many(fd_tokens)?;
321         Ok(ctx)
322     }
323 
324     /// Adds the given slice of `fd` and `token` tuples to this context.
325     ///
326     /// This is equivalent to calling `add` with each `fd` and `token`. If there are any errors,
327     /// this method will stop adding `fd`s and return the first error, leaving this context in a
328     /// undefined state.
add_many(&self, fd_tokens: &[(&dyn AsRawFd, T)]) -> Result<()>329     pub fn add_many(&self, fd_tokens: &[(&dyn AsRawFd, T)]) -> Result<()> {
330         for (fd, token) in fd_tokens {
331             self.add(*fd, T::from_raw_token(token.as_raw_token()))?;
332         }
333         Ok(())
334     }
335 
336     /// Adds the given `fd` to this context and associates the given `token` with the `fd`'s
337     /// readable events.
338     ///
339     /// A `fd` can only be added once and does not need to be kept open. If the `fd` is dropped and
340     /// there were no duplicated file descriptors (i.e. adding the same descriptor with a different
341     /// FD number) added to this context, events will not be reported by `wait` anymore.
add(&self, fd: &dyn AsRawFd, token: T) -> Result<()>342     pub fn add(&self, fd: &dyn AsRawFd, token: T) -> Result<()> {
343         self.add_fd_with_events(fd, WatchingEvents::empty().set_read(), token)
344     }
345 
346     /// Adds the given `fd` to this context, watching for the specified events and associates the
347     /// given 'token' with those events.
348     ///
349     /// A `fd` can only be added once and does not need to be kept open. If the `fd` is dropped and
350     /// there were no duplicated file descriptors (i.e. adding the same descriptor with a different
351     /// FD number) added to this context, events will not be reported by `wait` anymore.
add_fd_with_events( &self, fd: &dyn AsRawFd, events: WatchingEvents, token: T, ) -> Result<()>352     pub fn add_fd_with_events(
353         &self,
354         fd: &dyn AsRawFd,
355         events: WatchingEvents,
356         token: T,
357     ) -> Result<()> {
358         let mut evt = epoll_event {
359             events: events.get_raw(),
360             u64: token.as_raw_token(),
361         };
362         // Safe because we give a valid epoll FD and FD to watch, as well as a valid epoll_event
363         // structure. Then we check the return value.
364         let ret = unsafe {
365             epoll_ctl(
366                 self.epoll_ctx.as_raw_fd(),
367                 EPOLL_CTL_ADD,
368                 fd.as_raw_fd(),
369                 &mut evt,
370             )
371         };
372         if ret < 0 {
373             return errno_result();
374         };
375         Ok(())
376     }
377 
378     /// If `fd` was previously added to this context, the watched events will be replaced with
379     /// `events` and the token associated with it will be replaced with the given `token`.
modify(&self, fd: &dyn AsRawFd, events: WatchingEvents, token: T) -> Result<()>380     pub fn modify(&self, fd: &dyn AsRawFd, events: WatchingEvents, token: T) -> Result<()> {
381         let mut evt = epoll_event {
382             events: events.0,
383             u64: token.as_raw_token(),
384         };
385         // Safe because we give a valid epoll FD and FD to modify, as well as a valid epoll_event
386         // structure. Then we check the return value.
387         let ret = unsafe {
388             epoll_ctl(
389                 self.epoll_ctx.as_raw_fd(),
390                 EPOLL_CTL_MOD,
391                 fd.as_raw_fd(),
392                 &mut evt,
393             )
394         };
395         if ret < 0 {
396             return errno_result();
397         };
398         Ok(())
399     }
400 
401     /// Deletes the given `fd` from this context.
402     ///
403     /// If an `fd`'s token shows up in the list of hangup events, it should be removed using this
404     /// method or by closing/dropping (if and only if the fd was never dup()'d/fork()'d) the `fd`.
405     /// Failure to do so will cause the `wait` method to always return immediately, causing ~100%
406     /// CPU load.
delete(&self, fd: &dyn AsRawFd) -> Result<()>407     pub fn delete(&self, fd: &dyn AsRawFd) -> Result<()> {
408         // Safe because we give a valid epoll FD and FD to stop watching. Then we check the return
409         // value.
410         let ret = unsafe {
411             epoll_ctl(
412                 self.epoll_ctx.as_raw_fd(),
413                 EPOLL_CTL_DEL,
414                 fd.as_raw_fd(),
415                 null_mut(),
416             )
417         };
418         if ret < 0 {
419             return errno_result();
420         };
421         Ok(())
422     }
423 
424     /// Waits for any events to occur in FDs that were previously added to this context.
425     ///
426     /// The events are level-triggered, meaning that if any events are unhandled (i.e. not reading
427     /// for readable events and not closing for hungup events), subsequent calls to `wait` will
428     /// return immediately. The consequence of not handling an event perpetually while calling
429     /// `wait` is that the callers loop will degenerated to busy loop polling, pinning a CPU to
430     /// ~100% usage.
wait<'a>(&self, events: &'a EpollEvents) -> Result<PollEvents<'a, T>>431     pub fn wait<'a>(&self, events: &'a EpollEvents) -> Result<PollEvents<'a, T>> {
432         self.wait_timeout(events, Duration::new(i64::MAX as u64, 0))
433     }
434 
435     /// Like `wait` except will only block for a maximum of the given `timeout`.
436     ///
437     /// This may return earlier than `timeout` with zero events if the duration indicated exceeds
438     /// system limits.
wait_timeout<'a>( &self, events: &'a EpollEvents, timeout: Duration, ) -> Result<PollEvents<'a, T>>439     pub fn wait_timeout<'a>(
440         &self,
441         events: &'a EpollEvents,
442         timeout: Duration,
443     ) -> Result<PollEvents<'a, T>> {
444         let timeout_millis = if timeout.as_secs() as i64 == i64::max_value() {
445             // We make the convenient assumption that 2^63 seconds is an effectively unbounded time
446             // frame. This is meant to mesh with `wait` calling us with no timeout.
447             -1
448         } else {
449             // In cases where we the number of milliseconds would overflow an i32, we substitute the
450             // maximum timeout which is ~24.8 days.
451             let millis = timeout
452                 .as_secs()
453                 .checked_mul(1_000)
454                 .and_then(|ms| ms.checked_add(u64::from(timeout.subsec_nanos()) / 1_000_000))
455                 .unwrap_or(i32::max_value() as u64);
456             min(i32::max_value() as u64, millis) as i32
457         };
458         let ret = {
459             let mut epoll_events = events.0.borrow_mut();
460             let max_events = epoll_events.len() as c_int;
461             // Safe because we give an epoll context and a properly sized epoll_events array
462             // pointer, which we trust the kernel to fill in properly.
463             unsafe {
464                 handle_eintr_errno!(epoll_wait(
465                     self.epoll_ctx.as_raw_fd(),
466                     &mut epoll_events[0],
467                     max_events,
468                     timeout_millis
469                 ))
470             }
471         };
472         if ret < 0 {
473             return errno_result();
474         }
475         let epoll_events = events.0.borrow();
476         let events = PollEvents {
477             count: ret as usize,
478             events: epoll_events,
479             tokens: PhantomData,
480         };
481         Ok(events)
482     }
483 }
484 
impl<T: PollToken> AsRawFd for EpollContext<T> {
    /// Returns the raw epoll file descriptor without transferring ownership.
    fn as_raw_fd(&self) -> RawFd {
        self.epoll_ctx.as_raw_fd()
    }
}
490 
impl<T: PollToken> IntoRawFd for EpollContext<T> {
    /// Consumes the context and returns the epoll FD; the caller becomes responsible for closing it.
    fn into_raw_fd(self) -> RawFd {
        self.epoll_ctx.into_raw_fd()
    }
}
496 
/// Used to poll multiple objects that have file descriptors.
///
/// # Example
///
/// ```
/// # use sys_util::{Result, EventFd, PollContext, PollEvents};
/// # fn test() -> Result<()> {
///     let evt1 = EventFd::new()?;
///     let evt2 = EventFd::new()?;
///     evt2.write(1)?;
///
///     let ctx: PollContext<u32> = PollContext::new()?;
///     ctx.add(&evt1, 1)?;
///     ctx.add(&evt2, 2)?;
///
///     let pollevents: PollEvents<u32> = ctx.wait()?;
///     let tokens: Vec<u32> = pollevents.iter_readable().map(|e| e.token()).collect();
///     assert_eq!(&tokens[..], &[2]);
/// #   Ok(())
/// # }
/// ```
pub struct PollContext<T> {
    // Underlying thread-safe epoll wrapper; PollContext adds a private event buffer and
    // hangup busy-loop detection on top of it.
    epoll_ctx: EpollContext<T>,

    // We use a RefCell here so that the `wait` method only requires an immutable self reference
    // while returning the events (encapsulated by PollEvents). Without the RefCell, `wait` would
    // hold a mutable reference that lives as long as its returned reference (i.e. the PollEvents),
    // even though that reference is immutable. This is terribly inconvenient for the caller because
    // the borrow checking would prevent them from using `delete` and `add` while the events are in
    // scope.
    events: EpollEvents,

    // Hangup busy loop detection variables. See `check_for_hungup_busy_loop`.
    hangups: Cell<usize>,
    max_hangups: Cell<usize>,
}
533 
534 impl<T: PollToken> PollContext<T> {
535     /// Creates a new `PollContext`.
new() -> Result<PollContext<T>>536     pub fn new() -> Result<PollContext<T>> {
537         Ok(PollContext {
538             epoll_ctx: EpollContext::new()?,
539             events: EpollEvents::new(),
540             hangups: Cell::new(0),
541             max_hangups: Cell::new(0),
542         })
543     }
544 
545     /// Creates a new `PollContext` and adds the slice of `fd` and `token` tuples to the new
546     /// context.
547     ///
548     /// This is equivalent to calling `new` followed by `add_many`. If there is an error, this will
549     /// return the error instead of the new context.
build_with(fd_tokens: &[(&dyn AsRawFd, T)]) -> Result<PollContext<T>>550     pub fn build_with(fd_tokens: &[(&dyn AsRawFd, T)]) -> Result<PollContext<T>> {
551         let ctx = PollContext::new()?;
552         ctx.add_many(fd_tokens)?;
553         Ok(ctx)
554     }
555 
556     /// Adds the given slice of `fd` and `token` tuples to this context.
557     ///
558     /// This is equivalent to calling `add` with each `fd` and `token`. If there are any errors,
559     /// this method will stop adding `fd`s and return the first error, leaving this context in a
560     /// undefined state.
add_many(&self, fd_tokens: &[(&dyn AsRawFd, T)]) -> Result<()>561     pub fn add_many(&self, fd_tokens: &[(&dyn AsRawFd, T)]) -> Result<()> {
562         for (fd, token) in fd_tokens {
563             self.add(*fd, T::from_raw_token(token.as_raw_token()))?;
564         }
565         Ok(())
566     }
567 
568     /// Adds the given `fd` to this context and associates the given `token` with the `fd`'s
569     /// readable events.
570     ///
571     /// A `fd` can only be added once and does not need to be kept open. If the `fd` is dropped and
572     /// there were no duplicated file descriptors (i.e. adding the same descriptor with a different
573     /// FD number) added to this context, events will not be reported by `wait` anymore.
add(&self, fd: &dyn AsRawFd, token: T) -> Result<()>574     pub fn add(&self, fd: &dyn AsRawFd, token: T) -> Result<()> {
575         self.add_fd_with_events(fd, WatchingEvents::empty().set_read(), token)
576     }
577 
578     /// Adds the given `fd` to this context, watching for the specified events and associates the
579     /// given 'token' with those events.
580     ///
581     /// A `fd` can only be added once and does not need to be kept open. If the `fd` is dropped and
582     /// there were no duplicated file descriptors (i.e. adding the same descriptor with a different
583     /// FD number) added to this context, events will not be reported by `wait` anymore.
add_fd_with_events( &self, fd: &dyn AsRawFd, events: WatchingEvents, token: T, ) -> Result<()>584     pub fn add_fd_with_events(
585         &self,
586         fd: &dyn AsRawFd,
587         events: WatchingEvents,
588         token: T,
589     ) -> Result<()> {
590         self.epoll_ctx.add_fd_with_events(fd, events, token)?;
591         self.hangups.set(0);
592         self.max_hangups.set(self.max_hangups.get() + 1);
593         Ok(())
594     }
595 
596     /// If `fd` was previously added to this context, the watched events will be replaced with
597     /// `events` and the token associated with it will be replaced with the given `token`.
modify(&self, fd: &dyn AsRawFd, events: WatchingEvents, token: T) -> Result<()>598     pub fn modify(&self, fd: &dyn AsRawFd, events: WatchingEvents, token: T) -> Result<()> {
599         self.epoll_ctx.modify(fd, events, token)
600     }
601 
602     /// Deletes the given `fd` from this context.
603     ///
604     /// If an `fd`'s token shows up in the list of hangup events, it should be removed using this
605     /// method or by closing/dropping (if and only if the fd was never dup()'d/fork()'d) the `fd`.
606     /// Failure to do so will cause the `wait` method to always return immediately, causing ~100%
607     /// CPU load.
delete(&self, fd: &dyn AsRawFd) -> Result<()>608     pub fn delete(&self, fd: &dyn AsRawFd) -> Result<()> {
609         self.epoll_ctx.delete(fd)?;
610         self.hangups.set(0);
611         self.max_hangups.set(self.max_hangups.get() - 1);
612         Ok(())
613     }
614 
615     // This method determines if the the user of wait is misusing the `PollContext` by leaving FDs
616     // in this `PollContext` that have been shutdown or hungup on. Such an FD will cause `wait` to
617     // return instantly with a hungup event. If that FD is perpetually left in this context, a busy
618     // loop burning ~100% of one CPU will silently occur with no human visible malfunction.
619     //
620     // How do we know if the client of this context is ignoring hangups? A naive implementation
621     // would trigger if consecutive wait calls yield hangup events, but there are legitimate cases
622     // for this, such as two distinct sockets becoming hungup across two consecutive wait calls. A
623     // smarter implementation would only trigger if `delete` wasn't called between waits that
624     // yielded hangups. Sadly `delete` isn't the only way to remove an FD from this context. The
625     // other way is for the client to close the hungup FD, which automatically removes it from this
626     // context. Assuming that the client always uses close, this implementation would too eagerly
627     // trigger.
628     //
629     // The implementation used here keeps an upper bound of FDs in this context using a counter
630     // hooked into add/delete (which is imprecise because close can also remove FDs without us
631     // knowing). The number of consecutive (no add or delete in between) hangups yielded by wait
632     // calls is counted and compared to the upper bound. If the upper bound is exceeded by the
633     // consecutive hangups, the implementation triggers the check and logs.
634     //
635     // This implementation has false negatives because the upper bound can be completely too high,
636     // in the worst case caused by only using close instead of delete. However, this method has the
637     // advantage of always triggering eventually genuine busy loop cases, requires no dynamic
638     // allocations, is fast and constant time to compute, and has no false positives.
check_for_hungup_busy_loop(&self, new_hangups: usize)639     fn check_for_hungup_busy_loop(&self, new_hangups: usize) {
640         let old_hangups = self.hangups.get();
641         let max_hangups = self.max_hangups.get();
642         if old_hangups <= max_hangups && old_hangups + new_hangups > max_hangups {
643             warn!(
644                 "busy poll wait loop with hungup FDs detected on thread {}",
645                 thread::current().name().unwrap_or("")
646             );
647             // This panic is helpful for tests of this functionality.
648             #[cfg(test)]
649             panic!("hungup busy loop detected");
650         }
651         self.hangups.set(old_hangups + new_hangups);
652     }
653 
654     /// Waits for any events to occur in FDs that were previously added to this context.
655     ///
656     /// The events are level-triggered, meaning that if any events are unhandled (i.e. not reading
657     /// for readable events and not closing for hungup events), subsequent calls to `wait` will
658     /// return immediately. The consequence of not handling an event perpetually while calling
659     /// `wait` is that the callers loop will degenerated to busy loop polling, pinning a CPU to
660     /// ~100% usage.
661     ///
662     /// # Panics
663     /// Panics if the returned `PollEvents` structure is not dropped before subsequent `wait` calls.
wait(&self) -> Result<PollEvents<T>>664     pub fn wait(&self) -> Result<PollEvents<T>> {
665         self.wait_timeout(Duration::new(i64::MAX as u64, 0))
666     }
667 
668     /// Like `wait` except will only block for a maximum of the given `timeout`.
669     ///
670     /// This may return earlier than `timeout` with zero events if the duration indicated exceeds
671     /// system limits.
wait_timeout(&self, timeout: Duration) -> Result<PollEvents<T>>672     pub fn wait_timeout(&self, timeout: Duration) -> Result<PollEvents<T>> {
673         let events = self.epoll_ctx.wait_timeout(&self.events, timeout)?;
674         let hangups = events.iter_hungup().count();
675         self.check_for_hungup_busy_loop(hangups);
676         Ok(events)
677     }
678 }
679 
impl<T: PollToken> AsRawFd for PollContext<T> {
    /// Returns the raw epoll file descriptor without transferring ownership.
    fn as_raw_fd(&self) -> RawFd {
        self.epoll_ctx.as_raw_fd()
    }
}
685 
impl<T: PollToken> IntoRawFd for PollContext<T> {
    /// Consumes the context and returns the epoll FD; the caller becomes responsible for closing it.
    fn into_raw_fd(self) -> RawFd {
        self.epoll_ctx.into_raw_fd()
    }
}
691 
#[cfg(test)]
mod tests {
    use super::*;
    use crate::EventFd;
    use poll_token_derive::PollToken;
    use std::os::unix::net::UnixStream;
    use std::time::Instant;

    // Basic add/wait/delete round trip: both EventFds are signaled up front, so both tokens
    // must eventually be reported readable exactly once.
    #[test]
    fn poll_context() {
        let evt1 = EventFd::new().unwrap();
        let evt2 = EventFd::new().unwrap();
        evt1.write(1).unwrap();
        evt2.write(1).unwrap();
        let ctx: PollContext<u32> = PollContext::build_with(&[(&evt1, 1), (&evt2, 2)]).unwrap();

        let mut evt_count = 0;
        while evt_count < 2 {
            for event in ctx.wait().unwrap().iter_readable() {
                evt_count += 1;
                match event.token() {
                    1 => {
                        evt1.read().unwrap();
                        ctx.delete(&evt1).unwrap();
                    }
                    2 => {
                        evt2.read().unwrap();
                        ctx.delete(&evt2).unwrap();
                    }
                    _ => panic!("unexpected token"),
                };
            }
        }
        assert_eq!(evt_count, 2);
    }

    // Registers more FDs than fit in one POLL_CONTEXT_MAX_EVENTS-sized wait, verifying that
    // repeated waits eventually deliver every event.
    #[test]
    fn poll_context_overflow() {
        const EVT_COUNT: usize = POLL_CONTEXT_MAX_EVENTS * 2 + 1;
        let ctx: PollContext<usize> = PollContext::new().unwrap();
        let mut evts = Vec::with_capacity(EVT_COUNT);
        for i in 0..EVT_COUNT {
            let evt = EventFd::new().unwrap();
            evt.write(1).unwrap();
            ctx.add(&evt, i).unwrap();
            evts.push(evt);
        }
        let mut evt_count = 0;
        while evt_count < EVT_COUNT {
            for event in ctx.wait().unwrap().iter_readable() {
                evts[event.token()].read().unwrap();
                evt_count += 1;
            }
        }
    }

    // Exercises `check_for_hungup_busy_loop`, which panics under cfg(test) when the
    // consecutive-hangup count exceeds the FD upper bound.
    #[test]
    #[should_panic]
    fn poll_context_hungup() {
        let (s1, s2) = UnixStream::pair().unwrap();
        let ctx: PollContext<u32> = PollContext::new().unwrap();
        ctx.add(&s1, 1).unwrap();

        // Causes s1 to receive hangup events, which we purposefully ignore to trip the detection
        // logic in `PollContext`.
        drop(s2);

        // Should easily panic within this many iterations.
        for _ in 0..1000 {
            ctx.wait().unwrap();
        }
    }

    // With no FDs registered, `wait_timeout` must block for at least the requested duration.
    #[test]
    fn poll_context_timeout() {
        let ctx: PollContext<u32> = PollContext::new().unwrap();
        let dur = Duration::from_millis(10);
        let start_inst = Instant::now();
        ctx.wait_timeout(dur).unwrap();
        assert!(start_inst.elapsed() >= dur);
    }

    // Verifies the #[derive(PollToken)] round trip for unit, tuple, and struct variants.
    #[test]
    #[allow(dead_code)]
    fn poll_token_derive() {
        #[derive(PollToken)]
        enum EmptyToken {}

        #[derive(PartialEq, Debug, PollToken)]
        enum Token {
            Alpha,
            Beta,
            // comments
            Gamma(u32),
            Delta { index: usize },
            Omega,
        }

        assert_eq!(
            Token::from_raw_token(Token::Alpha.as_raw_token()),
            Token::Alpha
        );
        assert_eq!(
            Token::from_raw_token(Token::Beta.as_raw_token()),
            Token::Beta
        );
        assert_eq!(
            Token::from_raw_token(Token::Gamma(55).as_raw_token()),
            Token::Gamma(55)
        );
        assert_eq!(
            Token::from_raw_token(Token::Delta { index: 100 }.as_raw_token()),
            Token::Delta { index: 100 }
        );
        assert_eq!(
            Token::from_raw_token(Token::Omega.as_raw_token()),
            Token::Omega
        );
    }
}
812