CondVar.java revision 608:7e06bf1dcb09
1/*
2 * Copyright (c) 2001, 2002, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26/*
27  File: ConditionVariable.java
28
29  Originally written by Doug Lea and released into the public domain.
30  This may be used for any purposes whatsoever without acknowledgment.
31  Thanks for the assistance and support of Sun Microsystems Labs,
32  and everyone contributing, testing, and using this code.
33
34  History:
35  Date       Who                What
36  11Jun1998  dl               Create public version
37  08dec2001  kmc              Added support for Reentrant Mutexes
38*/
39
40package com.sun.corba.se.impl.orbutil.concurrent;
41
42import com.sun.corba.se.impl.orbutil.ORBUtility ;
43
44/**
45 * This class is designed for fans of POSIX pthreads programming.
46 * If you restrict yourself to Mutexes and CondVars, you can
47 * use most of your favorite constructions. Don't randomly mix them
48 * with synchronized methods or blocks though.
49 * <p>
50 * Method names and behavior are as close as is reasonable to
51 * those in POSIX.
52 * <p>
53 * <b>Sample Usage.</b> Here is a full version of a bounded buffer
54 * that implements the BoundedChannel interface, written in
55 * a style reminiscent of that in POSIX programming books.
56 * <pre>
57 * class CVBuffer implements BoundedChannel {
58 *   private final Mutex mutex;
59 *   private final CondVar notFull;
60 *   private final CondVar notEmpty;
61 *   private int count = 0;
62 *   private int takePtr = 0;
63 *   private int putPtr = 0;
64 *   private final Object[] array;
65 *
66 *   public CVBuffer(int capacity) {
67 *     array = new Object[capacity];
68 *     mutex = new Mutex();
69 *     notFull = new CondVar(mutex);
70 *     notEmpty = new CondVar(mutex);
71 *   }
72 *
73 *   public int capacity() { return array.length; }
74 *
75 *   public void put(Object x) throws InterruptedException {
76 *     mutex.acquire();
77 *     try {
78 *       while (count == array.length) {
79 *         notFull.await();
80 *       }
81 *       array[putPtr] = x;
82 *       putPtr = (putPtr + 1) % array.length;
83 *       ++count;
84 *       notEmpty.signal();
85 *     }
86 *     finally {
87 *       mutex.release();
88 *     }
89 *   }
90 *
91 *   public Object take() throws InterruptedException {
92 *     Object x = null;
93 *     mutex.acquire();
94 *     try {
95 *       while (count == 0) {
96 *         notEmpty.await();
97 *       }
98 *       x = array[takePtr];
99 *       array[takePtr] = null;
100 *       takePtr = (takePtr + 1) % array.length;
101 *       --count;
102 *       notFull.signal();
103 *     }
104 *     finally {
105 *       mutex.release();
106 *     }
107 *     return x;
108 *   }
109 *
110 *   public boolean offer(Object x, long msecs) throws InterruptedException {
111 *     mutex.acquire();
112 *     try {
113 *       if (count == array.length) {
114 *         notFull.timedwait(msecs);
115 *         if (count == array.length)
116 *           return false;
117 *       }
118 *       array[putPtr] = x;
119 *       putPtr = (putPtr + 1) % array.length;
120 *       ++count;
121 *       notEmpty.signal();
122 *       return true;
123 *     }
124 *     finally {
125 *       mutex.release();
126 *     }
127 *   }
128 *
129 *   public Object poll(long msecs) throws InterruptedException {
130 *     Object x = null;
131 *     mutex.acquire();
132 *     try {
133 *       if (count == 0) {
134 *         notEmpty.timedwait(msecs);
135 *         if (count == 0)
136 *           return null;
137 *       }
138 *       x = array[takePtr];
139 *       array[takePtr] = null;
140 *       takePtr = (takePtr + 1) % array.length;
141 *       --count;
142 *       notFull.signal();
143 *     }
144 *     finally {
145 *       mutex.release();
146 *     }
147 *     return x;
148 *   }
149 * }
150 *
151 * </pre>
152 * @see Mutex
153 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
154
155 **/
156
157public class CondVar {
158
159    protected boolean debug_ ;
160
161    /** The mutex **/
162    protected final Sync mutex_;
163    protected final ReentrantMutex remutex_;
164
165    private int releaseMutex()
166    {
167        int count = 1 ;
168
169        if (remutex_!=null)
170            count = remutex_.releaseAll() ;
171        else
172            mutex_.release() ;
173
174        return count ;
175    }
176
177    private void acquireMutex( int count ) throws InterruptedException
178    {
179        if (remutex_!=null)
180            remutex_.acquireAll( count ) ;
181        else
182            mutex_.acquire() ;
183    }
184
185  /**
186   * Create a new CondVar that relies on the given mutual
187   * exclusion lock.
188   * @param mutex A mutual exclusion lock which must either be non-reentrant,
189   * or else be ReentrantMutex.
190   * Standard usage is to supply an instance of <code>Mutex</code>,
191   * but, for example, a Semaphore initialized to 1 also works.
192   * On the other hand, many other Sync implementations would not
193   * work here, so some care is required to supply a sensible
194   * synchronization object.
195   * In normal use, the mutex should be one that is used for <em>all</em>
196   * synchronization of the object using the CondVar. Generally,
197   * to prevent nested monitor lockouts, this
198   * object should not use any native Java synchronized blocks.
199   **/
200
201  public CondVar(Sync mutex, boolean debug) {
202    debug_ = debug ;
203    mutex_ = mutex;
204    if (mutex instanceof ReentrantMutex)
205        remutex_ = (ReentrantMutex)mutex;
206    else
207        remutex_ = null;
208  }
209
210  public CondVar( Sync mutex ) {
211      this( mutex, false ) ;
212  }
213
214  /**
215   * Wait for notification. This operation at least momentarily
216   * releases the mutex. The mutex is always held upon return,
217   * even if interrupted.
218   * @exception InterruptedException if the thread was interrupted
219   * before or during the wait. However, if the thread is interrupted
220   * after the wait but during mutex re-acquisition, the interruption
221   * is ignored, while still ensuring
222   * that the currentThread's interruption state stays true, so can
223   * be probed by callers.
224   **/
225    public void await() throws InterruptedException {
226        int count = 0 ;
227        if (Thread.interrupted())
228            throw new InterruptedException();
229
230        try {
231            if (debug_)
232                ORBUtility.dprintTrace( this, "await enter" ) ;
233
234            synchronized(this) {
235                count = releaseMutex() ;
236                try {
237                    wait();
238                } catch (InterruptedException ex) {
239                    notify();
240                    throw ex;
241                }
242            }
243        } finally {
244            // Must ignore interrupt on re-acquire
245            boolean interrupted = false;
246            for (;;) {
247                try {
248                    acquireMutex( count );
249                    break;
250                } catch (InterruptedException ex) {
251                    interrupted = true;
252                }
253            }
254
255            if (interrupted) {
256                Thread.currentThread().interrupt();
257            }
258
259            if (debug_)
260                ORBUtility.dprintTrace( this, "await exit" ) ;
261        }
262    }
263
264    /**
265    * Wait for at most msecs for notification.
266    * This operation at least momentarily
267    * releases the mutex. The mutex is always held upon return,
268    * even if interrupted.
269    * @param msecs The time to wait. A value less than or equal to zero
270    * causes a momentarily release
271    * and re-acquire of the mutex, and always returns false.
272    * @return false if at least msecs have elapsed
273    * upon resumption; else true. A
274    * false return does NOT necessarily imply that the thread was
275    * not notified. For example, it might have been notified
276    * after the time elapsed but just before resuming.
277    * @exception InterruptedException if the thread was interrupted
278    * before or during the wait.
279    **/
280
281    public boolean timedwait(long msecs) throws  InterruptedException {
282
283        if (Thread.interrupted())
284            throw new InterruptedException();
285
286        boolean success = false;
287        int count = 0;
288
289        try {
290            if (debug_)
291                ORBUtility.dprintTrace( this, "timedwait enter" ) ;
292
293            synchronized(this) {
294                count = releaseMutex() ;
295                try {
296                    if (msecs > 0) {
297                        long start = System.currentTimeMillis();
298                        wait(msecs);
299                        success = System.currentTimeMillis() - start <= msecs;
300                    }
301                } catch (InterruptedException ex) {
302                    notify();
303                    throw ex;
304                }
305            }
306        } finally {
307            // Must ignore interrupt on re-acquire
308            boolean interrupted = false;
309            for (;;) {
310                try {
311                    acquireMutex( count ) ;
312                    break;
313                } catch (InterruptedException ex) {
314                    interrupted = true;
315                }
316            }
317
318            if (interrupted) {
319                Thread.currentThread().interrupt();
320            }
321
322            if (debug_)
323                ORBUtility.dprintTrace( this, "timedwait exit" ) ;
324        }
325        return success;
326    }
327
328    /**
329    * Notify a waiting thread.
330    * If one exists, a non-interrupted thread will return
331    * normally (i.e., not via InterruptedException) from await or timedwait.
332    **/
333    public synchronized void signal() {
334        notify();
335    }
336
337    /** Notify all waiting threads **/
338    public synchronized void broadcast() {
339        notifyAll();
340    }
341}
342