ThreadPoolImpl.java revision 724:e7ddf972e152
1/*
2 * Copyright (c) 2003, 2015, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package com.sun.corba.se.impl.orbutil.threadpool;
27
28import java.io.IOException;
29import java.io.Closeable;
30
31import java.security.AccessController;
32import java.security.PrivilegedAction;
33
34import java.util.List;
35import java.util.ArrayList;
36
37import java.util.concurrent.atomic.AtomicInteger;
38import java.util.concurrent.atomic.AtomicLong;
39
40import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
41import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;
42import com.sun.corba.se.spi.orbutil.threadpool.Work;
43import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue;
44
45import com.sun.corba.se.impl.orbutil.ORBConstants;
46import com.sun.corba.se.impl.orbutil.threadpool.WorkQueueImpl;
47
48import com.sun.corba.se.spi.monitoring.MonitoringConstants;
49import com.sun.corba.se.spi.monitoring.MonitoredObject;
50import com.sun.corba.se.spi.monitoring.MonitoringFactories;
51import com.sun.corba.se.spi.orb.ORB;
52import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase;
53
54import com.sun.corba.se.impl.logging.ORBUtilSystemException;
55import com.sun.corba.se.impl.orbutil.ORBConstants;
56import com.sun.corba.se.spi.logging.CORBALogDomains;
57
58public class ThreadPoolImpl implements ThreadPool
59{
60    // serial counter useful for debugging
61    private static AtomicInteger threadCounter = new AtomicInteger(0);
62    private static final ORBUtilSystemException wrapper =
63        ORBUtilSystemException.get(CORBALogDomains.RPC_TRANSPORT);
64
65
66    // Any time currentThreadCount and/or availableWorkerThreads is updated
67    // or accessed this ThreadPool's WorkQueue must be locked. And, it is
68    // expected that this ThreadPool's WorkQueue is the only object that
69    // updates and accesses these values directly and indirectly though a
70    // call to a method in this ThreadPool. If any call to update or access
71    // those values must synchronized on this ThreadPool's WorkQueue.
72    private WorkQueue workQueue;
73
74    // Stores the number of available worker threads
75    private int availableWorkerThreads = 0;
76
77    // Stores the number of threads in the threadpool currently
78    private int currentThreadCount = 0;
79
80    // Minimum number of worker threads created at instantiation of the threadpool
81    private int minWorkerThreads = 0;
82
83    // Maximum number of worker threads in the threadpool
84    private int maxWorkerThreads = 0;
85
86    // Inactivity timeout value for worker threads to exit and stop running
87    private long inactivityTimeout;
88
89    // Indicates if the threadpool is bounded or unbounded
90    private boolean boundedThreadPool = false;
91
92    // Running count of the work items processed
93    // Set the value to 1 so that divide by zero is avoided in
94    // averageWorkCompletionTime()
95    private AtomicLong processedCount = new AtomicLong(1);
96
97    // Running aggregate of the time taken in millis to execute work items
98    // processed by the threads in the threadpool
99    private AtomicLong totalTimeTaken = new AtomicLong(0);
100
101    // Name of the ThreadPool
102    private String name;
103
104    // MonitoredObject for ThreadPool
105    private MonitoredObject threadpoolMonitoredObject;
106
107    // ThreadGroup in which threads should be created
108    private ThreadGroup threadGroup;
109
110    Object workersLock = new Object();
111    List<WorkerThread> workers = new ArrayList<>();
112
113    /**
114     * This constructor is used to create an unbounded threadpool
115     */
116    public ThreadPoolImpl(ThreadGroup tg, String threadpoolName) {
117        inactivityTimeout = ORBConstants.DEFAULT_INACTIVITY_TIMEOUT;
118        maxWorkerThreads = Integer.MAX_VALUE;
119        workQueue = new WorkQueueImpl(this);
120        threadGroup = tg;
121        name = threadpoolName;
122        initializeMonitoring();
123    }
124
125    /**
126     * This constructor is used to create an unbounded threadpool
127     * in the ThreadGroup of the current thread
128     */
129    public ThreadPoolImpl(String threadpoolName) {
130        this( Thread.currentThread().getThreadGroup(), threadpoolName ) ;
131    }
132
133    /**
134     * This constructor is used to create bounded threadpool
135     */
136    public ThreadPoolImpl(int minSize, int maxSize, long timeout,
137                                            String threadpoolName)
138    {
139        minWorkerThreads = minSize;
140        maxWorkerThreads = maxSize;
141        inactivityTimeout = timeout;
142        boundedThreadPool = true;
143        workQueue = new WorkQueueImpl(this);
144        name = threadpoolName;
145        for (int i = 0; i < minWorkerThreads; i++) {
146            createWorkerThread();
147        }
148        initializeMonitoring();
149    }
150
151    // Note that this method should not return until AFTER all threads have died.
152    public void close() throws IOException {
153
154        // Copy to avoid concurrent modification problems.
155        List<WorkerThread> copy = null;
156        synchronized (workersLock) {
157            copy = new ArrayList<>(workers);
158        }
159
160        for (WorkerThread wt : copy) {
161            wt.close();
162            while (wt.getState() != Thread.State.TERMINATED) {
163                try {
164                    wt.join();
165                } catch (InterruptedException exc) {
166                    wrapper.interruptedJoinCallWhileClosingThreadPool(exc, wt, this);
167                }
168            }
169        }
170
171        threadGroup = null;
172    }
173
174
175    // Setup monitoring for this threadpool
176    private void initializeMonitoring() {
177        // Get root monitored object
178        MonitoredObject root = MonitoringFactories.getMonitoringManagerFactory().
179                createMonitoringManager(MonitoringConstants.DEFAULT_MONITORING_ROOT, null).
180                getRootMonitoredObject();
181
182        // Create the threadpool monitoring root
183        MonitoredObject threadPoolMonitoringObjectRoot = root.getChild(
184                    MonitoringConstants.THREADPOOL_MONITORING_ROOT);
185        if (threadPoolMonitoringObjectRoot == null) {
186            threadPoolMonitoringObjectRoot =  MonitoringFactories.
187                    getMonitoredObjectFactory().createMonitoredObject(
188                    MonitoringConstants.THREADPOOL_MONITORING_ROOT,
189                    MonitoringConstants.THREADPOOL_MONITORING_ROOT_DESCRIPTION);
190            root.addChild(threadPoolMonitoringObjectRoot);
191        }
192        threadpoolMonitoredObject = MonitoringFactories.
193                    getMonitoredObjectFactory().
194                    createMonitoredObject(name,
195                    MonitoringConstants.THREADPOOL_MONITORING_DESCRIPTION);
196
197        threadPoolMonitoringObjectRoot.addChild(threadpoolMonitoredObject);
198
199        LongMonitoredAttributeBase b1 = new
200            LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS,
201                    MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) {
202                public Object getValue() {
203                    return new Long(ThreadPoolImpl.this.currentNumberOfThreads());
204                }
205            };
206        threadpoolMonitoredObject.addAttribute(b1);
207        LongMonitoredAttributeBase b2 = new
208            LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_AVAILABLE_THREADS,
209                    MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) {
210                public Object getValue() {
211                    return new Long(ThreadPoolImpl.this.numberOfAvailableThreads());
212                }
213            };
214        threadpoolMonitoredObject.addAttribute(b2);
215        LongMonitoredAttributeBase b3 = new
216            LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS,
217                    MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS_DESCRIPTION) {
218                public Object getValue() {
219                    return new Long(ThreadPoolImpl.this.numberOfBusyThreads());
220                }
221            };
222        threadpoolMonitoredObject.addAttribute(b3);
223        LongMonitoredAttributeBase b4 = new
224            LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME,
225                    MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME_DESCRIPTION) {
226                public Object getValue() {
227                    return new Long(ThreadPoolImpl.this.averageWorkCompletionTime());
228                }
229            };
230        threadpoolMonitoredObject.addAttribute(b4);
231        LongMonitoredAttributeBase b5 = new
232            LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT,
233                    MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT_DESCRIPTION) {
234                public Object getValue() {
235                    return new Long(ThreadPoolImpl.this.currentProcessedCount());
236                }
237            };
238        threadpoolMonitoredObject.addAttribute(b5);
239
240        // Add the monitored object for the WorkQueue
241
242        threadpoolMonitoredObject.addChild(
243                ((WorkQueueImpl)workQueue).getMonitoredObject());
244    }
245
246    // Package private method to get the monitored object for this
247    // class
248    MonitoredObject getMonitoredObject() {
249        return threadpoolMonitoredObject;
250    }
251
252    public WorkQueue getAnyWorkQueue()
253    {
254        return workQueue;
255    }
256
257    public WorkQueue getWorkQueue(int queueId)
258        throws NoSuchWorkQueueException
259    {
260        if (queueId != 0)
261            throw new NoSuchWorkQueueException();
262        return workQueue;
263    }
264
265    /**
266     * To be called from the workqueue when work is added to the
267     * workQueue. This method would create new threads if required
268     * or notify waiting threads on the queue for available work
269     */
270    void notifyForAvailableWork(WorkQueue aWorkQueue) {
271        synchronized (aWorkQueue) {
272            if (availableWorkerThreads < aWorkQueue.workItemsInQueue()) {
273                createWorkerThread();
274            } else {
275                aWorkQueue.notify();
276            }
277        }
278    }
279
280
281    private Thread createWorkerThreadHelper( String name ) {
282        // Thread creation needs to be in a doPrivileged block
283        // if there is a non-null security manager for two reasons:
284        // 1. The creation of a thread in a specific ThreadGroup
285        //    is a privileged operation.  Lack of a doPrivileged
286        //    block here causes an AccessControlException
287        //    (see bug 6268145).
288        // 2. We want to make sure that the permissions associated
289        //    with this thread do NOT include the permissions of
290        //    the current thread that is calling this method.
291        //    This leads to problems in the app server where
292        //    some threads in the ThreadPool randomly get
293        //    bad permissions, leading to unpredictable
294        //    permission errors (see bug 6021011).
295        //
296        //    A Java thread contains a stack of call frames,
297        //    one for each method called that has not yet returned.
298        //    Each method comes from a particular class.  The class
299        //    was loaded by a ClassLoader which has an associated
300        //    CodeSource, and this determines the Permissions
301        //    for all methods in that class.  The current
302        //    Permissions for the thread are the intersection of
303        //    all Permissions for the methods on the stack.
304        //    This is part of the Security Context of the thread.
305        //
306        //    When a thread creates a new thread, the new thread
307        //    inherits the security context of the old thread.
308        //    This is bad in a ThreadPool, because different
309        //    creators of threads may have different security contexts.
310        //    This leads to occasional unpredictable errors when
311        //    a thread is re-used in a different security context.
312        //
313        //    Avoiding this problem is simple: just do the thread
314        //    creation in a doPrivileged block.  This sets the
315        //    inherited security context to that of the code source
316        //    for the ORB code itself, which contains all permissions
317        //    in either Java SE or Java EE.
318        WorkerThread thread = new WorkerThread(threadGroup, name);
319        synchronized (workersLock) {
320            workers.add(thread);
321        }
322
323        // The thread must be set to a daemon thread so the
324        // VM can exit if the only threads left are PooledThreads
325        // or other daemons.  We don't want to rely on the
326        // calling thread always being a daemon.
327        // Note that no exception is possible here since we
328        // are inside the doPrivileged block.
329        thread.setDaemon(true);
330
331        wrapper.workerThreadCreated(thread, thread.getContextClassLoader());
332
333        thread.start();
334        return null;
335    }
336
337
338    /**
339     * To be called from the workqueue to create worker threads when none
340     * available.
341     */
342    void createWorkerThread() {
343        final String name = getName();
344        synchronized (workQueue) {
345            try {
346                if (System.getSecurityManager() == null) {
347                    createWorkerThreadHelper(name);
348                } else {
349                    // If we get here, we need to create a thread.
350                    AccessController.doPrivileged(
351                            new PrivilegedAction() {
352                        public Object run() {
353                            return createWorkerThreadHelper(name);
354                        }
355                    }
356                    );
357                }
358            } catch (Throwable t) {
359                // Decrementing the count of current worker threads.
360                // But, it will be increased in the finally block.
361                decrementCurrentNumberOfThreads();
362                wrapper.workerThreadCreationFailure(t);
363            } finally {
364                incrementCurrentNumberOfThreads();
365            }
366        }
367    }
368
369    public int minimumNumberOfThreads() {
370        return minWorkerThreads;
371    }
372
373    public int maximumNumberOfThreads() {
374        return maxWorkerThreads;
375    }
376
377    public long idleTimeoutForThreads() {
378        return inactivityTimeout;
379    }
380
381    public int currentNumberOfThreads() {
382        synchronized (workQueue) {
383            return currentThreadCount;
384        }
385    }
386
387    void decrementCurrentNumberOfThreads() {
388        synchronized (workQueue) {
389            currentThreadCount--;
390        }
391    }
392
393    void incrementCurrentNumberOfThreads() {
394        synchronized (workQueue) {
395            currentThreadCount++;
396        }
397    }
398
399    public int numberOfAvailableThreads() {
400        synchronized (workQueue) {
401            return availableWorkerThreads;
402        }
403    }
404
405    public int numberOfBusyThreads() {
406        synchronized (workQueue) {
407            return (currentThreadCount - availableWorkerThreads);
408        }
409    }
410
411    public long averageWorkCompletionTime() {
412        synchronized (workQueue) {
413            return (totalTimeTaken.get() / processedCount.get());
414        }
415    }
416
417    public long currentProcessedCount() {
418        synchronized (workQueue) {
419            return processedCount.get();
420        }
421    }
422
423    public String getName() {
424        return name;
425    }
426
427    /**
428    * This method will return the number of WorkQueues serviced by the threadpool.
429    */
430    public int numberOfWorkQueues() {
431        return 1;
432    }
433
434
435    private static synchronized int getUniqueThreadId() {
436        return ThreadPoolImpl.threadCounter.incrementAndGet();
437    }
438
439    /**
440     * This method will decrement the number of available threads
441     * in the threadpool which are waiting for work. Called from
442     * WorkQueueImpl.requestWork()
443     */
444    void decrementNumberOfAvailableThreads() {
445        synchronized (workQueue) {
446            availableWorkerThreads--;
447        }
448    }
449
450    /**
451     * This method will increment the number of available threads
452     * in the threadpool which are waiting for work. Called from
453     * WorkQueueImpl.requestWork()
454     */
455    void incrementNumberOfAvailableThreads() {
456        synchronized (workQueue) {
457            availableWorkerThreads++;
458        }
459    }
460
461
462    private class WorkerThread extends sun.misc.ManagedLocalsThread implements Closeable
463    {
464        private Work currentWork;
465        private int threadId = 0; // unique id for the thread
466        private volatile boolean closeCalled = false;
467        private String threadPoolName;
468        // name seen by Thread.getName()
469        private StringBuffer workerThreadName = new StringBuffer();
470
471        WorkerThread(ThreadGroup tg, String threadPoolName) {
472            super(tg, "Idle");
473            this.threadId = ThreadPoolImpl.getUniqueThreadId();
474            this.threadPoolName = threadPoolName;
475            setName(composeWorkerThreadName(threadPoolName, "Idle"));
476        }
477
478        public synchronized void close() {
479            closeCalled = true;
480            interrupt();
481        }
482
483        private void resetClassLoader() {
484
485        }
486
487        private void performWork() {
488            long start = System.currentTimeMillis();
489            try {
490                currentWork.doWork();
491            } catch (Throwable t) {
492                wrapper.workerThreadDoWorkThrowable(this, t);
493            }
494            long elapsedTime = System.currentTimeMillis() - start;
495            totalTimeTaken.addAndGet(elapsedTime);
496            processedCount.incrementAndGet();
497        }
498
499        public void run() {
500            try  {
501                while (!closeCalled) {
502                    try {
503                        currentWork = ((WorkQueueImpl)workQueue).requestWork(
504                            inactivityTimeout);
505                        if (currentWork == null)
506                            continue;
507                    } catch (InterruptedException exc) {
508                        wrapper.workQueueThreadInterrupted( exc, getName(),
509                           Boolean.valueOf(closeCalled));
510
511                        continue ;
512                    } catch (Throwable t) {
513                         wrapper.workerThreadThrowableFromRequestWork(this, t,
514                                workQueue.getName());
515
516                        continue;
517                    }
518
519                    performWork();
520
521                    // set currentWork to null so that the work item can be
522                    // garbage collected without waiting for the next work item.
523                    currentWork = null;
524
525                    resetClassLoader();
526                }
527            } catch (Throwable e) {
528                // This should not be possible
529                wrapper.workerThreadCaughtUnexpectedThrowable(this,e);
530            } finally {
531                synchronized (workersLock) {
532                    workers.remove(this);
533                }
534            }
535        }
536
537        private String composeWorkerThreadName(String poolName, String workerName) {
538            workerThreadName.setLength(0);
539            workerThreadName.append("p: ").append(poolName);
540            workerThreadName.append("; w: ").append(workerName);
541            return workerThreadName.toString();
542        }
543    } // End of WorkerThread class
544
545}
546
547// End of file.
548