1/*
2 * Copyright (c) 2008, 2017, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package sun.nio.ch;
27
28import java.nio.channels.Channel;
29import java.nio.channels.AsynchronousChannelGroup;
30import java.nio.channels.spi.AsynchronousChannelProvider;
31import java.io.IOException;
32import java.io.FileDescriptor;
33import java.util.Queue;
34import java.util.concurrent.*;
35import java.util.concurrent.atomic.AtomicInteger;
36import java.util.concurrent.atomic.AtomicBoolean;
37import java.security.PrivilegedAction;
38import java.security.AccessController;
39import java.security.AccessControlContext;
40import sun.security.action.GetIntegerAction;
41
42/**
43 * Base implementation of AsynchronousChannelGroup
44 */
45
46abstract class AsynchronousChannelGroupImpl
47    extends AsynchronousChannelGroup implements Executor
48{
49    // number of internal threads handling I/O events when using an unbounded
50    // thread pool. Internal threads do not dispatch to completion handlers.
51    private static final int internalThreadCount = AccessController.doPrivileged(
52        new GetIntegerAction("sun.nio.ch.internalThreadPoolSize", 1));
53
54    // associated thread pool
55    private final ThreadPool pool;
56
57    // number of tasks running (including internal)
58    private final AtomicInteger threadCount = new AtomicInteger();
59
60    // associated Executor for timeouts
61    private ScheduledThreadPoolExecutor timeoutExecutor;
62
63    // task queue for when using a fixed thread pool. In that case, thread
64    // waiting on I/O events must be awokon to poll tasks from this queue.
65    private final Queue<Runnable> taskQueue;
66
67    // group shutdown
68    private final AtomicBoolean shutdown = new AtomicBoolean();
69    private final Object shutdownNowLock = new Object();
70    private volatile boolean terminateInitiated;
71
72    AsynchronousChannelGroupImpl(AsynchronousChannelProvider provider,
73                                 ThreadPool pool)
74    {
75        super(provider);
76        this.pool = pool;
77
78        if (pool.isFixedThreadPool()) {
79            taskQueue = new ConcurrentLinkedQueue<>();
80        } else {
81            taskQueue = null;   // not used
82        }
83
84        // use default thread factory as thread should not be visible to
85        // application (it doesn't execute completion handlers).
86        this.timeoutExecutor = (ScheduledThreadPoolExecutor)
87            Executors.newScheduledThreadPool(1, ThreadPool.defaultThreadFactory());
88        this.timeoutExecutor.setRemoveOnCancelPolicy(true);
89    }
90
91    final ExecutorService executor() {
92        return pool.executor();
93    }
94
95    final boolean isFixedThreadPool() {
96        return pool.isFixedThreadPool();
97    }
98
99    final int fixedThreadCount() {
100        if (isFixedThreadPool()) {
101            return pool.poolSize();
102        } else {
103            return pool.poolSize() + internalThreadCount;
104        }
105    }
106
107    private Runnable bindToGroup(final Runnable task) {
108        final AsynchronousChannelGroupImpl thisGroup = this;
109        return new Runnable() {
110            public void run() {
111                Invoker.bindToGroup(thisGroup);
112                task.run();
113            }
114        };
115    }
116
117    private void startInternalThread(final Runnable task) {
118        AccessController.doPrivileged(new PrivilegedAction<>() {
119            @Override
120            public Void run() {
121                // internal threads should not be visible to application so
122                // cannot use user-supplied thread factory
123                ThreadPool.defaultThreadFactory().newThread(task).start();
124                return null;
125            }
126         });
127    }
128
129    protected final void startThreads(Runnable task) {
130        if (!isFixedThreadPool()) {
131            for (int i=0; i<internalThreadCount; i++) {
132                startInternalThread(task);
133                threadCount.incrementAndGet();
134            }
135        }
136        if (pool.poolSize() > 0) {
137            task = bindToGroup(task);
138            try {
139                for (int i=0; i<pool.poolSize(); i++) {
140                    pool.executor().execute(task);
141                    threadCount.incrementAndGet();
142                }
143            } catch (RejectedExecutionException  x) {
144                // nothing we can do
145            }
146        }
147    }
148
149    final int threadCount() {
150        return threadCount.get();
151    }
152
153    /**
154     * Invoked by tasks as they terminate
155     */
156    final int threadExit(Runnable task, boolean replaceMe) {
157        if (replaceMe) {
158            try {
159                if (Invoker.isBoundToAnyGroup()) {
160                    // submit new task to replace this thread
161                    pool.executor().execute(bindToGroup(task));
162                } else {
163                    // replace internal thread
164                    startInternalThread(task);
165                }
166                return threadCount.get();
167            } catch (RejectedExecutionException x) {
168                // unable to replace
169            }
170        }
171        return threadCount.decrementAndGet();
172    }
173
174    /**
175     * Wakes up a thread waiting for I/O events to execute the given task.
176     */
177    abstract void executeOnHandlerTask(Runnable task);
178
179    /**
180     * For a fixed thread pool the task is queued to a thread waiting on I/O
181     * events. For other thread pools we simply submit the task to the thread
182     * pool.
183     */
184    final void executeOnPooledThread(Runnable task) {
185        if (isFixedThreadPool()) {
186            executeOnHandlerTask(task);
187        } else {
188            pool.executor().execute(bindToGroup(task));
189        }
190    }
191
192    final void offerTask(Runnable task) {
193        taskQueue.offer(task);
194    }
195
196    final Runnable pollTask() {
197        return (taskQueue == null) ? null : taskQueue.poll();
198    }
199
200    final Future<?> schedule(Runnable task, long timeout, TimeUnit unit) {
201        try {
202            return timeoutExecutor.schedule(task, timeout, unit);
203        } catch (RejectedExecutionException rej) {
204            if (terminateInitiated) {
205                // no timeout scheduled as group is terminating
206                return null;
207            }
208            throw new AssertionError(rej);
209        }
210    }
211
212    @Override
213    public final boolean isShutdown() {
214        return shutdown.get();
215    }
216
217    @Override
218    public final boolean isTerminated()  {
219        return pool.executor().isTerminated();
220    }
221
222    /**
223     * Returns true if there are no channels in the group
224     */
225    abstract boolean isEmpty();
226
227    /**
228     * Attaches a foreign channel to this group.
229     */
230    abstract Object attachForeignChannel(Channel channel, FileDescriptor fdo)
231        throws IOException;
232
233    /**
234     * Detaches a foreign channel from this group.
235     */
236    abstract void detachForeignChannel(Object key);
237
238    /**
239     * Closes all channels in the group
240     */
241    abstract void closeAllChannels() throws IOException;
242
243    /**
244     * Shutdown all tasks waiting for I/O events.
245     */
246    abstract void shutdownHandlerTasks();
247
248    private void shutdownExecutors() {
249        AccessController.doPrivileged(
250            new PrivilegedAction<>() {
251                public Void run() {
252                    pool.executor().shutdown();
253                    timeoutExecutor.shutdown();
254                    return null;
255                }
256            },
257            null,
258            new RuntimePermission("modifyThread"));
259    }
260
261    @Override
262    public final void shutdown() {
263        if (shutdown.getAndSet(true)) {
264            // already shutdown
265            return;
266        }
267        // if there are channels in the group then shutdown will continue
268        // when the last channel is closed
269        if (!isEmpty()) {
270            return;
271        }
272        // initiate termination (acquire shutdownNowLock to ensure that other
273        // threads invoking shutdownNow will block).
274        synchronized (shutdownNowLock) {
275            if (!terminateInitiated) {
276                terminateInitiated = true;
277                shutdownHandlerTasks();
278                shutdownExecutors();
279            }
280        }
281    }
282
283    @Override
284    public final void shutdownNow() throws IOException {
285        shutdown.set(true);
286        synchronized (shutdownNowLock) {
287            if (!terminateInitiated) {
288                terminateInitiated = true;
289                closeAllChannels();
290                shutdownHandlerTasks();
291                shutdownExecutors();
292            }
293        }
294    }
295
296    /**
297     * For use by AsynchronousFileChannel to release resources without shutting
298     * down the thread pool.
299     */
300    final void detachFromThreadPool() {
301        if (shutdown.getAndSet(true))
302            throw new AssertionError("Already shutdown");
303        if (!isEmpty())
304            throw new AssertionError("Group not empty");
305        shutdownHandlerTasks();
306    }
307
308    @Override
309    public final boolean awaitTermination(long timeout, TimeUnit unit)
310        throws InterruptedException
311    {
312        return pool.executor().awaitTermination(timeout, unit);
313    }
314
315    /**
316     * Executes the given command on one of the channel group's pooled threads.
317     */
318    @Override
319    public final void execute(Runnable task) {
320        SecurityManager sm = System.getSecurityManager();
321        if (sm != null) {
322            // when a security manager is installed then the user's task
323            // must be run with the current calling context
324            final AccessControlContext acc = AccessController.getContext();
325            final Runnable delegate = task;
326            task = new Runnable() {
327                @Override
328                public void run() {
329                    AccessController.doPrivileged(new PrivilegedAction<>() {
330                        @Override
331                        public Void run() {
332                            delegate.run();
333                            return null;
334                        }
335                    }, acc);
336                }
337            };
338        }
339        executeOnPooledThread(task);
340    }
341}
342