CondVar.java revision 673:6b017d166ac2
1/*
2 * Copyright (c) 2001, 2002, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26/*
27  File: ConditionVariable.java
28
29  Originally written by Doug Lea and released into the public domain.
30  This may be used for any purposes whatsoever without acknowledgment.
31  Thanks for the assistance and support of Sun Microsystems Labs,
32  and everyone contributing, testing, and using this code.
33
34  History:
35  Date       Who                What
36  11Jun1998  dl               Create public version
37  08dec2001  kmc              Added support for Reentrant Mutexes
38*/
39
40package com.sun.corba.se.impl.orbutil.concurrent;
41
42import com.sun.corba.se.impl.orbutil.ORBUtility ;
43
44/**
45 * This class is designed for fans of POSIX pthreads programming.
46 * If you restrict yourself to Mutexes and CondVars, you can
47 * use most of your favorite constructions. Don't randomly mix them
48 * with synchronized methods or blocks though.
49 * <p>
50 * Method names and behavior are as close as is reasonable to
51 * those in POSIX.
52 * <p>
53 * <b>Sample Usage.</b> Here is a full version of a bounded buffer
54 * that implements the BoundedChannel interface, written in
 * a style reminiscent of that in POSIX programming books.
56 * <pre>
57 * class CVBuffer implements BoundedChannel {
58 *   private final Mutex mutex;
59 *   private final CondVar notFull;
60 *   private final CondVar notEmpty;
61 *   private int count = 0;
62 *   private int takePtr = 0;
63 *   private int putPtr = 0;
64 *   private final Object[] array;
65 *
66 *   public CVBuffer(int capacity) {
67 *     array = new Object[capacity];
68 *     mutex = new Mutex();
69 *     notFull = new CondVar(mutex);
70 *     notEmpty = new CondVar(mutex);
71 *   }
72 *
73 *   public int capacity() { return array.length; }
74 *
75 *   public void put(Object x) throws InterruptedException {
76 *     mutex.acquire();
77 *     try {
78 *       while (count == array.length) {
79 *         notFull.await();
80 *       }
81 *       array[putPtr] = x;
82 *       putPtr = (putPtr + 1) % array.length;
83 *       ++count;
84 *       notEmpty.signal();
85 *     }
86 *     finally {
87 *       mutex.release();
88 *     }
89 *   }
90 *
91 *   public Object take() throws InterruptedException {
92 *     Object x = null;
93 *     mutex.acquire();
94 *     try {
95 *       while (count == 0) {
96 *         notEmpty.await();
97 *       }
98 *       x = array[takePtr];
99 *       array[takePtr] = null;
100 *       takePtr = (takePtr + 1) % array.length;
101 *       --count;
102 *       notFull.signal();
103 *     }
104 *     finally {
105 *       mutex.release();
106 *     }
107 *     return x;
108 *   }
109 *
110 *   public boolean offer(Object x, long msecs) throws InterruptedException {
111 *     mutex.acquire();
112 *     try {
113 *       if (count == array.length) {
114 *         notFull.timedwait(msecs);
115 *         if (count == array.length)
116 *           return false;
117 *       }
118 *       array[putPtr] = x;
119 *       putPtr = (putPtr + 1) % array.length;
120 *       ++count;
121 *       notEmpty.signal();
122 *       return true;
123 *     }
124 *     finally {
125 *       mutex.release();
126 *     }
127 *   }
128 *
129 *   public Object poll(long msecs) throws InterruptedException {
130 *     Object x = null;
131 *     mutex.acquire();
132 *     try {
133 *       if (count == 0) {
134 *         notEmpty.timedwait(msecs);
135 *         if (count == 0)
136 *           return null;
137 *       }
138 *       x = array[takePtr];
139 *       array[takePtr] = null;
140 *       takePtr = (takePtr + 1) % array.length;
141 *       --count;
142 *       notFull.signal();
143 *     }
144 *     finally {
145 *       mutex.release();
146 *     }
147 *     return x;
148 *   }
149 * }
150 *
151 * </pre>
152 * @see Mutex
153 * [<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
154 **/
155
156public class CondVar {
157
158    protected boolean debug_ ;
159
160    /** The mutex **/
161    protected final Sync mutex_;
162    protected final ReentrantMutex remutex_;
163
164    private int releaseMutex()
165    {
166        int count = 1 ;
167
168        if (remutex_!=null)
169            count = remutex_.releaseAll() ;
170        else
171            mutex_.release() ;
172
173        return count ;
174    }
175
176    private void acquireMutex( int count ) throws InterruptedException
177    {
178        if (remutex_!=null)
179            remutex_.acquireAll( count ) ;
180        else
181            mutex_.acquire() ;
182    }
183
184  /**
185   * Create a new CondVar that relies on the given mutual
186   * exclusion lock.
187   * @param mutex A mutual exclusion lock which must either be non-reentrant,
188   * or else be ReentrantMutex.
189   * Standard usage is to supply an instance of <code>Mutex</code>,
190   * but, for example, a Semaphore initialized to 1 also works.
191   * On the other hand, many other Sync implementations would not
192   * work here, so some care is required to supply a sensible
193   * synchronization object.
194   * In normal use, the mutex should be one that is used for <em>all</em>
195   * synchronization of the object using the CondVar. Generally,
196   * to prevent nested monitor lockouts, this
197   * object should not use any native Java synchronized blocks.
198   **/
199
200  public CondVar(Sync mutex, boolean debug) {
201    debug_ = debug ;
202    mutex_ = mutex;
203    if (mutex instanceof ReentrantMutex)
204        remutex_ = (ReentrantMutex)mutex;
205    else
206        remutex_ = null;
207  }
208
209  public CondVar( Sync mutex ) {
210      this( mutex, false ) ;
211  }
212
213  /**
214   * Wait for notification. This operation at least momentarily
215   * releases the mutex. The mutex is always held upon return,
216   * even if interrupted.
217   * @exception InterruptedException if the thread was interrupted
218   * before or during the wait. However, if the thread is interrupted
219   * after the wait but during mutex re-acquisition, the interruption
220   * is ignored, while still ensuring
221   * that the currentThread's interruption state stays true, so can
222   * be probed by callers.
223   **/
224    public void await() throws InterruptedException {
225        int count = 0 ;
226        if (Thread.interrupted())
227            throw new InterruptedException();
228
229        try {
230            if (debug_)
231                ORBUtility.dprintTrace( this, "await enter" ) ;
232
233            synchronized(this) {
234                count = releaseMutex() ;
235                try {
236                    wait();
237                } catch (InterruptedException ex) {
238                    notify();
239                    throw ex;
240                }
241            }
242        } finally {
243            // Must ignore interrupt on re-acquire
244            boolean interrupted = false;
245            for (;;) {
246                try {
247                    acquireMutex( count );
248                    break;
249                } catch (InterruptedException ex) {
250                    interrupted = true;
251                }
252            }
253
254            if (interrupted) {
255                Thread.currentThread().interrupt();
256            }
257
258            if (debug_)
259                ORBUtility.dprintTrace( this, "await exit" ) ;
260        }
261    }
262
263    /**
264    * Wait for at most msecs for notification.
265    * This operation at least momentarily
266    * releases the mutex. The mutex is always held upon return,
267    * even if interrupted.
268    * @param msecs The time to wait. A value less than or equal to zero
269    * causes a momentarily release
270    * and re-acquire of the mutex, and always returns false.
271    * @return false if at least msecs have elapsed
272    * upon resumption; else true. A
273    * false return does NOT necessarily imply that the thread was
274    * not notified. For example, it might have been notified
275    * after the time elapsed but just before resuming.
276    * @exception InterruptedException if the thread was interrupted
277    * before or during the wait.
278    **/
279
280    public boolean timedwait(long msecs) throws  InterruptedException {
281
282        if (Thread.interrupted())
283            throw new InterruptedException();
284
285        boolean success = false;
286        int count = 0;
287
288        try {
289            if (debug_)
290                ORBUtility.dprintTrace( this, "timedwait enter" ) ;
291
292            synchronized(this) {
293                count = releaseMutex() ;
294                try {
295                    if (msecs > 0) {
296                        long start = System.currentTimeMillis();
297                        wait(msecs);
298                        success = System.currentTimeMillis() - start <= msecs;
299                    }
300                } catch (InterruptedException ex) {
301                    notify();
302                    throw ex;
303                }
304            }
305        } finally {
306            // Must ignore interrupt on re-acquire
307            boolean interrupted = false;
308            for (;;) {
309                try {
310                    acquireMutex( count ) ;
311                    break;
312                } catch (InterruptedException ex) {
313                    interrupted = true;
314                }
315            }
316
317            if (interrupted) {
318                Thread.currentThread().interrupt();
319            }
320
321            if (debug_)
322                ORBUtility.dprintTrace( this, "timedwait exit" ) ;
323        }
324        return success;
325    }
326
327    /**
328    * Notify a waiting thread.
329    * If one exists, a non-interrupted thread will return
330    * normally (i.e., not via InterruptedException) from await or timedwait.
331    **/
332    public synchronized void signal() {
333        notify();
334    }
335
336    /** Notify all waiting threads **/
337    public synchronized void broadcast() {
338        notifyAll();
339    }
340}
341