1/*
2 * Copyright (c) 2008, 2009, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package sun.nio.ch;
27
28import java.nio.channels.*;
29import java.util.concurrent.*;
30import java.security.AccessController;
31import sun.security.action.GetIntegerAction;
32import jdk.internal.misc.InnocuousThread;
33
34/**
35 * Defines static methods to invoke a completion handler or arbitrary task.
36 */
37
38class Invoker {
39    private Invoker() { }
40
41    // maximum number of completion handlers that may be invoked on the current
42    // thread before it re-directs invocations to the thread pool. This helps
43    // avoid stack overflow and lessens the risk of starvation.
44    private static final int maxHandlerInvokeCount = AccessController.doPrivileged(
45        new GetIntegerAction("sun.nio.ch.maxCompletionHandlersOnStack", 16));
46
47    // Per-thread object with reference to channel group and a counter for
48    // the number of completion handlers invoked. This should be reset to 0
49    // when all completion handlers have completed.
50    static class GroupAndInvokeCount {
51        private final AsynchronousChannelGroupImpl group;
52        private int handlerInvokeCount;
53        GroupAndInvokeCount(AsynchronousChannelGroupImpl group) {
54            this.group = group;
55        }
56        AsynchronousChannelGroupImpl group() {
57            return group;
58        }
59        int invokeCount() {
60            return handlerInvokeCount;
61        }
62        void setInvokeCount(int value) {
63            handlerInvokeCount = value;
64        }
65        void resetInvokeCount() {
66            handlerInvokeCount = 0;
67        }
68        void incrementInvokeCount() {
69            handlerInvokeCount++;
70        }
71    }
72    private static final ThreadLocal<GroupAndInvokeCount> myGroupAndInvokeCount =
73        new ThreadLocal<GroupAndInvokeCount>() {
74            @Override protected GroupAndInvokeCount initialValue() {
75                return null;
76            }
77        };
78
79    /**
80     * Binds this thread to the given group
81     */
82    static void bindToGroup(AsynchronousChannelGroupImpl group) {
83        myGroupAndInvokeCount.set(new GroupAndInvokeCount(group));
84    }
85
86    /**
87     * Returns the GroupAndInvokeCount object for this thread.
88     */
89    static GroupAndInvokeCount getGroupAndInvokeCount() {
90        return myGroupAndInvokeCount.get();
91    }
92
93    /**
94     * Returns true if the current thread is in a channel group's thread pool
95     */
96    static boolean isBoundToAnyGroup() {
97        return myGroupAndInvokeCount.get() != null;
98    }
99
100    /**
101     * Returns true if the current thread is in the given channel's thread
102     * pool and we haven't exceeded the maximum number of handler frames on
103     * the stack.
104     */
105    static boolean mayInvokeDirect(GroupAndInvokeCount myGroupAndInvokeCount,
106                                   AsynchronousChannelGroupImpl group)
107    {
108        if ((myGroupAndInvokeCount != null) &&
109            (myGroupAndInvokeCount.group() == group) &&
110            (myGroupAndInvokeCount.invokeCount() < maxHandlerInvokeCount))
111        {
112            return true;
113        }
114        return false;
115    }
116
117    /**
118     * Invoke handler without checking the thread identity or number of handlers
119     * on the thread stack.
120     */
121    static <V,A> void invokeUnchecked(CompletionHandler<V,? super A> handler,
122                                      A attachment,
123                                      V value,
124                                      Throwable exc)
125    {
126        if (exc == null) {
127            handler.completed(value, attachment);
128        } else {
129            handler.failed(exc, attachment);
130        }
131
132        // clear interrupt
133        Thread.interrupted();
134
135        // clear thread locals when in default thread pool
136        if (System.getSecurityManager() != null) {
137            Thread me = Thread.currentThread();
138            if (me instanceof InnocuousThread) {
139                GroupAndInvokeCount thisGroupAndInvokeCount = myGroupAndInvokeCount.get();
140                ((InnocuousThread)me).eraseThreadLocals();
141                if (thisGroupAndInvokeCount != null) {
142                    myGroupAndInvokeCount.set(thisGroupAndInvokeCount);
143                }
144            }
145        }
146    }
147
148    /**
149     * Invoke handler assuming thread identity already checked
150     */
151    static <V,A> void invokeDirect(GroupAndInvokeCount myGroupAndInvokeCount,
152                                   CompletionHandler<V,? super A> handler,
153                                   A attachment,
154                                   V result,
155                                   Throwable exc)
156    {
157        myGroupAndInvokeCount.incrementInvokeCount();
158        Invoker.invokeUnchecked(handler, attachment, result, exc);
159    }
160
161    /**
162     * Invokes the handler. If the current thread is in the channel group's
163     * thread pool then the handler is invoked directly, otherwise it is
164     * invoked indirectly.
165     */
166    static <V,A> void invoke(AsynchronousChannel channel,
167                             CompletionHandler<V,? super A> handler,
168                             A attachment,
169                             V result,
170                             Throwable exc)
171    {
172        boolean invokeDirect = false;
173        boolean identityOkay = false;
174        GroupAndInvokeCount thisGroupAndInvokeCount = myGroupAndInvokeCount.get();
175        if (thisGroupAndInvokeCount != null) {
176            if ((thisGroupAndInvokeCount.group() == ((Groupable)channel).group()))
177                identityOkay = true;
178            if (identityOkay &&
179                (thisGroupAndInvokeCount.invokeCount() < maxHandlerInvokeCount))
180            {
181                // group match
182                invokeDirect = true;
183            }
184        }
185        if (invokeDirect) {
186            invokeDirect(thisGroupAndInvokeCount, handler, attachment, result, exc);
187        } else {
188            try {
189                invokeIndirectly(channel, handler, attachment, result, exc);
190            } catch (RejectedExecutionException ree) {
191                // channel group shutdown; fallback to invoking directly
192                // if the current thread has the right identity.
193                if (identityOkay) {
194                    invokeDirect(thisGroupAndInvokeCount,
195                                 handler, attachment, result, exc);
196                } else {
197                    throw new ShutdownChannelGroupException();
198                }
199            }
200        }
201    }
202
203    /**
204     * Invokes the handler indirectly via the channel group's thread pool.
205     */
206    static <V,A> void invokeIndirectly(AsynchronousChannel channel,
207                                       final CompletionHandler<V,? super A> handler,
208                                       final A attachment,
209                                       final V result,
210                                       final Throwable exc)
211    {
212        try {
213            ((Groupable)channel).group().executeOnPooledThread(new Runnable() {
214                public void run() {
215                    GroupAndInvokeCount thisGroupAndInvokeCount =
216                        myGroupAndInvokeCount.get();
217                    if (thisGroupAndInvokeCount != null)
218                        thisGroupAndInvokeCount.setInvokeCount(1);
219                    invokeUnchecked(handler, attachment, result, exc);
220                }
221            });
222        } catch (RejectedExecutionException ree) {
223            throw new ShutdownChannelGroupException();
224        }
225    }
226
227    /**
228     * Invokes the handler "indirectly" in the given Executor
229     */
230    static <V,A> void invokeIndirectly(final CompletionHandler<V,? super A> handler,
231                                       final A attachment,
232                                       final V value,
233                                       final Throwable exc,
234                                       Executor executor)
235    {
236         try {
237            executor.execute(new Runnable() {
238                public void run() {
239                    invokeUnchecked(handler, attachment, value, exc);
240                }
241            });
242        } catch (RejectedExecutionException ree) {
243            throw new ShutdownChannelGroupException();
244        }
245    }
246
247    /**
248     * Invokes the given task on the thread pool associated with the given
249     * channel. If the current thread is in the thread pool then the task is
250     * invoked directly.
251     */
252    static void invokeOnThreadInThreadPool(Groupable channel,
253                                           Runnable task)
254    {
255        boolean invokeDirect;
256        GroupAndInvokeCount thisGroupAndInvokeCount = myGroupAndInvokeCount.get();
257        AsynchronousChannelGroupImpl targetGroup = channel.group();
258        if (thisGroupAndInvokeCount == null) {
259            invokeDirect = false;
260        } else {
261            invokeDirect = (thisGroupAndInvokeCount.group == targetGroup);
262        }
263        try {
264            if (invokeDirect) {
265                task.run();
266            } else {
267                targetGroup.executeOnPooledThread(task);
268            }
269        } catch (RejectedExecutionException ree) {
270            throw new ShutdownChannelGroupException();
271        }
272    }
273
274    /**
275     * Invoke handler with completed result. This method does not check the
276     * thread identity or the number of handlers on the thread stack.
277     */
278    static <V,A> void invokeUnchecked(PendingFuture<V,A> future) {
279        assert future.isDone();
280        CompletionHandler<V,? super A> handler = future.handler();
281        if (handler != null) {
282            invokeUnchecked(handler,
283                            future.attachment(),
284                            future.value(),
285                            future.exception());
286        }
287    }
288
289    /**
290     * Invoke handler with completed result. If the current thread is in the
291     * channel group's thread pool then the handler is invoked directly,
292     * otherwise it is invoked indirectly.
293     */
294    static <V,A> void invoke(PendingFuture<V,A> future) {
295        assert future.isDone();
296        CompletionHandler<V,? super A> handler = future.handler();
297        if (handler != null) {
298            invoke(future.channel(),
299                   handler,
300                   future.attachment(),
301                   future.value(),
302                   future.exception());
303        }
304    }
305
306    /**
307     * Invoke handler with completed result. The handler is invoked indirectly,
308     * via the channel group's thread pool.
309     */
310    static <V,A> void invokeIndirectly(PendingFuture<V,A> future) {
311        assert future.isDone();
312        CompletionHandler<V,? super A> handler = future.handler();
313        if (handler != null) {
314            invokeIndirectly(future.channel(),
315                             handler,
316                             future.attachment(),
317                             future.value(),
318                             future.exception());
319        }
320    }
321}
322