1/*
2 * Copyright (c) 1997, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package com.sun.xml.internal.ws.api.pipe;
27
28import com.sun.istack.internal.NotNull;
29import com.sun.istack.internal.Nullable;
30import com.sun.xml.internal.ws.api.Cancelable;
31import com.sun.xml.internal.ws.api.Component;
32import com.sun.xml.internal.ws.api.ComponentRegistry;
33import com.sun.xml.internal.ws.api.SOAPVersion;
34import com.sun.xml.internal.ws.api.addressing.AddressingVersion;
35import com.sun.xml.internal.ws.api.message.AddressingUtils;
36import com.sun.xml.internal.ws.api.message.Packet;
37import com.sun.xml.internal.ws.api.pipe.helper.AbstractFilterTubeImpl;
38import com.sun.xml.internal.ws.api.pipe.helper.AbstractTubeImpl;
39import com.sun.xml.internal.ws.api.server.Adapter;
40import com.sun.xml.internal.ws.api.server.Container;
41import com.sun.xml.internal.ws.api.server.ContainerResolver;
42
43import java.util.ArrayList;
44import java.util.List;
45import java.util.Set;
46import java.util.concurrent.CopyOnWriteArraySet;
47import java.util.concurrent.atomic.AtomicInteger;
48import java.util.concurrent.locks.Condition;
49import java.util.concurrent.locks.ReentrantLock;
50import java.util.logging.Level;
51import java.util.logging.Logger;
52
53import javax.xml.ws.Holder;
54import javax.xml.ws.WebServiceException;
55
56/**
57 * User-level thread. Represents the execution of one request/response processing.
58 * <p/>
59 * <p/>
 * JAX-WS RI is capable of running a large number of requests/responses concurrently by
 * using a relatively small number of threads. This is made possible by utilizing
 * a {@link Fiber} &mdash; a user-level thread that gets created for each request/response
 * processing.
64 * <p/>
65 * <p/>
66 * A fiber remembers where in the pipeline the processing is at, what needs to be
67 * executed on the way out (when processing response), and other additional information
68 * specific to the execution of a particular request/response.
69 * <p/>
70 * <h2>Suspend/Resume</h2>
71 * <p/>
72 * Fiber can be {@link NextAction#suspend() suspended} by a {@link Tube}.
73 * When a fiber is suspended, it will be kept on the side until it is
74 * {@link #resume(Packet) resumed}. This allows threads to go execute
75 * other runnable fibers, allowing efficient utilization of smaller number of
76 * threads.
77 * <p/>
78 * <h2>Context-switch Interception</h2>
79 * <p/>
80 * {@link FiberContextSwitchInterceptor} allows {@link Tube}s and {@link Adapter}s
81 * to perform additional processing every time a thread starts running a fiber
82 * and stops running it.
83 * <p/>
84 * <h2>Context ClassLoader</h2>
85 * <p/>
86 * Just like thread, a fiber has a context class loader (CCL.) A fiber's CCL
87 * becomes the thread's CCL when it's executing the fiber. The original CCL
88 * of the thread will be restored when the thread leaves the fiber execution.
89 * <p/>
90 * <p/>
91 * <h2>Debugging Aid</h2>
92 * <p/>
 * Because {@link Fiber} doesn't keep much in the call stack, and instead uses
 * {@link #conts} to store the continuation, debugging fiber related activities
 * could be harder.
96 * <p/>
97 * <p/>
98 * Setting the {@link #LOGGER} for FINE would give you basic start/stop/resume/suspend
99 * level logging. Using FINER would cause more detailed logging, which includes
100 * what tubes are executed in what order and how they behaved.
101 * <p/>
102 * <p/>
 * When you debug the server side, consider setting {@link Fiber#serializeExecution}
 * to true, so that execution of fibers is serialized. Debugging a server
 * with more than one running thread is very tricky, and this switch will
 * prevent that. This can also be enabled by turning on the system property.
 * See the source code.
108 *
109 * @author Kohsuke Kawaguchi
110 * @author Jitendra Kotamraju
111 */
112public final class Fiber implements Runnable, Cancelable, ComponentRegistry {
113
114        /**
115         * Callback interface for notification of suspend and resume.
116         *
117         * @since 2.2.6
118         * @deprecated Use {@link NextAction#suspend(Runnable)}
119         */
120    public interface Listener {
121        /**
122         * Fiber has been suspended.  Implementations of this callback may resume the Fiber.
123         * @param fiber Fiber
124         */
125        public void fiberSuspended(Fiber fiber);
126
127        /**
128         * Fiber has been resumed.  Behavior is undefined if implementations of this callback attempt to suspend the Fiber.
129         * @param fiber Fiber
130         */
131        public void fiberResumed(Fiber fiber);
132    }
133
134    private final List<Listener> _listeners = new ArrayList<Listener>();
135
136    /**
137     * Adds suspend/resume callback listener
138     * @param listener Listener
139     * @since 2.2.6
140     * @deprecated
141     */
142    public void addListener(Listener listener) {
143        synchronized(_listeners) {
144            if (!_listeners.contains(listener)) {
145                _listeners.add(listener);
146            }
147        }
148    }
149
150    /**
151     * Removes suspend/resume callback listener
152     * @param listener Listener
153     * @since 2.2.6
154     * @deprecated
155     */
156    public void removeListener(Listener listener) {
157        synchronized(_listeners) {
158            _listeners.remove(listener);
159        }
160    }
161
162    List<Listener> getCurrentListeners() {
163      synchronized(_listeners) {
164         return new ArrayList<Listener>(_listeners);
165      }
166    }
167
168    private void clearListeners() {
169        synchronized(_listeners) {
170            _listeners.clear();
171        }
172    }
173
174    /**
175     * {@link Tube}s whose {@link Tube#processResponse(Packet)} method needs
176     * to be invoked on the way back.
177     */
178    private Tube[] conts = new Tube[16];
179    private int contsSize;
180
181    /**
182     * If this field is non-null, the next instruction to execute is
183     * to call its {@link Tube#processRequest(Packet)}. Otherwise
184     * the instruction is to call {@link #conts}.
185     */
186    private Tube next;
187
188    private Packet packet;
189
190    private Throwable/*but really it's either RuntimeException or Error*/ throwable;
191
192    public final Engine owner;
193
194    /**
195     * Is this thread suspended? 0=not suspended, 1=suspended.
196     * <p/>
197     * <p/>
198     * Logically this is just a boolean, but we need to prepare for the case
199     * where the thread is {@link #resume(Packet) resumed} before we get to the {@link #suspend()}.
200     * This happens when things happen in the following order:
201     * <p/>
202     * <ol>
203     * <li>Tube decides that the fiber needs to be suspended to wait for the external event.
204     * <li>Tube hooks up fiber with some external mechanism (like NIO channel selector)
205     * <li>Tube returns with {@link NextAction#suspend()}.
206     * <li>"External mechanism" becomes signal state and invokes {@link Fiber#resume(Packet)}
207     * to wake up fiber
208     * <li>{@link Fiber#doRun} invokes {@link Fiber#suspend()}.
209     * </ol>
210     * <p/>
211     * <p/>
212     * Using int, this will work OK because {@link #suspendedCount} becomes -1 when
213     * {@link #resume(Packet)} occurs before {@link #suspend()}.
214     * <p/>
215     * <p/>
216     * Increment and decrement is guarded by 'this' object.
217     */
218    private volatile int suspendedCount = 0;
219
220    private volatile boolean isInsideSuspendCallbacks = false;
221
222    /**
223     * Is this {@link Fiber} currently running in the synchronous mode?
224     */
225    private boolean synchronous;
226
227    private boolean interrupted;
228
229    private final int id;
230
231    /**
232     * Active {@link FiberContextSwitchInterceptor}s for this fiber.
233     */
234    private List<FiberContextSwitchInterceptor> interceptors;
235
236    /**
237     * Fiber's context {@link ClassLoader}.
238     */
239    private
240    @Nullable
241    ClassLoader contextClassLoader;
242
243    private
244    @Nullable
245    CompletionCallback completionCallback;
246
247    private boolean isDeliverThrowableInPacket = false;
248
249    public void setDeliverThrowableInPacket(boolean isDeliverThrowableInPacket) {
250        this.isDeliverThrowableInPacket = isDeliverThrowableInPacket;
251    }
252
253    /**
254     * The thread on which this Fiber is currently executing, if applicable.
255     */
256    private Thread currentThread;
257
258    /**
259     * Replace uses of synchronized(this) with this lock so that we can control
260     * unlocking for resume use cases
261     */
262    private final ReentrantLock lock = new ReentrantLock();
263    private final Condition condition = lock.newCondition();
264
265    private volatile boolean isCanceled;
266
267    /**
268     * Set to true if this fiber is started asynchronously, to avoid
269     * doubly-invoking completion code.
270     */
271    private boolean started;
272
273    /**
274     * Set to true if this fiber is started sync but allowed to run async.
275     * This property exists for use cases where the processing model is fundamentally async
276     * but some requirement or feature mandates that part of the tubeline run synchronously.  For
277     * instance, WS-ReliableMessaging with non-anonymous addressing is compatible with running
278     * asynchronously, but if in-order message delivery is used then message processing must assign
279     * a message number before the remainder of the processing can be asynchronous.
280     */
281    private boolean startedSync;
282
283    /**
284     * Callback to be invoked when a {@link Fiber} finishes execution.
285     */
286    public interface CompletionCallback {
287        /**
288         * Indicates that the fiber has finished its execution.
289         * <p/>
290         * <p/>
291         * Since the JAX-WS RI runs asynchronously,
292         * this method maybe invoked by a different thread
293         * than any of the threads that started it or run a part of tubeline.
294         */
295        void onCompletion(@NotNull Packet response);
296
297        /**
298         * Indicates that the fiber has finished abnormally, by throwing a given {@link Throwable}.
299         */
300        void onCompletion(@NotNull Throwable error);
301    }
302
303    Fiber(Engine engine) {
304        this.owner = engine;
305        id = iotaGen.incrementAndGet();
306        if (isTraceEnabled()) {
307            LOGGER.log(Level.FINE, "{0} created", getName());
308        }
309
310        // if this is run from another fiber, then we naturally inherit its context classloader,
311        // so this code works for fiber->fiber inheritance just fine.
312        contextClassLoader = Thread.currentThread().getContextClassLoader();
313    }
314
315    /**
316     * Starts the execution of this fiber asynchronously.
317     * <p/>
318     * <p/>
319     * This method works like {@link Thread#start()}.
320     *
321     * @param tubeline           The first tube of the tubeline that will act on the packet.
322     * @param request            The request packet to be passed to {@code startPoint.processRequest()}.
323     * @param completionCallback The callback to be invoked when the processing is finished and the
324     *                           final response packet is available.
325     * @see #runSync(Tube, Packet)
326     */
327    public void start(@NotNull Tube tubeline, @NotNull Packet request, @Nullable CompletionCallback completionCallback) {
328        start(tubeline, request, completionCallback, false);
329    }
330
331    private void dumpFiberContext(String desc) {
332        if(isTraceEnabled()) {
333            String action = null;
334            String msgId = null;
335            if (packet != null) {
336                for (SOAPVersion sv: SOAPVersion.values()) {
337                    for (AddressingVersion av: AddressingVersion.values()) {
338                        action = packet.getMessage() != null ? AddressingUtils.getAction(packet.getMessage().getHeaders(), av, sv) : null;
339                        msgId = packet.getMessage() != null ? AddressingUtils.getMessageID(packet.getMessage().getHeaders(), av, sv) : null;
340                        if (action != null || msgId != null) {
341                           break;
342                        }
343                    }
344                    if (action != null || msgId != null) {
345                        break;
346                    }
347                }
348            }
349            String actionAndMsgDesc;
350            if (action == null && msgId == null) {
351                actionAndMsgDesc = "NO ACTION or MSG ID";
352            } else {
353                actionAndMsgDesc = "'" + action + "' and msgId '" + msgId + "'";
354            }
355
356            String tubeDesc;
357            if (next != null) {
358                tubeDesc = next.toString() + ".processRequest()";
359            } else {
360                tubeDesc = peekCont() + ".processResponse()";
361            }
362
363            LOGGER.log(Level.FINE, "{0} {1} with {2} and ''current'' tube {3} from thread {4} with Packet: {5}", new Object[]{getName(), desc, actionAndMsgDesc, tubeDesc, Thread.currentThread().getName(), packet != null ? packet.toShortString() : null});
364        }
365    }
366
367    /**
368     * Starts the execution of this fiber.
369     *
370     * If forceSync is true, then the fiber is started for an ostensibly async invocation,
371     * but allows for some portion of the tubeline to run sync with the calling
372     * client instance (Port/Dispatch instance). This allows tubes that enforce
373     * ordering to see requests in the order they were sent at the point the
374     * client invoked them.
375     * <p>
376     * The forceSync parameter will be true only when the caller (e.g. AsyncInvoker or
377     * SEIStub) knows one or more tubes need to enforce ordering and thus need
378     * to run sync with the client. Such tubes can return
379     * NextAction.INVOKE_ASYNC to indicate that the next tube in the tubeline
380     * should be invoked async to the current thread.
381     *
382     * <p>
383     * This method works like {@link Thread#start()}.
384     *
385     * @param tubeline
386     *      The first tube of the tubeline that will act on the packet.
387     * @param request
388     *      The request packet to be passed to {@code startPoint.processRequest()}.
389     * @param completionCallback
390     *      The callback to be invoked when the processing is finished and the
391     *      final response packet is available.
392     *
393     * @see #start(Tube,Packet,CompletionCallback)
394     * @see #runSync(Tube,Packet)
395     * @since 2.2.6
396     */
397    public void start(@NotNull Tube tubeline, @NotNull Packet request, @Nullable CompletionCallback completionCallback, boolean forceSync) {
398        next = tubeline;
399        this.packet = request;
400        this.completionCallback = completionCallback;
401
402        if (forceSync) {
403                this.startedSync = true;
404                dumpFiberContext("starting (sync)");
405                run();
406        } else {
407                this.started = true;
408                dumpFiberContext("starting (async)");
409                owner.addRunnable(this);
410        }
411    }
412
413    /**
414     * Wakes up a suspended fiber.
415     * <p/>
416     * <p/>
417     * If a fiber was suspended without specifying the next {@link Tube},
418     * then the execution will be resumed in the response processing direction,
419     * by calling the {@link Tube#processResponse(Packet)} method on the next/first
420     * {@link Tube} in the {@link Fiber}'s processing stack with the specified resume
421     * packet as the parameter.
422     * <p/>
423     * <p/>
424     * If a fiber was suspended with specifying the next {@link Tube},
425     * then the execution will be resumed in the request processing direction,
426     * by calling the next tube's {@link Tube#processRequest(Packet)} method with the
427     * specified resume packet as the parameter.
428     * <p/>
429     * <p/>
430     * This method is implemented in a race-free way. Another thread can invoke
431     * this method even before this fiber goes into the suspension mode. So the caller
432     * need not worry about synchronizing {@link NextAction#suspend()} and this method.
433     *
434     * @param resumePacket packet used in the resumed processing
435     */
436    public void resume(@NotNull Packet resumePacket) {
437        resume(resumePacket, false);
438    }
439
440    /**
441     * Similar to resume(Packet) but allowing the Fiber to be resumed
442     * synchronously (in the current Thread). If you want to know when the
443     * fiber completes (not when this method returns) then add/wrap a
444     * CompletionCallback on this Fiber.
445     * For example, an asynchronous response endpoint that supports WS-ReliableMessaging
446     * including in-order message delivery may need to resume the Fiber synchronously
447     * until message order is confirmed prior to returning to asynchronous processing.
448     * @since 2.2.6
449     */
450    public void resume(@NotNull Packet resumePacket,
451    boolean forceSync) {
452        resume(resumePacket, forceSync, null);
453    }
454
455    /**
456     * Similar to resume(Packet, boolean) but allowing the Fiber to be resumed
457     * and at the same time atomically assign a new CompletionCallback to it.
458     * @since 2.2.6
459     */
460    public void resume(@NotNull Packet resumePacket,
461                       boolean forceSync,
462                       CompletionCallback callback) {
463       lock.lock();
464       try {
465           if (callback != null) {
466             setCompletionCallback(callback);
467           }
468           if(isTraceEnabled())
469                LOGGER.log(Level.FINE, "{0} resuming. Will have suspendedCount={1}", new Object[]{getName(), suspendedCount-1});
470                packet = resumePacket;
471                if( --suspendedCount == 0 ) {
472                   if (!isInsideSuspendCallbacks) {
473                        List<Listener> listeners = getCurrentListeners();
474                        for (Listener listener: listeners) {
475                            try {
476                                listener.fiberResumed(this);
477                            } catch (Throwable e) {
478                                if (isTraceEnabled())
479                                        LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
480                            }
481                        }
482
483                        if(synchronous) {
484                            condition.signalAll();
485                        } else if (forceSync || startedSync) {
486                            run();
487                        } else {
488                            dumpFiberContext("resuming (async)");
489                            owner.addRunnable(this);
490                        }
491                   }
492                } else {
493                    if (isTraceEnabled()) {
494                        LOGGER.log(Level.FINE, "{0} taking no action on resume because suspendedCount != 0: {1}", new Object[]{getName(), suspendedCount});
495                }
496            }
497       } finally {
498           lock.unlock();
499       }
500    }
501
502    /**
503     * Wakes up a suspended fiber and begins response processing.
504     * @since 2.2.6
505     */
506    public void resumeAndReturn(@NotNull Packet resumePacket,
507                                boolean forceSync) {
508        if(isTraceEnabled())
509            LOGGER.log(Level.FINE, "{0} resumed with Return Packet", getName());
510        next = null;
511        resume(resumePacket, forceSync);
512    }
513
514    /**
515     * Wakes up a suspended fiber with an exception.
516     * <p/>
517     * <p/>
518     * The execution of the suspended fiber will be resumed in the response
519     * processing direction, by calling the {@link Tube#processException(Throwable)} method
520     * on the next/first {@link Tube} in the {@link Fiber}'s processing stack with
521     * the specified exception as the parameter.
522     * <p/>
523     * <p/>
524     * This method is implemented in a race-free way. Another thread can invoke
525     * this method even before this fiber goes into the suspension mode. So the caller
526     * need not worry about synchronizing {@link NextAction#suspend()} and this method.
527     *
528     * @param throwable exception that is used in the resumed processing
529     */
530    public void resume(@NotNull Throwable throwable) {
531        resume(throwable, packet, false);
532    }
533
534    /**
535     * Wakes up a suspended fiber with an exception.
536     * <p/>
537     * <p/>
538     * The execution of the suspended fiber will be resumed in the response
539     * processing direction, by calling the {@link Tube#processException(Throwable)} method
540     * on the next/first {@link Tube} in the {@link Fiber}'s processing stack with
541     * the specified exception as the parameter.
542     * <p/>
543     * <p/>
544     * This method is implemented in a race-free way. Another thread can invoke
545     * this method even before this fiber goes into the suspension mode. So the caller
546     * need not worry about synchronizing {@link NextAction#suspend()} and this method.
547     *
548     * @param throwable exception that is used in the resumed processing
549     * @param packet Packet that will be visible on the Fiber after the resume
550     * @since 2.2.8
551     */
552    public void resume(@NotNull Throwable throwable, @NotNull Packet packet) {
553        resume(throwable, packet, false);
554    }
555
556    /**
557     * Wakes up a suspend fiber with an exception.
558     *
559     * If forceSync is true, then the suspended fiber will resume with
560     * synchronous processing on the current thread.  This will continue
561     * until some Tube indicates that it is safe to switch to asynchronous
562     * processing.
563     *
564     * @param error exception that is used in the resumed processing
565     * @param forceSync if processing begins synchronously
566     * @since 2.2.6
567     */
568    public void resume(@NotNull Throwable error,
569                       boolean forceSync) {
570        resume(error, packet, forceSync);
571    }
572
573    /**
574     * Wakes up a suspend fiber with an exception.
575     *
576     * If forceSync is true, then the suspended fiber will resume with
577     * synchronous processing on the current thread.  This will continue
578     * until some Tube indicates that it is safe to switch to asynchronous
579     * processing.
580     *
581     * @param error exception that is used in the resumed processing
582     * @param packet Packet that will be visible on the Fiber after the resume
583     * @param forceSync if processing begins synchronously
584     * @since 2.2.8
585     */
586    public void resume(@NotNull Throwable error,
587                       @NotNull Packet packet,
588                       boolean forceSync) {
589        if(isTraceEnabled())
590            LOGGER.log(Level.FINE, "{0} resumed with Return Throwable", getName());
591        next = null;
592        throwable = error;
593        resume(packet, forceSync);
594    }
595
596    /**
597     * Marks this Fiber as cancelled.  A cancelled Fiber will never invoke its completion callback
598     * @param mayInterrupt if cancel should use {@link Thread#interrupt()}
599     * @see java.util.concurrent.Future#cancel(boolean)
600     * @since 2.2.6
601     */
602    @Override
603    public void cancel(boolean mayInterrupt) {
604        isCanceled = true;
605        if (mayInterrupt) {
606            // synchronized(this) is used as Thread running Fiber will be holding lock
607            synchronized(this) {
608                if (currentThread != null)
609                    currentThread.interrupt();
610            }
611        }
612    }
613
614    /**
615     * Suspends this fiber's execution until the resume method is invoked.
616     * <p/>
617     * The call returns immediately, and when the fiber is resumed
618     * the execution picks up from the last scheduled continuation.
619     * @param onExitRunnable runnable to be invoked after fiber is marked for suspension
620     * @return if control loop must exit
621     */
622    private boolean suspend(Holder<Boolean> isRequireUnlock, Runnable onExitRunnable) {
623        if(isTraceEnabled()) {
624            LOGGER.log(Level.FINE, "{0} suspending. Will have suspendedCount={1}", new Object[]{getName(), suspendedCount+1});
625            if (suspendedCount > 0) {
626                LOGGER.log(Level.FINE, "WARNING - {0} suspended more than resumed. Will require more than one resume to actually resume this fiber.", getName());
627            }
628        }
629
630        List<Listener> listeners = getCurrentListeners();
631        if (++suspendedCount == 1) {
632            isInsideSuspendCallbacks = true;
633            try {
634                for (Listener listener: listeners) {
635                    try {
636                        listener.fiberSuspended(this);
637                    } catch (Throwable e) {
638                        if(isTraceEnabled())
639                            LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
640                    }
641                }
642            } finally {
643                isInsideSuspendCallbacks = false;
644            }
645        }
646
647        if (suspendedCount <= 0) {
648            // suspend callback caused fiber to resume
649            for (Listener listener: listeners) {
650                try {
651                    listener.fiberResumed(this);
652                } catch (Throwable e) {
653                        if(isTraceEnabled())
654                                LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
655                }
656            }
657
658        } else if (onExitRunnable != null) {
659            // synchronous use cases cannot disconnect from the current thread
660            if (!synchronous) {
661                /* INTENTIONALLY UNLOCKING EARLY */
662                synchronized(this) {
663                    // currentThread is protected by the monitor for this fiber so
664                    // that it is accessible to cancel() even when the lock is held
665                    currentThread = null;
666                }
667                lock.unlock();
668                assert(!lock.isHeldByCurrentThread());
669                isRequireUnlock.value = Boolean.FALSE;
670
671                try {
672                    onExitRunnable.run();
673                } catch(Throwable t) {
674                    throw new OnExitRunnableException(t);
675                }
676
677                return true;
678
679            } else {
680                // for synchronous we will stay with current thread, so do not disconnect
681                if (isTraceEnabled())
682                    LOGGER.fine("onExitRunnable used with synchronous Fiber execution -- not exiting current thread");
683                onExitRunnable.run();
684            }
685        }
686
687        return false;
688    }
689
690    private static final class OnExitRunnableException extends RuntimeException {
691        private static final long serialVersionUID = 1L;
692
693        Throwable target;
694
695        public OnExitRunnableException(Throwable target) {
696            super((Throwable)null); // see pattern for InvocationTargetException
697            this.target = target;
698        }
699    }
700
701    /**
702     * Adds a new {@link FiberContextSwitchInterceptor} to this fiber.
703     * <p/>
704     * <p/>
705     * The newly installed fiber will take effect immediately after the current
706     * tube returns from its {@link Tube#processRequest(Packet)} or
707     * {@link Tube#processResponse(Packet)}, before the next tube begins processing.
708     * <p/>
709     * <p/>
710     * So when the tubeline consists of X and Y, and when X installs an interceptor,
711     * the order of execution will be as follows:
712     * <p/>
713     * <ol>
714     * <li>X.processRequest()
715     * <li>interceptor gets installed
716     * <li>interceptor.execute() is invoked
717     * <li>Y.processRequest()
718     * </ol>
719     */
720    public synchronized void addInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) {
721        if (interceptors == null) {
722            interceptors = new ArrayList<FiberContextSwitchInterceptor>();
723        } else {
724            List<FiberContextSwitchInterceptor> l = new ArrayList<FiberContextSwitchInterceptor>();
725            l.addAll(interceptors);
726            interceptors = l;
727        }
728        interceptors.add(interceptor);
729    }
730
731    /**
732     * Removes a {@link FiberContextSwitchInterceptor} from this fiber.
733     * <p/>
734     * <p/>
735     * The removal of the interceptor takes effect immediately after the current
736     * tube returns from its {@link Tube#processRequest(Packet)} or
737     * {@link Tube#processResponse(Packet)}, before the next tube begins processing.
738     * <p/>
739     * <p/>
740     * <p/>
741     * So when the tubeline consists of X and Y, and when Y uninstalls an interceptor
742     * on the way out, then the order of execution will be as follows:
743     * <p/>
744     * <ol>
745     * <li>Y.processResponse() (notice that this happens with interceptor.execute() in the callstack)
746     * <li>interceptor gets uninstalled
747     * <li>interceptor.execute() returns
748     * <li>X.processResponse()
749     * </ol>
750     *
751     * @return true if the specified interceptor was removed. False if
752     *         the specified interceptor was not registered with this fiber to begin with.
753     */
754    public synchronized boolean removeInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) {
755        if (interceptors != null) {
756            boolean result = interceptors.remove(interceptor);
757            if (interceptors.isEmpty())
758                interceptors = null;
759            else {
760                List<FiberContextSwitchInterceptor> l = new ArrayList<FiberContextSwitchInterceptor>();
761                l.addAll(interceptors);
762                interceptors = l;
763            }
764            return result;
765        }
766        return false;
767    }
768
769    /**
770     * Gets the context {@link ClassLoader} of this fiber.
771     */
772    public
773    @Nullable
774    ClassLoader getContextClassLoader() {
775        return contextClassLoader;
776    }
777
778    /**
779     * Sets the context {@link ClassLoader} of this fiber.
780     */
781    public ClassLoader setContextClassLoader(@Nullable ClassLoader contextClassLoader) {
782        ClassLoader r = this.contextClassLoader;
783        this.contextClassLoader = contextClassLoader;
784        return r;
785    }
786
787    /**
788     * DO NOT CALL THIS METHOD. This is an implementation detail
789     * of {@link Fiber}.
790     */
791    @Deprecated
792    @Override
793    public void run() {
794        Container old = ContainerResolver.getDefault().enterContainer(owner.getContainer());
795        try {
796            assert !synchronous;
797            // doRun returns true to indicate an early exit from fiber processing
798            if (!doRun()) {
799                if (startedSync && suspendedCount == 0 &&
800                    (next != null || contsSize > 0)) {
801                    // We bailed out of running this fiber we started as sync, and now
802                    // want to finish running it async
803                    startedSync = false;
804                    // Start back up as an async fiber
805                    dumpFiberContext("restarting (async) after startSync");
806                    owner.addRunnable(this);
807                } else {
808                    completionCheck();
809                }
810            }
811        } finally {
812            ContainerResolver.getDefault().exitContainer(old);
813        }
814    }
815
816    /**
817     * Runs a given {@link Tube} (and everything thereafter) synchronously.
818     * <p/>
819     * <p/>
820     * This method blocks and returns only when all the successive {@link Tube}s
821     * complete their request/response processing. This method can be used
822     * if a {@link Tube} needs to fallback to synchronous processing.
823     * <p/>
824     * <h3>Example:</h3>
825     * <pre>
826     * class FooTube extends {@link AbstractFilterTubeImpl} {
827     *   NextAction processRequest(Packet request) {
828     *     // run everything synchronously and return with the response packet
829     *     return doReturnWith(Fiber.current().runSync(next,request));
830     *   }
831     *   NextAction processResponse(Packet response) {
832     *     // never be invoked
833     *   }
834     * }
835     * </pre>
836     *
837     * @param tubeline The first tube of the tubeline that will act on the packet.
838     * @param request  The request packet to be passed to {@code startPoint.processRequest()}.
839     * @return The response packet to the {@code request}.
840     * @see #start(Tube, Packet, CompletionCallback)
841     */
842    public
843    @NotNull
844    Packet runSync(@NotNull Tube tubeline, @NotNull Packet request) {
845        lock.lock();
846        try {
847            // save the current continuation, so that we return runSync() without executing them.
848            final Tube[] oldCont = conts;
849            final int oldContSize = contsSize;
850            final boolean oldSynchronous = synchronous;
851            final Tube oldNext = next;
852
853            if (oldContSize > 0) {
854                conts = new Tube[16];
855                contsSize = 0;
856            }
857
858            try {
859                synchronous = true;
860                this.packet = request;
861                next = tubeline;
862                doRun();
863                if (throwable != null) {
864                    if (isDeliverThrowableInPacket) {
865                        packet.addSatellite(new ThrowableContainerPropertySet(throwable));
866                    } else {
867                        if (throwable instanceof RuntimeException) {
868                            throw (RuntimeException) throwable;
869                        }
870                        if (throwable instanceof Error) {
871                            throw (Error) throwable;
872                        }
873                        // our system is supposed to only accept Error or RuntimeException
874                        throw new AssertionError(throwable);
875                    }
876                }
877                return this.packet;
878            } finally {
879                conts = oldCont;
880                contsSize = oldContSize;
881                synchronous = oldSynchronous;
882                next = oldNext;
883                if(interrupted) {
884                    Thread.currentThread().interrupt();
885                    interrupted = false;
886                }
887                if(!started && !startedSync)
888                    completionCheck();
889            }
890        } finally {
891            lock.unlock();
892        }
893    }
894
895    private void completionCheck() {
896        lock.lock();
897        try {
898            // Don't trigger completion and callbacks if fiber is suspended
899            if(!isCanceled && contsSize==0 && suspendedCount == 0) {
900                if(isTraceEnabled())
901                    LOGGER.log(Level.FINE, "{0} completed", getName());
902                clearListeners();
903                condition.signalAll();
904                if (completionCallback != null) {
905                    if (throwable != null) {
906                        if (isDeliverThrowableInPacket) {
907                            packet.addSatellite(new ThrowableContainerPropertySet(throwable));
908                            completionCallback.onCompletion(packet);
909                        } else
910                            completionCallback.onCompletion(throwable);
911                    } else
912                        completionCallback.onCompletion(packet);
913                }
914            }
915        } finally {
916            lock.unlock();
917        }
918    }
919
920    /**
921     * Invokes all registered {@link InterceptorHandler}s and then call into
922     * {@link Fiber#__doRun()}.
923     */
924    private class InterceptorHandler implements FiberContextSwitchInterceptor.Work<Tube, Tube> {
925        private final Holder<Boolean> isUnlockRequired;
926        private final List<FiberContextSwitchInterceptor> ints;
927
928        /**
929         * Index in {@link Fiber#interceptors} to invoke next.
930         */
931        private int idx;
932
933        public InterceptorHandler(Holder<Boolean> isUnlockRequired, List<FiberContextSwitchInterceptor> ints) {
934            this.isUnlockRequired = isUnlockRequired;
935            this.ints = ints;
936        }
937
938        /**
939         * Initiate the interception, and eventually invokes {@link Fiber#__doRun()}.
940         */
941        Tube invoke(Tube next) {
942            idx = 0;
943            return execute(next);
944        }
945
946        @Override
947        public Tube execute(Tube next) {
948            if (idx == ints.size()) {
949                Fiber.this.next = next;
950                if (__doRun(isUnlockRequired, ints))
951                    return PLACEHOLDER;
952            } else {
953                FiberContextSwitchInterceptor interceptor = ints.get(idx++);
954                return interceptor.execute(Fiber.this, next, this);
955            }
956            return Fiber.this.next;
957        }
958    }
959
960    private static final PlaceholderTube PLACEHOLDER = new PlaceholderTube();
961
962    private static class PlaceholderTube extends AbstractTubeImpl {
963
964        @Override
965        public NextAction processRequest(Packet request) {
966            throw new UnsupportedOperationException();
967        }
968
969        @Override
970        public NextAction processResponse(Packet response) {
971            throw new UnsupportedOperationException();
972        }
973
974        @Override
975        public NextAction processException(Throwable t) {
976            return doThrow(t);
977        }
978
979        @Override
980        public void preDestroy() {
981        }
982
983        @Override
984        public PlaceholderTube copy(TubeCloner cloner) {
985            throw new UnsupportedOperationException();
986        }
987    }
988
989    /**
990     * Executes the fiber as much as possible.
991     *
992     */
993    private boolean doRun() {
994        dumpFiberContext("running");
995
996        if (serializeExecution) {
997            serializedExecutionLock.lock();
998            try {
999                return _doRun(next);
1000            } finally {
1001                serializedExecutionLock.unlock();
1002            }
1003        } else {
1004            return _doRun(next);
1005        }
1006    }
1007
1008    private boolean _doRun(Tube next) {
1009        // isRequireUnlock will contain Boolean.FALSE when lock has already been released in suspend
1010        Holder<Boolean> isRequireUnlock = new Holder<Boolean>(Boolean.TRUE);
1011        lock.lock();
1012        try {
1013            List<FiberContextSwitchInterceptor> ints;
1014            ClassLoader old;
1015            synchronized(this) {
1016                ints = interceptors;
1017
1018                // currentThread is protected by the monitor for this fiber so
1019                // that it is accessible to cancel() even when the lock is held
1020                currentThread = Thread.currentThread();
1021                if (isTraceEnabled()) {
1022                    LOGGER.log(Level.FINE, "Thread entering _doRun(): {0}", currentThread);
1023                }
1024
1025                old = currentThread.getContextClassLoader();
1026                currentThread.setContextClassLoader(contextClassLoader);
1027            }
1028
1029            try {
1030                boolean needsToReenter;
1031                do {
1032                    // if interceptors are set, go through the interceptors.
1033                    if (ints == null) {
1034                        this.next = next;
1035                        if (__doRun(isRequireUnlock, null /*ints*/)) {
1036                            return true;
1037                        }
1038                    } else {
1039                        next = new InterceptorHandler(isRequireUnlock, ints).invoke(next);
1040                        if (next == PLACEHOLDER) {
1041                            return true;
1042                        }
1043                    }
1044
1045                    synchronized(this) {
1046                        needsToReenter = (ints != interceptors);
1047                        if (needsToReenter)
1048                            ints = interceptors;
1049                    }
1050                } while (needsToReenter);
1051            } catch(OnExitRunnableException o) {
1052                // catching this exception indicates onExitRunnable in suspend() threw.
1053                // we must still avoid double unlock
1054                Throwable t = o.target;
1055                if (t instanceof WebServiceException)
1056                    throw (WebServiceException) t;
1057                throw new WebServiceException(t);
1058            } finally {
1059                // don't reference currentThread here because fiber processing
1060                // may already be running on a different thread (Note: isAlreadyExited
1061                // tracks this state
1062                Thread thread = Thread.currentThread();
1063                thread.setContextClassLoader(old);
1064                if (isTraceEnabled()) {
1065                    LOGGER.log(Level.FINE, "Thread leaving _doRun(): {0}", thread);
1066                }
1067            }
1068
1069            return false;
1070        } finally {
1071            if (isRequireUnlock.value) {
1072                synchronized(this) {
1073                    currentThread = null;
1074                }
1075                lock.unlock();
1076            }
1077        }
1078    }
1079
    /**
     * Core control loop of the fiber. To be invoked from {@link #doRun()}
     * (through {@code _doRun} or {@link InterceptorHandler}) with {@code lock}
     * held by the calling thread.
     * <p/>
     * This fiber is published as the thread's current fiber for the duration of
     * the call. While {@link #isReady(List)} allows it, each iteration performs
     * one step: {@code processException} on the most recent continuation when a
     * throwable is pending, {@code processRequest} on {@code next} when there is
     * one, or else {@code processResponse} on the most recent continuation. The
     * returned {@link NextAction} then decides what happens next. A
     * {@code RuntimeException} or {@code Error} escaping a step becomes the
     * pending throwable.
     *
     * @param isRequireUnlock      passed on to {@code suspend(...)}; according to
     *                             the comment in {@code _doRun} it holds
     *                             {@code Boolean.FALSE} once the lock has already
     *                             been released in suspend
     * @param originalInterceptors the interceptor list this run was entered with
     *                             ({@code null} if none); the loop stops as soon
     *                             as the installed list is a different object
     * @return {@code true} only when {@code suspend(...)} requests leaving the
     *         control loop; {@code false} in every other case
     * @see #doRun()
     */
    private boolean __doRun(Holder<Boolean> isRequireUnlock, List<FiberContextSwitchInterceptor> originalInterceptors) {
        assert(lock.isHeldByCurrentThread());

        // Make this fiber the thread's current one; the previous value is put back in the finally below.
        final Fiber old = CURRENT_FIBER.get();
        CURRENT_FIBER.set(this);

        // if true, lots of debug messages to show what's being executed
        final boolean traceEnabled = LOGGER.isLoggable(Level.FINER);

        try {
            // set once a tube answers (THROW_)ABORT_RESPONSE; remaining continuations are then dropped
            boolean abortResponse = false;
            while(isReady(originalInterceptors)) {
                // a canceled fiber drops all pending work and leaves the loop
                if (isCanceled) {
                    next = null;
                    throwable = null;
                    contsSize = 0;
                    break;
                }

                try {
                    NextAction na;
                    Tube last;
                    if(throwable!=null) {
                        // unwinding: hand the throwable to the most recent continuation
                        if(contsSize==0 || abortResponse) {
                            contsSize = 0; // abortResponse case
                            // nothing else to execute. we are done.
                            return false;
                        }
                        last = popCont();
                        if (traceEnabled)
                            LOGGER.log(Level.FINER, "{0} {1}.processException({2})", new Object[]{getName(), last, throwable});
                        na = last.processException(throwable);
                    } else {
                        if(next!=null) {
                            // request direction
                            if(traceEnabled)
                                LOGGER.log(Level.FINER, "{0} {1}.processRequest({2})", new Object[]{getName(), next, packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null"});
                            na = next.processRequest(packet);
                            last = next;
                        } else {
                            // response direction: continue with the most recent continuation
                            if(contsSize==0 || abortResponse) {
                                // nothing else to execute. we are done.
                                contsSize = 0;
                                return false;
                            }
                            last = popCont();
                            if(traceEnabled)
                                LOGGER.log(Level.FINER, "{0} {1}.processResponse({2})", new Object[]{getName(), last, packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null"});
                            na = last.processResponse(packet);
                        }
                    }

                    if (traceEnabled)
                        LOGGER.log(Level.FINER, "{0} {1} returned with {2}", new Object[]{getName(), last, na});

                    // If resume is called before suspend, then make sure
                    // resume(Packet) is not lost
                    if (na.kind != NextAction.SUSPEND) {
                        // preserve in-flight packet so that processException may inspect
                        if (na.kind != NextAction.THROW &&
                          na.kind != NextAction.THROW_ABORT_RESPONSE)
                                packet = na.packet;
                        throwable = na.throwable;
                    }

                    switch(na.kind) {
                    case NextAction.INVOKE:
                    case NextAction.INVOKE_ASYNC:
                        // remember the tube so that it gets the response (or exception) later
                        pushCont(last);
                        // fall through next
                    case NextAction.INVOKE_AND_FORGET:
                        next = na.next;
                        if (na.kind == NextAction.INVOKE_ASYNC
                            && startedSync) {
                          // Break out here
                          return false;
                        }
                        break;
                    case NextAction.THROW_ABORT_RESPONSE:
                    case NextAction.ABORT_RESPONSE:
                        abortResponse = true;
                        if (isTraceEnabled()) {
                          LOGGER.log(Level.FINE, "Fiber {0} is aborting a response due to exception: {1}", new Object[]{this, na.throwable});
                        }
                        // fall through: like RETURN/THROW, there is no further request tube
                    case NextAction.RETURN:
                    case NextAction.THROW:
                        next = null;
                        break;
                    case NextAction.SUSPEND:
                        if (next != null) {
                          // Only store the 'last' tube when we're processing
                          // a request, since conts array is for processResponse
                          pushCont(last);
                        }
                        next = na.next;
                        if(suspend(isRequireUnlock, na.onExitRunnable))
                            return true; // explicitly exiting control loop
                        break;
                    default:
                        // NOTE: AssertionError is an Error, so it is caught by the
                        // catch (Error) clause below and becomes the pending throwable.
                        throw new AssertionError();
                    }
                } catch (RuntimeException t) {
                    if (traceEnabled)
                        LOGGER.log(Level.FINER, getName() + " Caught " + t + ". Start stack unwinding", t);
                    throwable = t;
                } catch (Error t) {
                    if (traceEnabled)
                        LOGGER.log(Level.FINER, getName() + " Caught " + t + ". Start stack unwinding", t);
                    throwable = t;
                }

                dumpFiberContext("After tube execution");
            }

            // there's nothing we can execute right away.
            // we'll be back when this fiber is resumed.

        } finally {
            CURRENT_FIBER.set(old);
        }

        return false;
    }
1207
1208    private void pushCont(Tube tube) {
1209        conts[contsSize++] = tube;
1210
1211        // expand if needed
1212        int len = conts.length;
1213        if (contsSize == len) {
1214            Tube[] newBuf = new Tube[len * 2];
1215            System.arraycopy(conts, 0, newBuf, 0, len);
1216            conts = newBuf;
1217        }
1218    }
1219
1220    private Tube popCont() {
1221        return conts[--contsSize];
1222    }
1223
1224    private Tube peekCont() {
1225        int index = contsSize - 1;
1226        if (index >= 0 && index < conts.length) {
1227          return conts[index];
1228        } else {
1229          return null;
1230        }
1231    }
1232
1233    /**
1234     * Only to be used by Tubes that manipulate the Fiber to create alternate flows
1235     * @since 2.2.6
1236     */
1237    public void resetCont(Tube[] conts, int contsSize) {
1238        this.conts = conts;
1239        this.contsSize = contsSize;
1240    }
1241
1242    /**
1243     * Returns true if the fiber is ready to execute.
1244     */
1245    private boolean isReady(List<FiberContextSwitchInterceptor> originalInterceptors) {
1246        if (synchronous) {
1247            while (suspendedCount == 1)
1248                try {
1249                    if (isTraceEnabled()) {
1250                        LOGGER.log(Level.FINE, "{0} is blocking thread {1}", new Object[]{getName(), Thread.currentThread().getName()});
1251                    }
1252                    condition.await(); // the synchronized block is the whole runSync method.
1253                } catch (InterruptedException e) {
1254                    // remember that we are interrupted, but don't respond to it
1255                    // right away. This behavior is in line with what happens
1256                    // when you are actually running the whole thing synchronously.
1257                    interrupted = true;
1258                }
1259
1260            synchronized(this) {
1261                return interceptors == originalInterceptors;
1262            }
1263        }
1264        else {
1265            if (suspendedCount>0)
1266                return false;
1267            synchronized(this) {
1268                return interceptors == originalInterceptors;
1269            }
1270        }
1271    }
1272
1273    private String getName() {
1274        return "engine-" + owner.id + "fiber-" + id;
1275    }
1276
1277    @Override
1278    public String toString() {
1279        return getName();
1280    }
1281
1282    /**
1283     * Gets the current {@link Packet} associated with this fiber.
1284     * <p/>
1285     * <p/>
1286     * This method returns null if no packet has been associated with the fiber yet.
1287     */
1288    public
1289    @Nullable
1290    Packet getPacket() {
1291        return packet;
1292    }
1293
1294    /**
1295     * Returns completion callback associated with this Fiber
1296     * @return Completion callback
1297     * @since 2.2.6
1298     */
1299    public CompletionCallback getCompletionCallback() {
1300        return completionCallback;
1301    }
1302
1303    /**
1304     * Updates completion callback associated with this Fiber
1305     * @param completionCallback Completion callback
1306     * @since 2.2.6
1307     */
1308    public void setCompletionCallback(CompletionCallback completionCallback) {
1309        this.completionCallback = completionCallback;
1310    }
1311
1312    /**
1313     * (ADVANCED) Returns true if the current fiber is being executed synchronously.
1314     * <p/>
1315     * <p/>
1316     * Fiber may run synchronously for various reasons. Perhaps this is
1317     * on client side and application has invoked a synchronous method call.
1318     * Perhaps this is on server side and we have deployed on a synchronous
1319     * transport (like servlet.)
1320     * <p/>
1321     * <p/>
1322     * When a fiber is run synchronously (IOW by {@link #runSync(Tube, Packet)}),
1323     * further invocations to {@link #runSync(Tube, Packet)} can be done
1324     * without degrading the performance.
1325     * <p/>
1326     * <p/>
1327     * So this value can be used as a further optimization hint for
1328     * advanced {@link Tube}s to choose the best strategy to invoke
1329     * the next {@link Tube}. For example, a tube may want to install
1330     * a {@link FiberContextSwitchInterceptor} if running async, yet
1331     * it might find it faster to do {@link #runSync(Tube, Packet)}
1332     * if it's already running synchronously.
1333     */
1334    public static boolean isSynchronous() {
1335        return current().synchronous;
1336    }
1337
1338    /**
1339     * Returns true if the current Fiber on the current thread was started
1340     * synchronously. Note, this is not strictly the same as being synchronous
1341     * because the assumption is that the Fiber will ultimately be dispatched
1342     * asynchronously, possibly have a completion callback associated with it, etc.
1343     * Note, the 'startedSync' flag is cleared once the current Fiber is
1344     * converted to running asynchronously.
1345     * @since 2.2.6
1346     */
1347    public boolean isStartedSync() {
1348        return startedSync;
1349    }
1350
1351    /**
1352     * Gets the current fiber that's running.
1353     * <p/>
1354     * <p/>
1355     * This works like {@link Thread#currentThread()}.
1356     * This method only works when invoked from {@link Tube}.
1357     */
1358    public static
1359    @NotNull
1360    @SuppressWarnings({"null", "ConstantConditions"})
1361    Fiber current() {
1362        Fiber fiber = CURRENT_FIBER.get();
1363        if (fiber == null)
1364            throw new IllegalStateException("Can be only used from fibers");
1365        return fiber;
1366    }
1367
1368    /**
1369     * Gets the current fiber that's running, if set.
1370     */
1371    public static Fiber getCurrentIfSet() {
1372        return CURRENT_FIBER.get();
1373    }
1374
    /**
     * Per-thread reference to the fiber currently executing on that thread.
     * {@code __doRun} stores the running fiber here and puts the previous value
     * back when it exits; {@link #current()} and {@link #getCurrentIfSet()} read it.
     */
    private static final ThreadLocal<Fiber> CURRENT_FIBER = new ThreadLocal<Fiber>();

    /**
     * Used to allocate unique number for each fiber.
     */
    private static final AtomicInteger iotaGen = new AtomicInteger();
1381
1382    private static boolean isTraceEnabled() {
1383        return LOGGER.isLoggable(Level.FINE);
1384    }
1385
    /**
     * Logger named after this class. Trace output is gated on FINE
     * (see {@code isTraceEnabled()}); per-tube messages in {@code __doRun} use FINER.
     */
    private static final Logger LOGGER = Logger.getLogger(Fiber.class.getName());


    /**
     * Class-wide lock that {@link #doRun()} holds for the duration of a run
     * when {@link #serializeExecution} is set.
     */
    private static final ReentrantLock serializedExecutionLock = new ReentrantLock();

    /**
     * Set this boolean to true to execute fibers sequentially one by one.
     * See class javadoc.
     * <p/>
     * The initial value comes from the boolean system property whose name is
     * this class's fully qualified name followed by {@code ".serialize"}.
     */
    public static volatile boolean serializeExecution = Boolean.getBoolean(Fiber.class.getName() + ".serialize");

    /**
     * Components registered with this fiber; searched by {@link #getSPI(Class)}
     * and returned as-is by {@link #getComponents()}.
     */
    private final Set<Component> components = new CopyOnWriteArraySet<Component>();
1398
1399    @Override
1400    public <S> S getSPI(Class<S> spiType) {
1401        for (Component c : components) {
1402            S spi = c.getSPI(spiType);
1403            if (spi != null) {
1404                return spi;
1405            }
1406        }
1407        return null;
1408    }
1409
1410    @Override
1411    public Set<Component> getComponents() {
1412        return components;
1413    }
1414}
1415