ThreadPoolImpl.java revision 667:d0315150c39d
1/*
2 * Copyright (c) 2003, 2015, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package com.sun.corba.se.impl.orbutil.threadpool;
27
28import java.io.IOException;
29import java.io.Closeable;
30
31import java.security.AccessController;
32import java.security.PrivilegedAction;
33
34import java.util.List;
35import java.util.ArrayList;
36
37import java.util.concurrent.atomic.AtomicInteger;
38import java.util.concurrent.atomic.AtomicLong;
39
40import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
41import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;
42import com.sun.corba.se.spi.orbutil.threadpool.Work;
43import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue;
44
45import com.sun.corba.se.impl.orbutil.ORBConstants;
46import com.sun.corba.se.impl.orbutil.threadpool.WorkQueueImpl;
47
48import com.sun.corba.se.spi.monitoring.MonitoringConstants;
49import com.sun.corba.se.spi.monitoring.MonitoredObject;
50import com.sun.corba.se.spi.monitoring.MonitoringFactories;
51import com.sun.corba.se.spi.orb.ORB;
52import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase;
53
54import com.sun.corba.se.impl.logging.ORBUtilSystemException;
55import com.sun.corba.se.impl.orbutil.ORBConstants;
56import com.sun.corba.se.spi.logging.CORBALogDomains;
57import com.sun.corba.se.impl.transport.ManagedLocalsThread;
58
59public class ThreadPoolImpl implements ThreadPool
60{
61    // serial counter useful for debugging
62    private static AtomicInteger threadCounter = new AtomicInteger(0);
63    private static final ORBUtilSystemException wrapper =
64        ORBUtilSystemException.get(CORBALogDomains.RPC_TRANSPORT);
65
66
67    // Any time currentThreadCount and/or availableWorkerThreads is updated
68    // or accessed this ThreadPool's WorkQueue must be locked. And, it is
69    // expected that this ThreadPool's WorkQueue is the only object that
70    // updates and accesses these values directly and indirectly though a
71    // call to a method in this ThreadPool. If any call to update or access
72    // those values must synchronized on this ThreadPool's WorkQueue.
73    private WorkQueue workQueue;
74
75    // Stores the number of available worker threads
76    private int availableWorkerThreads = 0;
77
78    // Stores the number of threads in the threadpool currently
79    private int currentThreadCount = 0;
80
81    // Minimum number of worker threads created at instantiation of the threadpool
82    private int minWorkerThreads = 0;
83
84    // Maximum number of worker threads in the threadpool
85    private int maxWorkerThreads = 0;
86
87    // Inactivity timeout value for worker threads to exit and stop running
88    private long inactivityTimeout;
89
90    // Indicates if the threadpool is bounded or unbounded
91    private boolean boundedThreadPool = false;
92
93    // Running count of the work items processed
94    // Set the value to 1 so that divide by zero is avoided in
95    // averageWorkCompletionTime()
96    private AtomicLong processedCount = new AtomicLong(1);
97
98    // Running aggregate of the time taken in millis to execute work items
99    // processed by the threads in the threadpool
100    private AtomicLong totalTimeTaken = new AtomicLong(0);
101
102    // Name of the ThreadPool
103    private String name;
104
105    // MonitoredObject for ThreadPool
106    private MonitoredObject threadpoolMonitoredObject;
107
108    // ThreadGroup in which threads should be created
109    private ThreadGroup threadGroup;
110
111    Object workersLock = new Object();
112    List<WorkerThread> workers = new ArrayList<>();
113
114    /**
115     * This constructor is used to create an unbounded threadpool
116     */
117    public ThreadPoolImpl(ThreadGroup tg, String threadpoolName) {
118        inactivityTimeout = ORBConstants.DEFAULT_INACTIVITY_TIMEOUT;
119        maxWorkerThreads = Integer.MAX_VALUE;
120        workQueue = new WorkQueueImpl(this);
121        threadGroup = tg;
122        name = threadpoolName;
123        initializeMonitoring();
124    }
125
126    /**
127     * This constructor is used to create an unbounded threadpool
128     * in the ThreadGroup of the current thread
129     */
130    public ThreadPoolImpl(String threadpoolName) {
131        this( Thread.currentThread().getThreadGroup(), threadpoolName ) ;
132    }
133
134    /**
135     * This constructor is used to create bounded threadpool
136     */
137    public ThreadPoolImpl(int minSize, int maxSize, long timeout,
138                                            String threadpoolName)
139    {
140        minWorkerThreads = minSize;
141        maxWorkerThreads = maxSize;
142        inactivityTimeout = timeout;
143        boundedThreadPool = true;
144        workQueue = new WorkQueueImpl(this);
145        name = threadpoolName;
146        for (int i = 0; i < minWorkerThreads; i++) {
147            createWorkerThread();
148        }
149        initializeMonitoring();
150    }
151
152    // Note that this method should not return until AFTER all threads have died.
153    public void close() throws IOException {
154
155        // Copy to avoid concurrent modification problems.
156        List<WorkerThread> copy = null;
157        synchronized (workersLock) {
158            copy = new ArrayList<>(workers);
159        }
160
161        for (WorkerThread wt : copy) {
162            wt.close();
163            while (wt.getState() != Thread.State.TERMINATED) {
164                try {
165                    wt.join();
166                } catch (InterruptedException exc) {
167                    wrapper.interruptedJoinCallWhileClosingThreadPool(exc, wt, this);
168                }
169            }
170        }
171
172        threadGroup = null;
173    }
174
175
176    // Setup monitoring for this threadpool
177    private void initializeMonitoring() {
178        // Get root monitored object
179        MonitoredObject root = MonitoringFactories.getMonitoringManagerFactory().
180                createMonitoringManager(MonitoringConstants.DEFAULT_MONITORING_ROOT, null).
181                getRootMonitoredObject();
182
183        // Create the threadpool monitoring root
184        MonitoredObject threadPoolMonitoringObjectRoot = root.getChild(
185                    MonitoringConstants.THREADPOOL_MONITORING_ROOT);
186        if (threadPoolMonitoringObjectRoot == null) {
187            threadPoolMonitoringObjectRoot =  MonitoringFactories.
188                    getMonitoredObjectFactory().createMonitoredObject(
189                    MonitoringConstants.THREADPOOL_MONITORING_ROOT,
190                    MonitoringConstants.THREADPOOL_MONITORING_ROOT_DESCRIPTION);
191            root.addChild(threadPoolMonitoringObjectRoot);
192        }
193        threadpoolMonitoredObject = MonitoringFactories.
194                    getMonitoredObjectFactory().
195                    createMonitoredObject(name,
196                    MonitoringConstants.THREADPOOL_MONITORING_DESCRIPTION);
197
198        threadPoolMonitoringObjectRoot.addChild(threadpoolMonitoredObject);
199
200        LongMonitoredAttributeBase b1 = new
201            LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS,
202                    MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) {
203                public Object getValue() {
204                    return new Long(ThreadPoolImpl.this.currentNumberOfThreads());
205                }
206            };
207        threadpoolMonitoredObject.addAttribute(b1);
208        LongMonitoredAttributeBase b2 = new
209            LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_AVAILABLE_THREADS,
210                    MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) {
211                public Object getValue() {
212                    return new Long(ThreadPoolImpl.this.numberOfAvailableThreads());
213                }
214            };
215        threadpoolMonitoredObject.addAttribute(b2);
216        LongMonitoredAttributeBase b3 = new
217            LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS,
218                    MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS_DESCRIPTION) {
219                public Object getValue() {
220                    return new Long(ThreadPoolImpl.this.numberOfBusyThreads());
221                }
222            };
223        threadpoolMonitoredObject.addAttribute(b3);
224        LongMonitoredAttributeBase b4 = new
225            LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME,
226                    MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME_DESCRIPTION) {
227                public Object getValue() {
228                    return new Long(ThreadPoolImpl.this.averageWorkCompletionTime());
229                }
230            };
231        threadpoolMonitoredObject.addAttribute(b4);
232        LongMonitoredAttributeBase b5 = new
233            LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT,
234                    MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT_DESCRIPTION) {
235                public Object getValue() {
236                    return new Long(ThreadPoolImpl.this.currentProcessedCount());
237                }
238            };
239        threadpoolMonitoredObject.addAttribute(b5);
240
241        // Add the monitored object for the WorkQueue
242
243        threadpoolMonitoredObject.addChild(
244                ((WorkQueueImpl)workQueue).getMonitoredObject());
245    }
246
247    // Package private method to get the monitored object for this
248    // class
249    MonitoredObject getMonitoredObject() {
250        return threadpoolMonitoredObject;
251    }
252
253    public WorkQueue getAnyWorkQueue()
254    {
255        return workQueue;
256    }
257
258    public WorkQueue getWorkQueue(int queueId)
259        throws NoSuchWorkQueueException
260    {
261        if (queueId != 0)
262            throw new NoSuchWorkQueueException();
263        return workQueue;
264    }
265
266    /**
267     * To be called from the workqueue when work is added to the
268     * workQueue. This method would create new threads if required
269     * or notify waiting threads on the queue for available work
270     */
271    void notifyForAvailableWork(WorkQueue aWorkQueue) {
272        synchronized (aWorkQueue) {
273            if (availableWorkerThreads < aWorkQueue.workItemsInQueue()) {
274                createWorkerThread();
275            } else {
276                aWorkQueue.notify();
277            }
278        }
279    }
280
281
282    private Thread createWorkerThreadHelper( String name ) {
283        // Thread creation needs to be in a doPrivileged block
284        // if there is a non-null security manager for two reasons:
285        // 1. The creation of a thread in a specific ThreadGroup
286        //    is a privileged operation.  Lack of a doPrivileged
287        //    block here causes an AccessControlException
288        //    (see bug 6268145).
289        // 2. We want to make sure that the permissions associated
290        //    with this thread do NOT include the permissions of
291        //    the current thread that is calling this method.
292        //    This leads to problems in the app server where
293        //    some threads in the ThreadPool randomly get
294        //    bad permissions, leading to unpredictable
295        //    permission errors (see bug 6021011).
296        //
297        //    A Java thread contains a stack of call frames,
298        //    one for each method called that has not yet returned.
299        //    Each method comes from a particular class.  The class
300        //    was loaded by a ClassLoader which has an associated
301        //    CodeSource, and this determines the Permissions
302        //    for all methods in that class.  The current
303        //    Permissions for the thread are the intersection of
304        //    all Permissions for the methods on the stack.
305        //    This is part of the Security Context of the thread.
306        //
307        //    When a thread creates a new thread, the new thread
308        //    inherits the security context of the old thread.
309        //    This is bad in a ThreadPool, because different
310        //    creators of threads may have different security contexts.
311        //    This leads to occasional unpredictable errors when
312        //    a thread is re-used in a different security context.
313        //
314        //    Avoiding this problem is simple: just do the thread
315        //    creation in a doPrivileged block.  This sets the
316        //    inherited security context to that of the code source
317        //    for the ORB code itself, which contains all permissions
318        //    in either Java SE or Java EE.
319        WorkerThread thread = new WorkerThread(threadGroup, name);
320        synchronized (workersLock) {
321            workers.add(thread);
322        }
323
324        // The thread must be set to a daemon thread so the
325        // VM can exit if the only threads left are PooledThreads
326        // or other daemons.  We don't want to rely on the
327        // calling thread always being a daemon.
328        // Note that no exception is possible here since we
329        // are inside the doPrivileged block.
330        thread.setDaemon(true);
331
332        wrapper.workerThreadCreated(thread, thread.getContextClassLoader());
333
334        thread.start();
335        return null;
336    }
337
338
339    /**
340     * To be called from the workqueue to create worker threads when none
341     * available.
342     */
343    void createWorkerThread() {
344        final String name = getName();
345        synchronized (workQueue) {
346            try {
347                if (System.getSecurityManager() == null) {
348                    createWorkerThreadHelper(name);
349                } else {
350                    // If we get here, we need to create a thread.
351                    AccessController.doPrivileged(
352                            new PrivilegedAction() {
353                        public Object run() {
354                            return createWorkerThreadHelper(name);
355                        }
356                    }
357                    );
358                }
359            } catch (Throwable t) {
360                // Decrementing the count of current worker threads.
361                // But, it will be increased in the finally block.
362                decrementCurrentNumberOfThreads();
363                wrapper.workerThreadCreationFailure(t);
364            } finally {
365                incrementCurrentNumberOfThreads();
366            }
367        }
368    }
369
370    public int minimumNumberOfThreads() {
371        return minWorkerThreads;
372    }
373
374    public int maximumNumberOfThreads() {
375        return maxWorkerThreads;
376    }
377
378    public long idleTimeoutForThreads() {
379        return inactivityTimeout;
380    }
381
382    public int currentNumberOfThreads() {
383        synchronized (workQueue) {
384            return currentThreadCount;
385        }
386    }
387
388    void decrementCurrentNumberOfThreads() {
389        synchronized (workQueue) {
390            currentThreadCount--;
391        }
392    }
393
394    void incrementCurrentNumberOfThreads() {
395        synchronized (workQueue) {
396            currentThreadCount++;
397        }
398    }
399
400    public int numberOfAvailableThreads() {
401        synchronized (workQueue) {
402            return availableWorkerThreads;
403        }
404    }
405
406    public int numberOfBusyThreads() {
407        synchronized (workQueue) {
408            return (currentThreadCount - availableWorkerThreads);
409        }
410    }
411
412    public long averageWorkCompletionTime() {
413        synchronized (workQueue) {
414            return (totalTimeTaken.get() / processedCount.get());
415        }
416    }
417
418    public long currentProcessedCount() {
419        synchronized (workQueue) {
420            return processedCount.get();
421        }
422    }
423
424    public String getName() {
425        return name;
426    }
427
428    /**
429    * This method will return the number of WorkQueues serviced by the threadpool.
430    */
431    public int numberOfWorkQueues() {
432        return 1;
433    }
434
435
436    private static synchronized int getUniqueThreadId() {
437        return ThreadPoolImpl.threadCounter.incrementAndGet();
438    }
439
440    /**
441     * This method will decrement the number of available threads
442     * in the threadpool which are waiting for work. Called from
443     * WorkQueueImpl.requestWork()
444     */
445    void decrementNumberOfAvailableThreads() {
446        synchronized (workQueue) {
447            availableWorkerThreads--;
448        }
449    }
450
451    /**
452     * This method will increment the number of available threads
453     * in the threadpool which are waiting for work. Called from
454     * WorkQueueImpl.requestWork()
455     */
456    void incrementNumberOfAvailableThreads() {
457        synchronized (workQueue) {
458            availableWorkerThreads++;
459        }
460    }
461
462
463    private class WorkerThread extends ManagedLocalsThread implements Closeable
464    {
465        private Work currentWork;
466        private int threadId = 0; // unique id for the thread
467        private volatile boolean closeCalled = false;
468        private String threadPoolName;
469        // name seen by Thread.getName()
470        private StringBuffer workerThreadName = new StringBuffer();
471
472        WorkerThread(ThreadGroup tg, String threadPoolName) {
473            super(tg, "Idle");
474            this.threadId = ThreadPoolImpl.getUniqueThreadId();
475            this.threadPoolName = threadPoolName;
476            setName(composeWorkerThreadName(threadPoolName, "Idle"));
477        }
478
479        public synchronized void close() {
480            closeCalled = true;
481            interrupt();
482        }
483
484        private void resetClassLoader() {
485
486        }
487
488        private void performWork() {
489            long start = System.currentTimeMillis();
490            try {
491                currentWork.doWork();
492            } catch (Throwable t) {
493                wrapper.workerThreadDoWorkThrowable(this, t);
494            }
495            long elapsedTime = System.currentTimeMillis() - start;
496            totalTimeTaken.addAndGet(elapsedTime);
497            processedCount.incrementAndGet();
498        }
499
500        public void run() {
501            try  {
502                while (!closeCalled) {
503                    try {
504                        currentWork = ((WorkQueueImpl)workQueue).requestWork(
505                            inactivityTimeout);
506                        if (currentWork == null)
507                            continue;
508                    } catch (InterruptedException exc) {
509                        wrapper.workQueueThreadInterrupted( exc, getName(),
510                           Boolean.valueOf(closeCalled));
511
512                        continue ;
513                    } catch (Throwable t) {
514                         wrapper.workerThreadThrowableFromRequestWork(this, t,
515                                workQueue.getName());
516
517                        continue;
518                    }
519
520                    performWork();
521
522                    // set currentWork to null so that the work item can be
523                    // garbage collected without waiting for the next work item.
524                    currentWork = null;
525
526                    resetClassLoader();
527                }
528            } catch (Throwable e) {
529                // This should not be possible
530                wrapper.workerThreadCaughtUnexpectedThrowable(this,e);
531            } finally {
532                synchronized (workersLock) {
533                    workers.remove(this);
534                }
535            }
536        }
537
538        private String composeWorkerThreadName(String poolName, String workerName) {
539            workerThreadName.setLength(0);
540            workerThreadName.append("p: ").append(poolName);
541            workerThreadName.append("; w: ").append(workerName);
542            return workerThreadName.toString();
543        }
544    } // End of WorkerThread class
545
546}
547
548// End of file.
549