1 /*
2  * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package sun.nio.ch;
27 
28 import java.nio.ByteBuffer;
29 import java.nio.channels.*;
30 import java.net.SocketOption;
31 import java.net.StandardSocketOptions;
32 import java.net.SocketAddress;
33 import java.net.InetSocketAddress;
34 import java.io.IOException;
35 import java.io.FileDescriptor;
36 import java.util.Set;
37 import java.util.HashSet;
38 import java.util.Collections;
39 import java.util.concurrent.*;
40 import java.util.concurrent.locks.*;
41 import sun.net.NetHooks;
42 import sun.net.ExtendedOptionsImpl;
43 
44 /**
45  * Base implementation of AsynchronousSocketChannel
46  */
47 
48 abstract class AsynchronousSocketChannelImpl
49     extends AsynchronousSocketChannel
50     implements Cancellable, Groupable
51 {
52     protected final FileDescriptor fd;
53 
54     // protects state, localAddress, and remoteAddress
55     protected final Object stateLock = new Object();
56 
57     protected volatile InetSocketAddress localAddress = null;
58     protected volatile InetSocketAddress remoteAddress = null;
59 
60     // State, increases monotonically
61     static final int ST_UNINITIALIZED = -1;
62     static final int ST_UNCONNECTED = 0;
63     static final int ST_PENDING = 1;
64     static final int ST_CONNECTED = 2;
65     protected volatile int state = ST_UNINITIALIZED;
66 
67     // reading state
68     private final Object readLock = new Object();
69     private boolean reading;
70     private boolean readShutdown;
71     private boolean readKilled;     // further reading disallowed due to timeout
72 
73     // writing state
74     private final Object writeLock = new Object();
75     private boolean writing;
76     private boolean writeShutdown;
77     private boolean writeKilled;    // further writing disallowed due to timeout
78 
79     // close support
80     private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
81     private volatile boolean open = true;
82 
83     // set true when exclusive binding is on and SO_REUSEADDR is emulated
84     private boolean isReuseAddress;
85 
AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group)86     AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group)
87         throws IOException
88     {
89         super(group.provider());
90         this.fd = Net.socket(true);
91         this.state = ST_UNCONNECTED;
92     }
93 
94     // Constructor for sockets obtained from AsynchronousServerSocketChannelImpl
AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group, FileDescriptor fd, InetSocketAddress remote)95     AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group,
96                                   FileDescriptor fd,
97                                   InetSocketAddress remote)
98         throws IOException
99     {
100         super(group.provider());
101         this.fd = fd;
102         this.state = ST_CONNECTED;
103         this.localAddress = Net.localAddress(fd);
104         this.remoteAddress = remote;
105     }
106 
107     @Override
isOpen()108     public final boolean isOpen() {
109         return open;
110     }
111 
112     /**
113      * Marks beginning of access to file descriptor/handle
114      */
begin()115     final void begin() throws IOException {
116         closeLock.readLock().lock();
117         if (!isOpen())
118             throw new ClosedChannelException();
119     }
120 
121     /**
122      * Marks end of access to file descriptor/handle
123      */
end()124     final void end() {
125         closeLock.readLock().unlock();
126     }
127 
128     /**
129      * Invoked to close socket and release other resources.
130      */
implClose()131     abstract void implClose() throws IOException;
132 
133     @Override
close()134     public final void close() throws IOException {
135         // synchronize with any threads initiating asynchronous operations
136         closeLock.writeLock().lock();
137         try {
138             if (!open)
139                 return;     // already closed
140             open = false;
141         } finally {
142             closeLock.writeLock().unlock();
143         }
144         implClose();
145     }
146 
enableReading(boolean killed)147     final void enableReading(boolean killed) {
148         synchronized (readLock) {
149             reading = false;
150             if (killed)
151                 readKilled = true;
152         }
153     }
154 
enableReading()155     final void enableReading() {
156         enableReading(false);
157     }
158 
enableWriting(boolean killed)159     final void enableWriting(boolean killed) {
160         synchronized (writeLock) {
161             writing = false;
162             if (killed)
163                 writeKilled = true;
164         }
165     }
166 
enableWriting()167     final void enableWriting() {
168         enableWriting(false);
169     }
170 
killReading()171     final void killReading() {
172         synchronized (readLock) {
173             readKilled = true;
174         }
175     }
176 
killWriting()177     final void killWriting() {
178         synchronized (writeLock) {
179             writeKilled = true;
180         }
181     }
182 
killConnect()183     final void killConnect() {
184         // when a connect is cancelled then the connection may have been
185         // established so prevent reading or writing.
186         killReading();
187         killWriting();
188     }
189 
190     /**
191      * Invoked by connect to initiate the connect operation.
192      */
implConnect(SocketAddress remote, A attachment, CompletionHandler<Void,? super A> handler)193     abstract <A> Future<Void> implConnect(SocketAddress remote,
194                                           A attachment,
195                                           CompletionHandler<Void,? super A> handler);
196 
197     @Override
connect(SocketAddress remote)198     public final Future<Void> connect(SocketAddress remote) {
199         return implConnect(remote, null, null);
200     }
201 
202     @Override
connect(SocketAddress remote, A attachment, CompletionHandler<Void,? super A> handler)203     public final <A> void connect(SocketAddress remote,
204                                   A attachment,
205                                   CompletionHandler<Void,? super A> handler)
206     {
207         if (handler == null)
208             throw new NullPointerException("'handler' is null");
209         implConnect(remote, attachment, handler);
210     }
211 
212     /**
213      * Invoked by read to initiate the I/O operation.
214      */
implRead(boolean isScatteringRead, ByteBuffer dst, ByteBuffer[] dsts, long timeout, TimeUnit unit, A attachment, CompletionHandler<V,? super A> handler)215     abstract <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
216                                                      ByteBuffer dst,
217                                                      ByteBuffer[] dsts,
218                                                      long timeout,
219                                                      TimeUnit unit,
220                                                      A attachment,
221                                                      CompletionHandler<V,? super A> handler);
222 
223     @SuppressWarnings("unchecked")
read(boolean isScatteringRead, ByteBuffer dst, ByteBuffer[] dsts, long timeout, TimeUnit unit, A att, CompletionHandler<V,? super A> handler)224     private <V extends Number,A> Future<V> read(boolean isScatteringRead,
225                                                 ByteBuffer dst,
226                                                 ByteBuffer[] dsts,
227                                                 long timeout,
228                                                 TimeUnit unit,
229                                                 A att,
230                                                 CompletionHandler<V,? super A> handler)
231     {
232         if (!isOpen()) {
233             Throwable e = new ClosedChannelException();
234             if (handler == null)
235                 return CompletedFuture.withFailure(e);
236             Invoker.invoke(this, handler, att, null, e);
237             return null;
238         }
239 
240         if (remoteAddress == null)
241             throw new NotYetConnectedException();
242 
243         boolean hasSpaceToRead = isScatteringRead || dst.hasRemaining();
244         boolean shutdown = false;
245 
246         // check and update state
247         synchronized (readLock) {
248             if (readKilled)
249                 throw new IllegalStateException("Reading not allowed due to timeout or cancellation");
250             if (reading)
251                 throw new ReadPendingException();
252             if (readShutdown) {
253                 shutdown = true;
254             } else {
255                 if (hasSpaceToRead) {
256                     reading = true;
257                 }
258             }
259         }
260 
261         // immediately complete with -1 if shutdown for read
262         // immediately complete with 0 if no space remaining
263         if (shutdown || !hasSpaceToRead) {
264             Number result;
265             if (isScatteringRead) {
266                 result = (shutdown) ? Long.valueOf(-1L) : Long.valueOf(0L);
267             } else {
268                 result = (shutdown) ? -1 : 0;
269             }
270             if (handler == null)
271                 return CompletedFuture.withResult((V)result);
272             Invoker.invoke(this, handler, att, (V)result, null);
273             return null;
274         }
275 
276         return implRead(isScatteringRead, dst, dsts, timeout, unit, att, handler);
277     }
278 
279     @Override
read(ByteBuffer dst)280     public final Future<Integer> read(ByteBuffer dst) {
281         if (dst.isReadOnly())
282             throw new IllegalArgumentException("Read-only buffer");
283         return read(false, dst, null, 0L, TimeUnit.MILLISECONDS, null, null);
284     }
285 
286     @Override
read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer,? super A> handler)287     public final <A> void read(ByteBuffer dst,
288                                long timeout,
289                                TimeUnit unit,
290                                A attachment,
291                                CompletionHandler<Integer,? super A> handler)
292     {
293         if (handler == null)
294             throw new NullPointerException("'handler' is null");
295         if (dst.isReadOnly())
296             throw new IllegalArgumentException("Read-only buffer");
297         read(false, dst, null, timeout, unit, attachment, handler);
298     }
299 
300     @Override
read(ByteBuffer[] dsts, int offset, int length, long timeout, TimeUnit unit, A attachment, CompletionHandler<Long,? super A> handler)301     public final <A> void read(ByteBuffer[] dsts,
302                                int offset,
303                                int length,
304                                long timeout,
305                                TimeUnit unit,
306                                A attachment,
307                                CompletionHandler<Long,? super A> handler)
308     {
309         if (handler == null)
310             throw new NullPointerException("'handler' is null");
311         if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
312             throw new IndexOutOfBoundsException();
313         ByteBuffer[] bufs = Util.subsequence(dsts, offset, length);
314         for (int i=0; i<bufs.length; i++) {
315             if (bufs[i].isReadOnly())
316                 throw new IllegalArgumentException("Read-only buffer");
317         }
318         read(true, null, bufs, timeout, unit, attachment, handler);
319     }
320 
321     /**
322      * Invoked by write to initiate the I/O operation.
323      */
implWrite(boolean isGatheringWrite, ByteBuffer src, ByteBuffer[] srcs, long timeout, TimeUnit unit, A attachment, CompletionHandler<V,? super A> handler)324     abstract <V extends Number,A> Future<V> implWrite(boolean isGatheringWrite,
325                                                       ByteBuffer src,
326                                                       ByteBuffer[] srcs,
327                                                       long timeout,
328                                                       TimeUnit unit,
329                                                       A attachment,
330                                                       CompletionHandler<V,? super A> handler);
331 
332     @SuppressWarnings("unchecked")
write(boolean isGatheringWrite, ByteBuffer src, ByteBuffer[] srcs, long timeout, TimeUnit unit, A att, CompletionHandler<V,? super A> handler)333     private <V extends Number,A> Future<V> write(boolean isGatheringWrite,
334                                                  ByteBuffer src,
335                                                  ByteBuffer[] srcs,
336                                                  long timeout,
337                                                  TimeUnit unit,
338                                                  A att,
339                                                  CompletionHandler<V,? super A> handler)
340     {
341         boolean hasDataToWrite = isGatheringWrite || src.hasRemaining();
342 
343         boolean closed = false;
344         if (isOpen()) {
345             if (remoteAddress == null)
346                 throw new NotYetConnectedException();
347             // check and update state
348             synchronized (writeLock) {
349                 if (writeKilled)
350                     throw new IllegalStateException("Writing not allowed due to timeout or cancellation");
351                 if (writing)
352                     throw new WritePendingException();
353                 if (writeShutdown) {
354                     closed = true;
355                 } else {
356                     if (hasDataToWrite)
357                         writing = true;
358                 }
359             }
360         } else {
361             closed = true;
362         }
363 
364         // channel is closed or shutdown for write
365         if (closed) {
366             Throwable e = new ClosedChannelException();
367             if (handler == null)
368                 return CompletedFuture.withFailure(e);
369             Invoker.invoke(this, handler, att, null, e);
370             return null;
371         }
372 
373         // nothing to write so complete immediately
374         if (!hasDataToWrite) {
375             Number result = (isGatheringWrite) ? (Number)0L : (Number)0;
376             if (handler == null)
377                 return CompletedFuture.withResult((V)result);
378             Invoker.invoke(this, handler, att, (V)result, null);
379             return null;
380         }
381 
382         return implWrite(isGatheringWrite, src, srcs, timeout, unit, att, handler);
383     }
384 
385     @Override
write(ByteBuffer src)386     public final Future<Integer> write(ByteBuffer src) {
387         return write(false, src, null, 0L, TimeUnit.MILLISECONDS, null, null);
388     }
389 
390     @Override
write(ByteBuffer src, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer,? super A> handler)391     public final <A> void write(ByteBuffer src,
392                                 long timeout,
393                                 TimeUnit unit,
394                                 A attachment,
395                                 CompletionHandler<Integer,? super A> handler)
396     {
397         if (handler == null)
398             throw new NullPointerException("'handler' is null");
399         write(false, src, null, timeout, unit, attachment, handler);
400     }
401 
402     @Override
write(ByteBuffer[] srcs, int offset, int length, long timeout, TimeUnit unit, A attachment, CompletionHandler<Long,? super A> handler)403     public final <A> void  write(ByteBuffer[] srcs,
404                                  int offset,
405                                  int length,
406                                  long timeout,
407                                  TimeUnit unit,
408                                  A attachment,
409                                  CompletionHandler<Long,? super A> handler)
410     {
411         if (handler == null)
412             throw new NullPointerException("'handler' is null");
413         if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
414             throw new IndexOutOfBoundsException();
415         srcs = Util.subsequence(srcs, offset, length);
416         write(true, null, srcs, timeout, unit, attachment, handler);
417     }
418 
419     @Override
bind(SocketAddress local)420     public final AsynchronousSocketChannel bind(SocketAddress local)
421         throws IOException
422     {
423         try {
424             begin();
425             synchronized (stateLock) {
426                 if (state == ST_PENDING)
427                     throw new ConnectionPendingException();
428                 if (localAddress != null)
429                     throw new AlreadyBoundException();
430                 InetSocketAddress isa = (local == null) ?
431                     new InetSocketAddress(0) : Net.checkAddress(local);
432                 SecurityManager sm = System.getSecurityManager();
433                 if (sm != null) {
434                     sm.checkListen(isa.getPort());
435                 }
436                 NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
437                 Net.bind(fd, isa.getAddress(), isa.getPort());
438                 localAddress = Net.localAddress(fd);
439             }
440         } finally {
441             end();
442         }
443         return this;
444     }
445 
446     @Override
getLocalAddress()447     public final SocketAddress getLocalAddress() throws IOException {
448         if (!isOpen())
449             throw new ClosedChannelException();
450          return Net.getRevealedLocalAddress(localAddress);
451     }
452 
453     @Override
setOption(SocketOption<T> name, T value)454     public final <T> AsynchronousSocketChannel setOption(SocketOption<T> name, T value)
455         throws IOException
456     {
457         if (name == null)
458             throw new NullPointerException();
459         if (!supportedOptions().contains(name))
460             throw new UnsupportedOperationException("'" + name + "' not supported");
461 
462         try {
463             begin();
464             if (writeShutdown)
465                 throw new IOException("Connection has been shutdown for writing");
466             if (name == StandardSocketOptions.SO_REUSEADDR &&
467                     Net.useExclusiveBind())
468             {
469                 // SO_REUSEADDR emulated when using exclusive bind
470                 isReuseAddress = (Boolean)value;
471             } else {
472                 Net.setSocketOption(fd, Net.UNSPEC, name, value);
473             }
474             return this;
475         } finally {
476             end();
477         }
478     }
479 
480     @Override
481     @SuppressWarnings("unchecked")
getOption(SocketOption<T> name)482     public final <T> T getOption(SocketOption<T> name) throws IOException {
483         if (name == null)
484             throw new NullPointerException();
485         if (!supportedOptions().contains(name))
486             throw new UnsupportedOperationException("'" + name + "' not supported");
487 
488         try {
489             begin();
490             if (name == StandardSocketOptions.SO_REUSEADDR &&
491                     Net.useExclusiveBind())
492             {
493                 // SO_REUSEADDR emulated when using exclusive bind
494                 return (T)Boolean.valueOf(isReuseAddress);
495             }
496             return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
497         } finally {
498             end();
499         }
500     }
501 
502     private static class DefaultOptionsHolder {
503         static final Set<SocketOption<?>> defaultOptions = defaultOptions();
504 
defaultOptions()505         private static Set<SocketOption<?>> defaultOptions() {
506             HashSet<SocketOption<?>> set = new HashSet<SocketOption<?>>(5);
507             set.add(StandardSocketOptions.SO_SNDBUF);
508             set.add(StandardSocketOptions.SO_RCVBUF);
509             set.add(StandardSocketOptions.SO_KEEPALIVE);
510             set.add(StandardSocketOptions.SO_REUSEADDR);
511             set.add(StandardSocketOptions.TCP_NODELAY);
512             if (ExtendedOptionsImpl.flowSupported()) {
513                 set.add(jdk.net.ExtendedSocketOptions.SO_FLOW_SLA);
514             }
515             return Collections.unmodifiableSet(set);
516         }
517     }
518 
519     @Override
supportedOptions()520     public final Set<SocketOption<?>> supportedOptions() {
521         return DefaultOptionsHolder.defaultOptions;
522     }
523 
524     @Override
getRemoteAddress()525     public final SocketAddress getRemoteAddress() throws IOException {
526         if (!isOpen())
527             throw new ClosedChannelException();
528         return remoteAddress;
529     }
530 
531     @Override
shutdownInput()532     public final AsynchronousSocketChannel shutdownInput() throws IOException {
533         try {
534             begin();
535             if (remoteAddress == null)
536                 throw new NotYetConnectedException();
537             synchronized (readLock) {
538                 if (!readShutdown) {
539                     Net.shutdown(fd, Net.SHUT_RD);
540                     readShutdown = true;
541                 }
542             }
543         } finally {
544             end();
545         }
546         return this;
547     }
548 
549     @Override
shutdownOutput()550     public final AsynchronousSocketChannel shutdownOutput() throws IOException {
551         try {
552             begin();
553             if (remoteAddress == null)
554                 throw new NotYetConnectedException();
555             synchronized (writeLock) {
556                 if (!writeShutdown) {
557                     Net.shutdown(fd, Net.SHUT_WR);
558                     writeShutdown = true;
559                 }
560             }
561         } finally {
562             end();
563         }
564         return this;
565     }
566 
567     @Override
toString()568     public final String toString() {
569         StringBuilder sb = new StringBuilder();
570         sb.append(this.getClass().getName());
571         sb.append('[');
572         synchronized (stateLock) {
573             if (!isOpen()) {
574                 sb.append("closed");
575             } else {
576                 switch (state) {
577                 case ST_UNCONNECTED:
578                     sb.append("unconnected");
579                     break;
580                 case ST_PENDING:
581                     sb.append("connection-pending");
582                     break;
583                 case ST_CONNECTED:
584                     sb.append("connected");
585                     if (readShutdown)
586                         sb.append(" ishut");
587                     if (writeShutdown)
588                         sb.append(" oshut");
589                     break;
590                 }
591                 if (localAddress != null) {
592                     sb.append(" local=");
593                     sb.append(
594                             Net.getRevealedLocalAddressAsString(localAddress));
595                 }
596                 if (remoteAddress != null) {
597                     sb.append(" remote=");
598                     sb.append(remoteAddress.toString());
599                 }
600             }
601         }
602         sb.append(']');
603         return sb.toString();
604     }
605 }
606