1 /*
2  * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package sun.nio.ch;
27 
28 import java.nio.channels.spi.AsynchronousChannelProvider;
29 import java.io.IOException;
30 import java.util.concurrent.ArrayBlockingQueue;
31 import java.util.concurrent.RejectedExecutionException;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import static sun.nio.ch.EPoll.*;
34 
35 /**
36  * AsynchronousChannelGroup implementation based on the Linux epoll facility.
37  */
38 
39 final class EPollPort
40     extends Port
41 {
42     // maximum number of events to poll at a time
43     private static final int MAX_EPOLL_EVENTS = 512;
44 
45     // errors
46     private static final int ENOENT     = 2;
47 
48     // epoll file descriptor
49     private final int epfd;
50 
51     // true if epoll closed
52     private boolean closed;
53 
54     // socket pair used for wakeup
55     private final int sp[];
56 
57     // number of wakeups pending
58     private final AtomicInteger wakeupCount = new AtomicInteger();
59 
60     // address of the poll array passed to epoll_wait
61     private final long address;
62 
63     // encapsulates an event for a channel
64     static class Event {
65         final PollableChannel channel;
66         final int events;
67 
Event(PollableChannel channel, int events)68         Event(PollableChannel channel, int events) {
69             this.channel = channel;
70             this.events = events;
71         }
72 
channel()73         PollableChannel channel()   { return channel; }
events()74         int events()                { return events; }
75     }
76 
77     // queue of events for cases that a polling thread dequeues more than one
78     // event
79     private final ArrayBlockingQueue<Event> queue;
80     private final Event NEED_TO_POLL = new Event(null, 0);
81     private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
82 
EPollPort(AsynchronousChannelProvider provider, ThreadPool pool)83     EPollPort(AsynchronousChannelProvider provider, ThreadPool pool)
84         throws IOException
85     {
86         super(provider, pool);
87 
88         // open epoll
89         this.epfd = epollCreate();
90 
91         // create socket pair for wakeup mechanism
92         int[] sv = new int[2];
93         try {
94             socketpair(sv);
95             // register one end with epoll
96             epollCtl(epfd, EPOLL_CTL_ADD, sv[0], Net.POLLIN);
97         } catch (IOException x) {
98             close0(epfd);
99             throw x;
100         }
101         this.sp = sv;
102 
103         // allocate the poll array
104         this.address = allocatePollArray(MAX_EPOLL_EVENTS);
105 
106         // create the queue and offer the special event to ensure that the first
107         // threads polls
108         this.queue = new ArrayBlockingQueue<Event>(MAX_EPOLL_EVENTS);
109         this.queue.offer(NEED_TO_POLL);
110     }
111 
start()112     EPollPort start() {
113         startThreads(new EventHandlerTask());
114         return this;
115     }
116 
117     /**
118      * Release all resources
119      */
implClose()120     private void implClose() {
121         synchronized (this) {
122             if (closed)
123                 return;
124             closed = true;
125         }
126         freePollArray(address);
127         close0(sp[0]);
128         close0(sp[1]);
129         close0(epfd);
130     }
131 
wakeup()132     private void wakeup() {
133         if (wakeupCount.incrementAndGet() == 1) {
134             // write byte to socketpair to force wakeup
135             try {
136                 interrupt(sp[1]);
137             } catch (IOException x) {
138                 throw new AssertionError(x);
139             }
140         }
141     }
142 
143     @Override
executeOnHandlerTask(Runnable task)144     void executeOnHandlerTask(Runnable task) {
145         synchronized (this) {
146             if (closed)
147                 throw new RejectedExecutionException();
148             offerTask(task);
149             wakeup();
150         }
151     }
152 
153     @Override
shutdownHandlerTasks()154     void shutdownHandlerTasks() {
155         /*
156          * If no tasks are running then just release resources; otherwise
157          * write to the one end of the socketpair to wakeup any polling threads.
158          */
159         int nThreads = threadCount();
160         if (nThreads == 0) {
161             implClose();
162         } else {
163             // send interrupt to each thread
164             while (nThreads-- > 0) {
165                 wakeup();
166             }
167         }
168     }
169 
170     // invoke by clients to register a file descriptor
171     @Override
startPoll(int fd, int events)172     void startPoll(int fd, int events) {
173         // update events (or add to epoll on first usage)
174         int err = epollCtl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT));
175         if (err == ENOENT)
176             err = epollCtl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT));
177         if (err != 0)
178             throw new AssertionError();     // should not happen
179     }
180 
181     /*
182      * Task to process events from epoll and dispatch to the channel's
183      * onEvent handler.
184      *
185      * Events are retreived from epoll in batch and offered to a BlockingQueue
186      * where they are consumed by handler threads. A special "NEED_TO_POLL"
187      * event is used to signal one consumer to re-poll when all events have
188      * been consumed.
189      */
190     private class EventHandlerTask implements Runnable {
poll()191         private Event poll() throws IOException {
192             try {
193                 for (;;) {
194                     int n = epollWait(epfd, address, MAX_EPOLL_EVENTS);
195                     /*
196                      * 'n' events have been read. Here we map them to their
197                      * corresponding channel in batch and queue n-1 so that
198                      * they can be handled by other handler threads. The last
199                      * event is handled by this thread (and so is not queued).
200                      */
201                     fdToChannelLock.readLock().lock();
202                     try {
203                         while (n-- > 0) {
204                             long eventAddress = getEvent(address, n);
205                             int fd = getDescriptor(eventAddress);
206 
207                             // wakeup
208                             if (fd == sp[0]) {
209                                 if (wakeupCount.decrementAndGet() == 0) {
210                                     // no more wakeups so drain pipe
211                                     drain1(sp[0]);
212                                 }
213 
214                                 // queue special event if there are more events
215                                 // to handle.
216                                 if (n > 0) {
217                                     queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
218                                     continue;
219                                 }
220                                 return EXECUTE_TASK_OR_SHUTDOWN;
221                             }
222 
223                             PollableChannel channel = fdToChannel.get(fd);
224                             if (channel != null) {
225                                 int events = getEvents(eventAddress);
226                                 Event ev = new Event(channel, events);
227 
228                                 // n-1 events are queued; This thread handles
229                                 // the last one except for the wakeup
230                                 if (n > 0) {
231                                     queue.offer(ev);
232                                 } else {
233                                     return ev;
234                                 }
235                             }
236                         }
237                     } finally {
238                         fdToChannelLock.readLock().unlock();
239                     }
240                 }
241             } finally {
242                 // to ensure that some thread will poll when all events have
243                 // been consumed
244                 queue.offer(NEED_TO_POLL);
245             }
246         }
247 
run()248         public void run() {
249             Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
250                 Invoker.getGroupAndInvokeCount();
251             final boolean isPooledThread = (myGroupAndInvokeCount != null);
252             boolean replaceMe = false;
253             Event ev;
254             try {
255                 for (;;) {
256                     // reset invoke count
257                     if (isPooledThread)
258                         myGroupAndInvokeCount.resetInvokeCount();
259 
260                     try {
261                         replaceMe = false;
262                         ev = queue.take();
263 
264                         // no events and this thread has been "selected" to
265                         // poll for more.
266                         if (ev == NEED_TO_POLL) {
267                             try {
268                                 ev = poll();
269                             } catch (IOException x) {
270                                 x.printStackTrace();
271                                 return;
272                             }
273                         }
274                     } catch (InterruptedException x) {
275                         continue;
276                     }
277 
278                     // handle wakeup to execute task or shutdown
279                     if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
280                         Runnable task = pollTask();
281                         if (task == null) {
282                             // shutdown request
283                             return;
284                         }
285                         // run task (may throw error/exception)
286                         replaceMe = true;
287                         task.run();
288                         continue;
289                     }
290 
291                     // process event
292                     try {
293                         ev.channel().onEvent(ev.events(), isPooledThread);
294                     } catch (Error x) {
295                         replaceMe = true; throw x;
296                     } catch (RuntimeException x) {
297                         replaceMe = true; throw x;
298                     }
299                 }
300             } finally {
301                 // last handler to exit when shutdown releases resources
302                 int remaining = threadExit(this, replaceMe);
303                 if (remaining == 0 && isShutdown()) {
304                     implClose();
305                 }
306             }
307         }
308     }
309 
310     // -- Native methods --
311 
socketpair(int[] sv)312     private static native void socketpair(int[] sv) throws IOException;
313 
interrupt(int fd)314     private static native void interrupt(int fd) throws IOException;
315 
drain1(int fd)316     private static native void drain1(int fd) throws IOException;
317 
close0(int fd)318     private static native void close0(int fd);
319 }
320