1/*
2 * Copyright (c) 2008, 2016, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package sun.nio.ch;
27
28import java.nio.ByteBuffer;
29import java.nio.channels.*;
30import java.net.SocketOption;
31import java.net.StandardSocketOptions;
32import java.net.SocketAddress;
33import java.net.InetSocketAddress;
34import java.io.IOException;
35import java.io.FileDescriptor;
36import java.util.Set;
37import java.util.HashSet;
38import java.util.Collections;
39import java.util.concurrent.*;
40import java.util.concurrent.locks.*;
41import sun.net.NetHooks;
42import sun.net.ext.ExtendedSocketOptions;
43
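/**
 * Base implementation of AsynchronousSocketChannel.
 *
 * Tracks the connection state, rejects overlapping or disallowed reads and
 * writes, and coordinates close with threads that are accessing the file
 * descriptor. Subclasses supply implConnect, implRead, implWrite and
 * implClose.
 */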
47
48abstract class AsynchronousSocketChannelImpl
49    extends AsynchronousSocketChannel
50    implements Cancellable, Groupable
51{
52    protected final FileDescriptor fd;
53
54    // protects state, localAddress, and remoteAddress
55    protected final Object stateLock = new Object();
56
57    protected volatile InetSocketAddress localAddress;
58    protected volatile InetSocketAddress remoteAddress;
59
60    // State, increases monotonically
61    static final int ST_UNINITIALIZED = -1;
62    static final int ST_UNCONNECTED = 0;
63    static final int ST_PENDING = 1;
64    static final int ST_CONNECTED = 2;
65    protected volatile int state = ST_UNINITIALIZED;
66
67    // reading state
68    private final Object readLock = new Object();
69    private boolean reading;
70    private boolean readShutdown;
71    private boolean readKilled;     // further reading disallowed due to timeout
72
73    // writing state
74    private final Object writeLock = new Object();
75    private boolean writing;
76    private boolean writeShutdown;
77    private boolean writeKilled;    // further writing disallowed due to timeout
78
79    // close support
80    private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
81    private volatile boolean closed;
82
83    // set true when exclusive binding is on and SO_REUSEADDR is emulated
84    private boolean isReuseAddress;
85
86    AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group)
87        throws IOException
88    {
89        super(group.provider());
90        this.fd = Net.socket(true);
91        this.state = ST_UNCONNECTED;
92    }
93
94    // Constructor for sockets obtained from AsynchronousServerSocketChannelImpl
95    AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group,
96                                  FileDescriptor fd,
97                                  InetSocketAddress remote)
98        throws IOException
99    {
100        super(group.provider());
101        this.fd = fd;
102        this.state = ST_CONNECTED;
103        this.localAddress = Net.localAddress(fd);
104        this.remoteAddress = remote;
105    }
106
107    @Override
108    public final boolean isOpen() {
109        return !closed;
110    }
111
112    /**
113     * Marks beginning of access to file descriptor/handle
114     */
115    final void begin() throws IOException {
116        closeLock.readLock().lock();
117        if (!isOpen())
118            throw new ClosedChannelException();
119    }
120
121    /**
122     * Marks end of access to file descriptor/handle
123     */
124    final void end() {
125        closeLock.readLock().unlock();
126    }
127
128    /**
129     * Invoked to close socket and release other resources.
130     */
131    abstract void implClose() throws IOException;
132
133    @Override
134    public final void close() throws IOException {
135        // synchronize with any threads initiating asynchronous operations
136        closeLock.writeLock().lock();
137        try {
138            if (closed)
139                return;     // already closed
140            closed = true;
141        } finally {
142            closeLock.writeLock().unlock();
143        }
144        implClose();
145    }
146
147    final void enableReading(boolean killed) {
148        synchronized (readLock) {
149            reading = false;
150            if (killed)
151                readKilled = true;
152        }
153    }
154
155    final void enableReading() {
156        enableReading(false);
157    }
158
159    final void enableWriting(boolean killed) {
160        synchronized (writeLock) {
161            writing = false;
162            if (killed)
163                writeKilled = true;
164        }
165    }
166
167    final void enableWriting() {
168        enableWriting(false);
169    }
170
171    final void killReading() {
172        synchronized (readLock) {
173            readKilled = true;
174        }
175    }
176
177    final void killWriting() {
178        synchronized (writeLock) {
179            writeKilled = true;
180        }
181    }
182
183    final void killConnect() {
184        // when a connect is cancelled then the connection may have been
185        // established so prevent reading or writing.
186        killReading();
187        killWriting();
188    }
189
190    /**
191     * Invoked by connect to initiate the connect operation.
192     */
193    abstract <A> Future<Void> implConnect(SocketAddress remote,
194                                          A attachment,
195                                          CompletionHandler<Void,? super A> handler);
196
197    @Override
198    public final Future<Void> connect(SocketAddress remote) {
199        return implConnect(remote, null, null);
200    }
201
202    @Override
203    public final <A> void connect(SocketAddress remote,
204                                  A attachment,
205                                  CompletionHandler<Void,? super A> handler)
206    {
207        if (handler == null)
208            throw new NullPointerException("'handler' is null");
209        implConnect(remote, attachment, handler);
210    }
211
212    /**
213     * Invoked by read to initiate the I/O operation.
214     */
215    abstract <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
216                                                     ByteBuffer dst,
217                                                     ByteBuffer[] dsts,
218                                                     long timeout,
219                                                     TimeUnit unit,
220                                                     A attachment,
221                                                     CompletionHandler<V,? super A> handler);
222
223    @SuppressWarnings("unchecked")
224    private <V extends Number,A> Future<V> read(boolean isScatteringRead,
225                                                ByteBuffer dst,
226                                                ByteBuffer[] dsts,
227                                                long timeout,
228                                                TimeUnit unit,
229                                                A att,
230                                                CompletionHandler<V,? super A> handler)
231    {
232        if (!isOpen()) {
233            Throwable e = new ClosedChannelException();
234            if (handler == null)
235                return CompletedFuture.withFailure(e);
236            Invoker.invoke(this, handler, att, null, e);
237            return null;
238        }
239
240        if (remoteAddress == null)
241            throw new NotYetConnectedException();
242
243        boolean hasSpaceToRead = isScatteringRead || dst.hasRemaining();
244        boolean shutdown = false;
245
246        // check and update state
247        synchronized (readLock) {
248            if (readKilled)
249                throw new IllegalStateException("Reading not allowed due to timeout or cancellation");
250            if (reading)
251                throw new ReadPendingException();
252            if (readShutdown) {
253                shutdown = true;
254            } else {
255                if (hasSpaceToRead) {
256                    reading = true;
257                }
258            }
259        }
260
261        // immediately complete with -1 if shutdown for read
262        // immediately complete with 0 if no space remaining
263        if (shutdown || !hasSpaceToRead) {
264            Number result;
265            if (isScatteringRead) {
266                result = (shutdown) ? Long.valueOf(-1L) : Long.valueOf(0L);
267            } else {
268                result = (shutdown) ? -1 : 0;
269            }
270            if (handler == null)
271                return CompletedFuture.withResult((V)result);
272            Invoker.invoke(this, handler, att, (V)result, null);
273            return null;
274        }
275
276        return implRead(isScatteringRead, dst, dsts, timeout, unit, att, handler);
277    }
278
279    @Override
280    public final Future<Integer> read(ByteBuffer dst) {
281        if (dst.isReadOnly())
282            throw new IllegalArgumentException("Read-only buffer");
283        return read(false, dst, null, 0L, TimeUnit.MILLISECONDS, null, null);
284    }
285
286    @Override
287    public final <A> void read(ByteBuffer dst,
288                               long timeout,
289                               TimeUnit unit,
290                               A attachment,
291                               CompletionHandler<Integer,? super A> handler)
292    {
293        if (handler == null)
294            throw new NullPointerException("'handler' is null");
295        if (dst.isReadOnly())
296            throw new IllegalArgumentException("Read-only buffer");
297        read(false, dst, null, timeout, unit, attachment, handler);
298    }
299
300    @Override
301    public final <A> void read(ByteBuffer[] dsts,
302                               int offset,
303                               int length,
304                               long timeout,
305                               TimeUnit unit,
306                               A attachment,
307                               CompletionHandler<Long,? super A> handler)
308    {
309        if (handler == null)
310            throw new NullPointerException("'handler' is null");
311        if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
312            throw new IndexOutOfBoundsException();
313        ByteBuffer[] bufs = Util.subsequence(dsts, offset, length);
314        for (int i=0; i<bufs.length; i++) {
315            if (bufs[i].isReadOnly())
316                throw new IllegalArgumentException("Read-only buffer");
317        }
318        read(true, null, bufs, timeout, unit, attachment, handler);
319    }
320
321    /**
322     * Invoked by write to initiate the I/O operation.
323     */
324    abstract <V extends Number,A> Future<V> implWrite(boolean isGatheringWrite,
325                                                      ByteBuffer src,
326                                                      ByteBuffer[] srcs,
327                                                      long timeout,
328                                                      TimeUnit unit,
329                                                      A attachment,
330                                                      CompletionHandler<V,? super A> handler);
331
332    @SuppressWarnings("unchecked")
333    private <V extends Number,A> Future<V> write(boolean isGatheringWrite,
334                                                 ByteBuffer src,
335                                                 ByteBuffer[] srcs,
336                                                 long timeout,
337                                                 TimeUnit unit,
338                                                 A att,
339                                                 CompletionHandler<V,? super A> handler)
340    {
341        boolean hasDataToWrite = isGatheringWrite || src.hasRemaining();
342
343        boolean closed = false;
344        if (isOpen()) {
345            if (remoteAddress == null)
346                throw new NotYetConnectedException();
347            // check and update state
348            synchronized (writeLock) {
349                if (writeKilled)
350                    throw new IllegalStateException("Writing not allowed due to timeout or cancellation");
351                if (writing)
352                    throw new WritePendingException();
353                if (writeShutdown) {
354                    closed = true;
355                } else {
356                    if (hasDataToWrite)
357                        writing = true;
358                }
359            }
360        } else {
361            closed = true;
362        }
363
364        // channel is closed or shutdown for write
365        if (closed) {
366            Throwable e = new ClosedChannelException();
367            if (handler == null)
368                return CompletedFuture.withFailure(e);
369            Invoker.invoke(this, handler, att, null, e);
370            return null;
371        }
372
373        // nothing to write so complete immediately
374        if (!hasDataToWrite) {
375            Number result = (isGatheringWrite) ? (Number)0L : (Number)0;
376            if (handler == null)
377                return CompletedFuture.withResult((V)result);
378            Invoker.invoke(this, handler, att, (V)result, null);
379            return null;
380        }
381
382        return implWrite(isGatheringWrite, src, srcs, timeout, unit, att, handler);
383    }
384
385    @Override
386    public final Future<Integer> write(ByteBuffer src) {
387        return write(false, src, null, 0L, TimeUnit.MILLISECONDS, null, null);
388    }
389
390    @Override
391    public final <A> void write(ByteBuffer src,
392                                long timeout,
393                                TimeUnit unit,
394                                A attachment,
395                                CompletionHandler<Integer,? super A> handler)
396    {
397        if (handler == null)
398            throw new NullPointerException("'handler' is null");
399        write(false, src, null, timeout, unit, attachment, handler);
400    }
401
402    @Override
403    public final <A> void  write(ByteBuffer[] srcs,
404                                 int offset,
405                                 int length,
406                                 long timeout,
407                                 TimeUnit unit,
408                                 A attachment,
409                                 CompletionHandler<Long,? super A> handler)
410    {
411        if (handler == null)
412            throw new NullPointerException("'handler' is null");
413        if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
414            throw new IndexOutOfBoundsException();
415        srcs = Util.subsequence(srcs, offset, length);
416        write(true, null, srcs, timeout, unit, attachment, handler);
417    }
418
419    @Override
420    public final AsynchronousSocketChannel bind(SocketAddress local)
421        throws IOException
422    {
423        try {
424            begin();
425            synchronized (stateLock) {
426                if (state == ST_PENDING)
427                    throw new ConnectionPendingException();
428                if (localAddress != null)
429                    throw new AlreadyBoundException();
430                InetSocketAddress isa = (local == null) ?
431                    new InetSocketAddress(0) : Net.checkAddress(local);
432                SecurityManager sm = System.getSecurityManager();
433                if (sm != null) {
434                    sm.checkListen(isa.getPort());
435                }
436                NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
437                Net.bind(fd, isa.getAddress(), isa.getPort());
438                localAddress = Net.localAddress(fd);
439            }
440        } finally {
441            end();
442        }
443        return this;
444    }
445
446    @Override
447    public final SocketAddress getLocalAddress() throws IOException {
448        if (!isOpen())
449            throw new ClosedChannelException();
450         return Net.getRevealedLocalAddress(localAddress);
451    }
452
453    @Override
454    public final <T> AsynchronousSocketChannel setOption(SocketOption<T> name, T value)
455        throws IOException
456    {
457        if (name == null)
458            throw new NullPointerException();
459        if (!supportedOptions().contains(name))
460            throw new UnsupportedOperationException("'" + name + "' not supported");
461
462        try {
463            begin();
464            if (writeShutdown)
465                throw new IOException("Connection has been shutdown for writing");
466            if (name == StandardSocketOptions.SO_REUSEADDR &&
467                    Net.useExclusiveBind())
468            {
469                // SO_REUSEADDR emulated when using exclusive bind
470                isReuseAddress = (Boolean)value;
471            } else {
472                Net.setSocketOption(fd, Net.UNSPEC, name, value);
473            }
474            return this;
475        } finally {
476            end();
477        }
478    }
479
480    @Override
481    @SuppressWarnings("unchecked")
482    public final <T> T getOption(SocketOption<T> name) throws IOException {
483        if (name == null)
484            throw new NullPointerException();
485        if (!supportedOptions().contains(name))
486            throw new UnsupportedOperationException("'" + name + "' not supported");
487
488        try {
489            begin();
490            if (name == StandardSocketOptions.SO_REUSEADDR &&
491                    Net.useExclusiveBind())
492            {
493                // SO_REUSEADDR emulated when using exclusive bind
494                return (T)Boolean.valueOf(isReuseAddress);
495            }
496            return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
497        } finally {
498            end();
499        }
500    }
501
502    private static class DefaultOptionsHolder {
503        static final Set<SocketOption<?>> defaultOptions = defaultOptions();
504
505        private static Set<SocketOption<?>> defaultOptions() {
506            HashSet<SocketOption<?>> set = new HashSet<>(5);
507            set.add(StandardSocketOptions.SO_SNDBUF);
508            set.add(StandardSocketOptions.SO_RCVBUF);
509            set.add(StandardSocketOptions.SO_KEEPALIVE);
510            set.add(StandardSocketOptions.SO_REUSEADDR);
511            if (Net.isReusePortAvailable()) {
512                set.add(StandardSocketOptions.SO_REUSEPORT);
513            }
514            set.add(StandardSocketOptions.TCP_NODELAY);
515            ExtendedSocketOptions extendedOptions =
516                    ExtendedSocketOptions.getInstance();
517            set.addAll(extendedOptions.options());
518            return Collections.unmodifiableSet(set);
519        }
520    }
521
522    @Override
523    public final Set<SocketOption<?>> supportedOptions() {
524        return DefaultOptionsHolder.defaultOptions;
525    }
526
527    @Override
528    public final SocketAddress getRemoteAddress() throws IOException {
529        if (!isOpen())
530            throw new ClosedChannelException();
531        return remoteAddress;
532    }
533
534    @Override
535    public final AsynchronousSocketChannel shutdownInput() throws IOException {
536        try {
537            begin();
538            if (remoteAddress == null)
539                throw new NotYetConnectedException();
540            synchronized (readLock) {
541                if (!readShutdown) {
542                    Net.shutdown(fd, Net.SHUT_RD);
543                    readShutdown = true;
544                }
545            }
546        } finally {
547            end();
548        }
549        return this;
550    }
551
552    @Override
553    public final AsynchronousSocketChannel shutdownOutput() throws IOException {
554        try {
555            begin();
556            if (remoteAddress == null)
557                throw new NotYetConnectedException();
558            synchronized (writeLock) {
559                if (!writeShutdown) {
560                    Net.shutdown(fd, Net.SHUT_WR);
561                    writeShutdown = true;
562                }
563            }
564        } finally {
565            end();
566        }
567        return this;
568    }
569
570    @Override
571    public final String toString() {
572        StringBuilder sb = new StringBuilder();
573        sb.append(this.getClass().getName());
574        sb.append('[');
575        synchronized (stateLock) {
576            if (!isOpen()) {
577                sb.append("closed");
578            } else {
579                switch (state) {
580                case ST_UNCONNECTED:
581                    sb.append("unconnected");
582                    break;
583                case ST_PENDING:
584                    sb.append("connection-pending");
585                    break;
586                case ST_CONNECTED:
587                    sb.append("connected");
588                    if (readShutdown)
589                        sb.append(" ishut");
590                    if (writeShutdown)
591                        sb.append(" oshut");
592                    break;
593                }
594                if (localAddress != null) {
595                    sb.append(" local=");
596                    sb.append(
597                            Net.getRevealedLocalAddressAsString(localAddress));
598                }
599                if (remoteAddress != null) {
600                    sb.append(" remote=");
601                    sb.append(remoteAddress.toString());
602                }
603            }
604        }
605        sb.append(']');
606        return sb.toString();
607    }
608}
609