WorkQueueImpl.java revision 608:7e06bf1dcb09
1/*
2 * Copyright (c) 2003, 2012, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package com.sun.corba.se.impl.orbutil.threadpool;
27
28import java.util.LinkedList;
29
30import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;
31import com.sun.corba.se.spi.orbutil.threadpool.Work;
32import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue;
33
34import com.sun.corba.se.impl.orbutil.ORBConstants;
35import com.sun.corba.se.impl.orbutil.threadpool.ThreadPoolImpl;
36
37import com.sun.corba.se.spi.monitoring.MonitoringConstants;
38import com.sun.corba.se.spi.monitoring.MonitoringFactories;
39import com.sun.corba.se.spi.monitoring.MonitoredObject;
40import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase;
41
42public class WorkQueueImpl implements WorkQueue
43{
44    private ThreadPool workerThreadPool;
45    private LinkedList theWorkQueue = new LinkedList();
46    private long workItemsAdded = 0;
47
48    // Initialized to 1 to avoid divide by zero in averageTimeInQueue()
49    private long workItemsDequeued = 1;
50
51    private long totalTimeInQueue = 0;
52
53    // Name of the work queue
54    private String name;
55
56    // MonitoredObject for work queue
57    private MonitoredObject workqueueMonitoredObject;
58
59    public WorkQueueImpl() {
60        name=ORBConstants.WORKQUEUE_DEFAULT_NAME;
61        initializeMonitoring();
62    }
63
64    public WorkQueueImpl(ThreadPool workerThreadPool) {
65        this(workerThreadPool, ORBConstants.WORKQUEUE_DEFAULT_NAME);
66    }
67
68    public WorkQueueImpl(ThreadPool workerThreadPool, String name) {
69        this.workerThreadPool = workerThreadPool;
70        this.name = name;
71        initializeMonitoring();
72    }
73
74    // Setup monitoring for this workqueue
75    private void initializeMonitoring() {
76        workqueueMonitoredObject = MonitoringFactories.
77                            getMonitoredObjectFactory().
78                            createMonitoredObject(name,
79                            MonitoringConstants.WORKQUEUE_MONITORING_DESCRIPTION);
80
81        LongMonitoredAttributeBase b1 = new
82            LongMonitoredAttributeBase(MonitoringConstants.WORKQUEUE_TOTAL_WORK_ITEMS_ADDED,
83                    MonitoringConstants.WORKQUEUE_TOTAL_WORK_ITEMS_ADDED_DESCRIPTION) {
84                public Object getValue() {
85                    return new Long(WorkQueueImpl.this.totalWorkItemsAdded());
86                }
87            };
88        workqueueMonitoredObject.addAttribute(b1);
89        LongMonitoredAttributeBase b2 = new
90            LongMonitoredAttributeBase(MonitoringConstants.WORKQUEUE_WORK_ITEMS_IN_QUEUE,
91                    MonitoringConstants.WORKQUEUE_WORK_ITEMS_IN_QUEUE_DESCRIPTION) {
92                public Object getValue() {
93                    return new Long(WorkQueueImpl.this.workItemsInQueue());
94                }
95            };
96        workqueueMonitoredObject.addAttribute(b2);
97        LongMonitoredAttributeBase b3 = new
98            LongMonitoredAttributeBase(MonitoringConstants.WORKQUEUE_AVERAGE_TIME_IN_QUEUE,
99                    MonitoringConstants.WORKQUEUE_AVERAGE_TIME_IN_QUEUE_DESCRIPTION) {
100                public Object getValue() {
101                    return new Long(WorkQueueImpl.this.averageTimeInQueue());
102                }
103            };
104        workqueueMonitoredObject.addAttribute(b3);
105    }
106
107
108    // Package private method to get the monitored object for this
109    // class
110    MonitoredObject getMonitoredObject() {
111        return workqueueMonitoredObject;
112    }
113
114    public synchronized void addWork(Work work) {
115            workItemsAdded++;
116            work.setEnqueueTime(System.currentTimeMillis());
117            theWorkQueue.addLast(work);
118            ((ThreadPoolImpl)workerThreadPool).notifyForAvailableWork(this);
119    }
120
121    synchronized Work requestWork(long waitTime) throws TimeoutException, InterruptedException
122    {
123        Work workItem;
124        ((ThreadPoolImpl)workerThreadPool).incrementNumberOfAvailableThreads();
125
126            if (theWorkQueue.size() != 0) {
127                workItem = (Work)theWorkQueue.removeFirst();
128                totalTimeInQueue += System.currentTimeMillis() - workItem.getEnqueueTime();
129                workItemsDequeued++;
130                ((ThreadPoolImpl)workerThreadPool).decrementNumberOfAvailableThreads();
131                return workItem;
132            }
133
134            try {
135
136                long remainingWaitTime = waitTime;
137                long finishTime = System.currentTimeMillis() + waitTime;
138
139                do {
140
141                    this.wait(remainingWaitTime);
142
143                    if (theWorkQueue.size() != 0) {
144                        workItem = (Work)theWorkQueue.removeFirst();
145                        totalTimeInQueue += System.currentTimeMillis() - workItem.getEnqueueTime();
146                        workItemsDequeued++;
147                        ((ThreadPoolImpl)workerThreadPool).decrementNumberOfAvailableThreads();
148                        return workItem;
149                    }
150
151                    remainingWaitTime = finishTime - System.currentTimeMillis();
152
153                } while (remainingWaitTime > 0);
154
155                ((ThreadPoolImpl)workerThreadPool).decrementNumberOfAvailableThreads();
156                throw new TimeoutException();
157
158            } catch (InterruptedException ie) {
159                ((ThreadPoolImpl)workerThreadPool).decrementNumberOfAvailableThreads();
160                throw ie;
161            }
162    }
163
164    public void setThreadPool(ThreadPool workerThreadPool) {
165            this.workerThreadPool = workerThreadPool;
166    }
167
168    public ThreadPool getThreadPool() {
169            return workerThreadPool;
170    }
171
172    /**
173     * Returns the total number of Work items added to the Queue.
174     * This method is unsynchronized and only gives a snapshot of the
175     * state when it is called
176     */
177    public long totalWorkItemsAdded() {
178        return workItemsAdded;
179    }
180
181    /**
182     * Returns the total number of Work items in the Queue to be processed
183     * This method is unsynchronized and only gives a snapshot of the
184     * state when it is called
185     */
186    public int workItemsInQueue() {
187        return theWorkQueue.size();
188    }
189
190    public synchronized long averageTimeInQueue() {
191        return (totalTimeInQueue/workItemsDequeued);
192    }
193
194    public String getName() {
195        return name;
196    }
197}
198
199// End of file.
200