1/*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation.  Oracle designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Oracle in the LICENSE file that accompanied this code.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25/*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36package java.util.concurrent;
37
38import java.lang.invoke.MethodHandles;
39import java.lang.invoke.VarHandle;
40import java.util.AbstractQueue;
41import java.util.Arrays;
42import java.util.Collection;
43import java.util.Comparator;
44import java.util.Iterator;
45import java.util.NoSuchElementException;
46import java.util.Objects;
47import java.util.PriorityQueue;
48import java.util.Queue;
49import java.util.SortedSet;
50import java.util.Spliterator;
51import java.util.concurrent.locks.Condition;
52import java.util.concurrent.locks.ReentrantLock;
53import java.util.function.Consumer;
54
55/**
56 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
57 * the same ordering rules as class {@link PriorityQueue} and supplies
58 * blocking retrieval operations.  While this queue is logically
59 * unbounded, attempted additions may fail due to resource exhaustion
60 * (causing {@code OutOfMemoryError}). This class does not permit
61 * {@code null} elements.  A priority queue relying on {@linkplain
62 * Comparable natural ordering} also does not permit insertion of
63 * non-comparable objects (doing so results in
64 * {@code ClassCastException}).
65 *
66 * <p>This class and its iterator implement all of the <em>optional</em>
67 * methods of the {@link Collection} and {@link Iterator} interfaces.
68 * The Iterator provided in method {@link #iterator()} and the
69 * Spliterator provided in method {@link #spliterator()} are <em>not</em>
70 * guaranteed to traverse the elements of the PriorityBlockingQueue in
71 * any particular order. If you need ordered traversal, consider using
72 * {@code Arrays.sort(pq.toArray())}.  Also, method {@code drainTo} can
73 * be used to <em>remove</em> some or all elements in priority order and
74 * place them in another collection.
75 *
76 * <p>Operations on this class make no guarantees about the ordering
77 * of elements with equal priority. If you need to enforce an
78 * ordering, you can define custom classes or comparators that use a
79 * secondary key to break ties in primary priority values.  For
80 * example, here is a class that applies first-in-first-out
81 * tie-breaking to comparable elements. To use it, you would insert a
82 * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
83 *
84 * <pre> {@code
85 * class FIFOEntry<E extends Comparable<? super E>>
86 *     implements Comparable<FIFOEntry<E>> {
87 *   static final AtomicLong seq = new AtomicLong(0);
88 *   final long seqNum;
89 *   final E entry;
90 *   public FIFOEntry(E entry) {
91 *     seqNum = seq.getAndIncrement();
92 *     this.entry = entry;
93 *   }
94 *   public E getEntry() { return entry; }
95 *   public int compareTo(FIFOEntry<E> other) {
96 *     int res = entry.compareTo(other.entry);
97 *     if (res == 0 && other.entry != this.entry)
98 *       res = (seqNum < other.seqNum ? -1 : 1);
99 *     return res;
100 *   }
101 * }}</pre>
102 *
103 * <p>This class is a member of the
104 * <a href="{@docRoot}/java/util/package-summary.html#CollectionsFramework">
105 * Java Collections Framework</a>.
106 *
107 * @since 1.5
108 * @author Doug Lea
109 * @param <E> the type of elements held in this queue
110 */
111@SuppressWarnings("unchecked")
112public class PriorityBlockingQueue<E> extends AbstractQueue<E>
113    implements BlockingQueue<E>, java.io.Serializable {
114    private static final long serialVersionUID = 5595510919245408276L;
115
116    /*
117     * The implementation uses an array-based binary heap, with public
118     * operations protected with a single lock. However, allocation
119     * during resizing uses a simple spinlock (used only while not
120     * holding main lock) in order to allow takes to operate
121     * concurrently with allocation.  This avoids repeated
122     * postponement of waiting consumers and consequent element
123     * build-up. The need to back away from lock during allocation
124     * makes it impossible to simply wrap delegated
125     * java.util.PriorityQueue operations within a lock, as was done
126     * in a previous version of this class. To maintain
127     * interoperability, a plain PriorityQueue is still used during
128     * serialization, which maintains compatibility at the expense of
129     * transiently doubling overhead.
130     */
131
132    /**
133     * Default array capacity.
134     */
135    private static final int DEFAULT_INITIAL_CAPACITY = 11;
136
137    /**
138     * The maximum size of array to allocate.
139     * Some VMs reserve some header words in an array.
140     * Attempts to allocate larger arrays may result in
141     * OutOfMemoryError: Requested array size exceeds VM limit
142     */
143    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
144
145    /**
146     * Priority queue represented as a balanced binary heap: the two
147     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
148     * priority queue is ordered by comparator, or by the elements'
149     * natural ordering, if comparator is null: For each node n in the
150     * heap and each descendant d of n, n <= d.  The element with the
151     * lowest value is in queue[0], assuming the queue is nonempty.
152     */
153    private transient Object[] queue;
154
155    /**
156     * The number of elements in the priority queue.
157     */
158    private transient int size;
159
160    /**
161     * The comparator, or null if priority queue uses elements'
162     * natural ordering.
163     */
164    private transient Comparator<? super E> comparator;
165
166    /**
167     * Lock used for all public operations.
168     */
169    private final ReentrantLock lock;
170
171    /**
172     * Condition for blocking when empty.
173     */
174    private final Condition notEmpty;
175
176    /**
177     * Spinlock for allocation, acquired via CAS.
178     */
179    private transient volatile int allocationSpinLock;
180
181    /**
182     * A plain PriorityQueue used only for serialization,
183     * to maintain compatibility with previous versions
184     * of this class. Non-null only during serialization/deserialization.
185     */
186    private PriorityQueue<E> q;
187
188    /**
189     * Creates a {@code PriorityBlockingQueue} with the default
190     * initial capacity (11) that orders its elements according to
191     * their {@linkplain Comparable natural ordering}.
192     */
193    public PriorityBlockingQueue() {
194        this(DEFAULT_INITIAL_CAPACITY, null);
195    }
196
197    /**
198     * Creates a {@code PriorityBlockingQueue} with the specified
199     * initial capacity that orders its elements according to their
200     * {@linkplain Comparable natural ordering}.
201     *
202     * @param initialCapacity the initial capacity for this priority queue
203     * @throws IllegalArgumentException if {@code initialCapacity} is less
204     *         than 1
205     */
206    public PriorityBlockingQueue(int initialCapacity) {
207        this(initialCapacity, null);
208    }
209
210    /**
211     * Creates a {@code PriorityBlockingQueue} with the specified initial
212     * capacity that orders its elements according to the specified
213     * comparator.
214     *
215     * @param initialCapacity the initial capacity for this priority queue
216     * @param  comparator the comparator that will be used to order this
217     *         priority queue.  If {@code null}, the {@linkplain Comparable
218     *         natural ordering} of the elements will be used.
219     * @throws IllegalArgumentException if {@code initialCapacity} is less
220     *         than 1
221     */
222    public PriorityBlockingQueue(int initialCapacity,
223                                 Comparator<? super E> comparator) {
224        if (initialCapacity < 1)
225            throw new IllegalArgumentException();
226        this.lock = new ReentrantLock();
227        this.notEmpty = lock.newCondition();
228        this.comparator = comparator;
229        this.queue = new Object[initialCapacity];
230    }
231
232    /**
233     * Creates a {@code PriorityBlockingQueue} containing the elements
234     * in the specified collection.  If the specified collection is a
235     * {@link SortedSet} or a {@link PriorityQueue}, this
236     * priority queue will be ordered according to the same ordering.
237     * Otherwise, this priority queue will be ordered according to the
238     * {@linkplain Comparable natural ordering} of its elements.
239     *
240     * @param  c the collection whose elements are to be placed
241     *         into this priority queue
242     * @throws ClassCastException if elements of the specified collection
243     *         cannot be compared to one another according to the priority
244     *         queue's ordering
245     * @throws NullPointerException if the specified collection or any
246     *         of its elements are null
247     */
248    public PriorityBlockingQueue(Collection<? extends E> c) {
249        this.lock = new ReentrantLock();
250        this.notEmpty = lock.newCondition();
251        boolean heapify = true; // true if not known to be in heap order
252        boolean screen = true;  // true if must screen for nulls
253        if (c instanceof SortedSet<?>) {
254            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
255            this.comparator = (Comparator<? super E>) ss.comparator();
256            heapify = false;
257        }
258        else if (c instanceof PriorityBlockingQueue<?>) {
259            PriorityBlockingQueue<? extends E> pq =
260                (PriorityBlockingQueue<? extends E>) c;
261            this.comparator = (Comparator<? super E>) pq.comparator();
262            screen = false;
263            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
264                heapify = false;
265        }
266        Object[] a = c.toArray();
267        int n = a.length;
268        // If c.toArray incorrectly doesn't return Object[], copy it.
269        if (a.getClass() != Object[].class)
270            a = Arrays.copyOf(a, n, Object[].class);
271        if (screen && (n == 1 || this.comparator != null)) {
272            for (int i = 0; i < n; ++i)
273                if (a[i] == null)
274                    throw new NullPointerException();
275        }
276        this.queue = a;
277        this.size = n;
278        if (heapify)
279            heapify();
280    }
281
282    /**
283     * Tries to grow array to accommodate at least one more element
284     * (but normally expand by about 50%), giving up (allowing retry)
285     * on contention (which we expect to be rare). Call only while
286     * holding lock.
287     *
288     * @param array the heap array
289     * @param oldCap the length of the array
290     */
291    private void tryGrow(Object[] array, int oldCap) {
292        lock.unlock(); // must release and then re-acquire main lock
293        Object[] newArray = null;
294        if (allocationSpinLock == 0 &&
295            ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
296            try {
297                int newCap = oldCap + ((oldCap < 64) ?
298                                       (oldCap + 2) : // grow faster if small
299                                       (oldCap >> 1));
300                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
301                    int minCap = oldCap + 1;
302                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
303                        throw new OutOfMemoryError();
304                    newCap = MAX_ARRAY_SIZE;
305                }
306                if (newCap > oldCap && queue == array)
307                    newArray = new Object[newCap];
308            } finally {
309                allocationSpinLock = 0;
310            }
311        }
312        if (newArray == null) // back off if another thread is allocating
313            Thread.yield();
314        lock.lock();
315        if (newArray != null && queue == array) {
316            queue = newArray;
317            System.arraycopy(array, 0, newArray, 0, oldCap);
318        }
319    }
320
321    /**
322     * Mechanics for poll().  Call only while holding lock.
323     */
324    private E dequeue() {
325        int n = size - 1;
326        if (n < 0)
327            return null;
328        else {
329            Object[] array = queue;
330            E result = (E) array[0];
331            E x = (E) array[n];
332            array[n] = null;
333            Comparator<? super E> cmp = comparator;
334            if (cmp == null)
335                siftDownComparable(0, x, array, n);
336            else
337                siftDownUsingComparator(0, x, array, n, cmp);
338            size = n;
339            return result;
340        }
341    }
342
343    /**
344     * Inserts item x at position k, maintaining heap invariant by
345     * promoting x up the tree until it is greater than or equal to
346     * its parent, or is the root.
347     *
348     * To simplify and speed up coercions and comparisons, the
349     * Comparable and Comparator versions are separated into different
350     * methods that are otherwise identical. (Similarly for siftDown.)
351     *
352     * @param k the position to fill
353     * @param x the item to insert
354     * @param array the heap array
355     */
356    private static <T> void siftUpComparable(int k, T x, Object[] array) {
357        Comparable<? super T> key = (Comparable<? super T>) x;
358        while (k > 0) {
359            int parent = (k - 1) >>> 1;
360            Object e = array[parent];
361            if (key.compareTo((T) e) >= 0)
362                break;
363            array[k] = e;
364            k = parent;
365        }
366        array[k] = key;
367    }
368
369    private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
370                                       Comparator<? super T> cmp) {
371        while (k > 0) {
372            int parent = (k - 1) >>> 1;
373            Object e = array[parent];
374            if (cmp.compare(x, (T) e) >= 0)
375                break;
376            array[k] = e;
377            k = parent;
378        }
379        array[k] = x;
380    }
381
382    /**
383     * Inserts item x at position k, maintaining heap invariant by
384     * demoting x down the tree repeatedly until it is less than or
385     * equal to its children or is a leaf.
386     *
387     * @param k the position to fill
388     * @param x the item to insert
389     * @param array the heap array
390     * @param n heap size
391     */
392    private static <T> void siftDownComparable(int k, T x, Object[] array,
393                                               int n) {
394        if (n > 0) {
395            Comparable<? super T> key = (Comparable<? super T>)x;
396            int half = n >>> 1;           // loop while a non-leaf
397            while (k < half) {
398                int child = (k << 1) + 1; // assume left child is least
399                Object c = array[child];
400                int right = child + 1;
401                if (right < n &&
402                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
403                    c = array[child = right];
404                if (key.compareTo((T) c) <= 0)
405                    break;
406                array[k] = c;
407                k = child;
408            }
409            array[k] = key;
410        }
411    }
412
413    private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
414                                                    int n,
415                                                    Comparator<? super T> cmp) {
416        if (n > 0) {
417            int half = n >>> 1;
418            while (k < half) {
419                int child = (k << 1) + 1;
420                Object c = array[child];
421                int right = child + 1;
422                if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
423                    c = array[child = right];
424                if (cmp.compare(x, (T) c) <= 0)
425                    break;
426                array[k] = c;
427                k = child;
428            }
429            array[k] = x;
430        }
431    }
432
433    /**
434     * Establishes the heap invariant (described above) in the entire tree,
435     * assuming nothing about the order of the elements prior to the call.
436     * This classic algorithm due to Floyd (1964) is known to be O(size).
437     */
438    private void heapify() {
439        Object[] array = queue;
440        int n = size, i = (n >>> 1) - 1;
441        Comparator<? super E> cmp = comparator;
442        if (cmp == null) {
443            for (; i >= 0; i--)
444                siftDownComparable(i, (E) array[i], array, n);
445        }
446        else {
447            for (; i >= 0; i--)
448                siftDownUsingComparator(i, (E) array[i], array, n, cmp);
449        }
450    }
451
452    /**
453     * Inserts the specified element into this priority queue.
454     *
455     * @param e the element to add
456     * @return {@code true} (as specified by {@link Collection#add})
457     * @throws ClassCastException if the specified element cannot be compared
458     *         with elements currently in the priority queue according to the
459     *         priority queue's ordering
460     * @throws NullPointerException if the specified element is null
461     */
462    public boolean add(E e) {
463        return offer(e);
464    }
465
466    /**
467     * Inserts the specified element into this priority queue.
468     * As the queue is unbounded, this method will never return {@code false}.
469     *
470     * @param e the element to add
471     * @return {@code true} (as specified by {@link Queue#offer})
472     * @throws ClassCastException if the specified element cannot be compared
473     *         with elements currently in the priority queue according to the
474     *         priority queue's ordering
475     * @throws NullPointerException if the specified element is null
476     */
477    public boolean offer(E e) {
478        if (e == null)
479            throw new NullPointerException();
480        final ReentrantLock lock = this.lock;
481        lock.lock();
482        int n, cap;
483        Object[] array;
484        while ((n = size) >= (cap = (array = queue).length))
485            tryGrow(array, cap);
486        try {
487            Comparator<? super E> cmp = comparator;
488            if (cmp == null)
489                siftUpComparable(n, e, array);
490            else
491                siftUpUsingComparator(n, e, array, cmp);
492            size = n + 1;
493            notEmpty.signal();
494        } finally {
495            lock.unlock();
496        }
497        return true;
498    }
499
500    /**
501     * Inserts the specified element into this priority queue.
502     * As the queue is unbounded, this method will never block.
503     *
504     * @param e the element to add
505     * @throws ClassCastException if the specified element cannot be compared
506     *         with elements currently in the priority queue according to the
507     *         priority queue's ordering
508     * @throws NullPointerException if the specified element is null
509     */
510    public void put(E e) {
511        offer(e); // never need to block
512    }
513
514    /**
515     * Inserts the specified element into this priority queue.
516     * As the queue is unbounded, this method will never block or
517     * return {@code false}.
518     *
519     * @param e the element to add
520     * @param timeout This parameter is ignored as the method never blocks
521     * @param unit This parameter is ignored as the method never blocks
522     * @return {@code true} (as specified by
523     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
524     * @throws ClassCastException if the specified element cannot be compared
525     *         with elements currently in the priority queue according to the
526     *         priority queue's ordering
527     * @throws NullPointerException if the specified element is null
528     */
529    public boolean offer(E e, long timeout, TimeUnit unit) {
530        return offer(e); // never need to block
531    }
532
533    public E poll() {
534        final ReentrantLock lock = this.lock;
535        lock.lock();
536        try {
537            return dequeue();
538        } finally {
539            lock.unlock();
540        }
541    }
542
543    public E take() throws InterruptedException {
544        final ReentrantLock lock = this.lock;
545        lock.lockInterruptibly();
546        E result;
547        try {
548            while ( (result = dequeue()) == null)
549                notEmpty.await();
550        } finally {
551            lock.unlock();
552        }
553        return result;
554    }
555
556    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
557        long nanos = unit.toNanos(timeout);
558        final ReentrantLock lock = this.lock;
559        lock.lockInterruptibly();
560        E result;
561        try {
562            while ( (result = dequeue()) == null && nanos > 0)
563                nanos = notEmpty.awaitNanos(nanos);
564        } finally {
565            lock.unlock();
566        }
567        return result;
568    }
569
570    public E peek() {
571        final ReentrantLock lock = this.lock;
572        lock.lock();
573        try {
574            return (size == 0) ? null : (E) queue[0];
575        } finally {
576            lock.unlock();
577        }
578    }
579
580    /**
581     * Returns the comparator used to order the elements in this queue,
582     * or {@code null} if this queue uses the {@linkplain Comparable
583     * natural ordering} of its elements.
584     *
585     * @return the comparator used to order the elements in this queue,
586     *         or {@code null} if this queue uses the natural
587     *         ordering of its elements
588     */
589    public Comparator<? super E> comparator() {
590        return comparator;
591    }
592
593    public int size() {
594        final ReentrantLock lock = this.lock;
595        lock.lock();
596        try {
597            return size;
598        } finally {
599            lock.unlock();
600        }
601    }
602
603    /**
604     * Always returns {@code Integer.MAX_VALUE} because
605     * a {@code PriorityBlockingQueue} is not capacity constrained.
606     * @return {@code Integer.MAX_VALUE} always
607     */
608    public int remainingCapacity() {
609        return Integer.MAX_VALUE;
610    }
611
612    private int indexOf(Object o) {
613        if (o != null) {
614            Object[] array = queue;
615            int n = size;
616            for (int i = 0; i < n; i++)
617                if (o.equals(array[i]))
618                    return i;
619        }
620        return -1;
621    }
622
623    /**
624     * Removes the ith element from queue.
625     */
626    private void removeAt(int i) {
627        Object[] array = queue;
628        int n = size - 1;
629        if (n == i) // removed last element
630            array[i] = null;
631        else {
632            E moved = (E) array[n];
633            array[n] = null;
634            Comparator<? super E> cmp = comparator;
635            if (cmp == null)
636                siftDownComparable(i, moved, array, n);
637            else
638                siftDownUsingComparator(i, moved, array, n, cmp);
639            if (array[i] == moved) {
640                if (cmp == null)
641                    siftUpComparable(i, moved, array);
642                else
643                    siftUpUsingComparator(i, moved, array, cmp);
644            }
645        }
646        size = n;
647    }
648
649    /**
650     * Removes a single instance of the specified element from this queue,
651     * if it is present.  More formally, removes an element {@code e} such
652     * that {@code o.equals(e)}, if this queue contains one or more such
653     * elements.  Returns {@code true} if and only if this queue contained
654     * the specified element (or equivalently, if this queue changed as a
655     * result of the call).
656     *
657     * @param o element to be removed from this queue, if present
658     * @return {@code true} if this queue changed as a result of the call
659     */
660    public boolean remove(Object o) {
661        final ReentrantLock lock = this.lock;
662        lock.lock();
663        try {
664            int i = indexOf(o);
665            if (i == -1)
666                return false;
667            removeAt(i);
668            return true;
669        } finally {
670            lock.unlock();
671        }
672    }
673
674    /**
675     * Identity-based version for use in Itr.remove.
676     */
677    void removeEQ(Object o) {
678        final ReentrantLock lock = this.lock;
679        lock.lock();
680        try {
681            Object[] array = queue;
682            for (int i = 0, n = size; i < n; i++) {
683                if (o == array[i]) {
684                    removeAt(i);
685                    break;
686                }
687            }
688        } finally {
689            lock.unlock();
690        }
691    }
692
693    /**
694     * Returns {@code true} if this queue contains the specified element.
695     * More formally, returns {@code true} if and only if this queue contains
696     * at least one element {@code e} such that {@code o.equals(e)}.
697     *
698     * @param o object to be checked for containment in this queue
699     * @return {@code true} if this queue contains the specified element
700     */
701    public boolean contains(Object o) {
702        final ReentrantLock lock = this.lock;
703        lock.lock();
704        try {
705            return indexOf(o) != -1;
706        } finally {
707            lock.unlock();
708        }
709    }
710
711    public String toString() {
712        return Helpers.collectionToString(this);
713    }
714
715    /**
716     * @throws UnsupportedOperationException {@inheritDoc}
717     * @throws ClassCastException            {@inheritDoc}
718     * @throws NullPointerException          {@inheritDoc}
719     * @throws IllegalArgumentException      {@inheritDoc}
720     */
721    public int drainTo(Collection<? super E> c) {
722        return drainTo(c, Integer.MAX_VALUE);
723    }
724
725    /**
726     * @throws UnsupportedOperationException {@inheritDoc}
727     * @throws ClassCastException            {@inheritDoc}
728     * @throws NullPointerException          {@inheritDoc}
729     * @throws IllegalArgumentException      {@inheritDoc}
730     */
731    public int drainTo(Collection<? super E> c, int maxElements) {
732        Objects.requireNonNull(c);
733        if (c == this)
734            throw new IllegalArgumentException();
735        if (maxElements <= 0)
736            return 0;
737        final ReentrantLock lock = this.lock;
738        lock.lock();
739        try {
740            int n = Math.min(size, maxElements);
741            for (int i = 0; i < n; i++) {
742                c.add((E) queue[0]); // In this order, in case add() throws.
743                dequeue();
744            }
745            return n;
746        } finally {
747            lock.unlock();
748        }
749    }
750
751    /**
752     * Atomically removes all of the elements from this queue.
753     * The queue will be empty after this call returns.
754     */
755    public void clear() {
756        final ReentrantLock lock = this.lock;
757        lock.lock();
758        try {
759            Object[] array = queue;
760            int n = size;
761            size = 0;
762            for (int i = 0; i < n; i++)
763                array[i] = null;
764        } finally {
765            lock.unlock();
766        }
767    }
768
769    /**
770     * Returns an array containing all of the elements in this queue.
771     * The returned array elements are in no particular order.
772     *
773     * <p>The returned array will be "safe" in that no references to it are
774     * maintained by this queue.  (In other words, this method must allocate
775     * a new array).  The caller is thus free to modify the returned array.
776     *
777     * <p>This method acts as bridge between array-based and collection-based
778     * APIs.
779     *
780     * @return an array containing all of the elements in this queue
781     */
782    public Object[] toArray() {
783        final ReentrantLock lock = this.lock;
784        lock.lock();
785        try {
786            return Arrays.copyOf(queue, size);
787        } finally {
788            lock.unlock();
789        }
790    }
791
792    /**
793     * Returns an array containing all of the elements in this queue; the
794     * runtime type of the returned array is that of the specified array.
795     * The returned array elements are in no particular order.
796     * If the queue fits in the specified array, it is returned therein.
797     * Otherwise, a new array is allocated with the runtime type of the
798     * specified array and the size of this queue.
799     *
800     * <p>If this queue fits in the specified array with room to spare
801     * (i.e., the array has more elements than this queue), the element in
802     * the array immediately following the end of the queue is set to
803     * {@code null}.
804     *
805     * <p>Like the {@link #toArray()} method, this method acts as bridge between
806     * array-based and collection-based APIs.  Further, this method allows
807     * precise control over the runtime type of the output array, and may,
808     * under certain circumstances, be used to save allocation costs.
809     *
810     * <p>Suppose {@code x} is a queue known to contain only strings.
811     * The following code can be used to dump the queue into a newly
812     * allocated array of {@code String}:
813     *
814     * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
815     *
816     * Note that {@code toArray(new Object[0])} is identical in function to
817     * {@code toArray()}.
818     *
819     * @param a the array into which the elements of the queue are to
820     *          be stored, if it is big enough; otherwise, a new array of the
821     *          same runtime type is allocated for this purpose
822     * @return an array containing all of the elements in this queue
823     * @throws ArrayStoreException if the runtime type of the specified array
824     *         is not a supertype of the runtime type of every element in
825     *         this queue
826     * @throws NullPointerException if the specified array is null
827     */
828    public <T> T[] toArray(T[] a) {
829        final ReentrantLock lock = this.lock;
830        lock.lock();
831        try {
832            int n = size;
833            if (a.length < n)
834                // Make a new array of a's runtime type, but my contents:
835                return (T[]) Arrays.copyOf(queue, size, a.getClass());
836            System.arraycopy(queue, 0, a, 0, n);
837            if (a.length > n)
838                a[n] = null;
839            return a;
840        } finally {
841            lock.unlock();
842        }
843    }
844
845    /**
846     * Returns an iterator over the elements in this queue. The
847     * iterator does not return the elements in any particular order.
848     *
849     * <p>The returned iterator is
850     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
851     *
852     * @return an iterator over the elements in this queue
853     */
854    public Iterator<E> iterator() {
855        return new Itr(toArray());
856    }
857
858    /**
859     * Snapshot iterator that works off copy of underlying q array.
860     */
861    final class Itr implements Iterator<E> {
862        final Object[] array; // Array of all elements
863        int cursor;           // index of next element to return
864        int lastRet;          // index of last element, or -1 if no such
865
866        Itr(Object[] array) {
867            lastRet = -1;
868            this.array = array;
869        }
870
871        public boolean hasNext() {
872            return cursor < array.length;
873        }
874
875        public E next() {
876            if (cursor >= array.length)
877                throw new NoSuchElementException();
878            return (E)array[lastRet = cursor++];
879        }
880
881        public void remove() {
882            if (lastRet < 0)
883                throw new IllegalStateException();
884            removeEQ(array[lastRet]);
885            lastRet = -1;
886        }
887    }
888
889    /**
890     * Saves this queue to a stream (that is, serializes it).
891     *
892     * For compatibility with previous version of this class, elements
893     * are first copied to a java.util.PriorityQueue, which is then
894     * serialized.
895     *
896     * @param s the stream
897     * @throws java.io.IOException if an I/O error occurs
898     */
899    private void writeObject(java.io.ObjectOutputStream s)
900        throws java.io.IOException {
901        lock.lock();
902        try {
903            // avoid zero capacity argument
904            q = new PriorityQueue<E>(Math.max(size, 1), comparator);
905            q.addAll(this);
906            s.defaultWriteObject();
907        } finally {
908            q = null;
909            lock.unlock();
910        }
911    }
912
913    /**
914     * Reconstitutes this queue from a stream (that is, deserializes it).
915     * @param s the stream
916     * @throws ClassNotFoundException if the class of a serialized object
917     *         could not be found
918     * @throws java.io.IOException if an I/O error occurs
919     */
920    private void readObject(java.io.ObjectInputStream s)
921        throws java.io.IOException, ClassNotFoundException {
922        try {
923            s.defaultReadObject();
924            this.queue = new Object[q.size()];
925            comparator = q.comparator();
926            addAll(q);
927        } finally {
928            q = null;
929        }
930    }
931
932    /**
933     * Immutable snapshot spliterator that binds to elements "late".
934     */
935    final class PBQSpliterator implements Spliterator<E> {
936        Object[] array;        // null until late-bound-initialized
937        int index;
938        int fence;
939
940        PBQSpliterator() {}
941
942        PBQSpliterator(Object[] array, int index, int fence) {
943            this.array = array;
944            this.index = index;
945            this.fence = fence;
946        }
947
948        private int getFence() {
949            if (array == null)
950                fence = (array = toArray()).length;
951            return fence;
952        }
953
954        public PBQSpliterator trySplit() {
955            int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
956            return (lo >= mid) ? null :
957                new PBQSpliterator(array, lo, index = mid);
958        }
959
960        public void forEachRemaining(Consumer<? super E> action) {
961            Objects.requireNonNull(action);
962            final int hi = getFence(), lo = index;
963            final Object[] a = array;
964            index = hi;                 // ensure exhaustion
965            for (int i = lo; i < hi; i++)
966                action.accept((E) a[i]);
967        }
968
969        public boolean tryAdvance(Consumer<? super E> action) {
970            Objects.requireNonNull(action);
971            if (getFence() > index && index >= 0) {
972                action.accept((E) array[index++]);
973                return true;
974            }
975            return false;
976        }
977
978        public long estimateSize() { return getFence() - index; }
979
980        public int characteristics() {
981            return (Spliterator.NONNULL |
982                    Spliterator.SIZED |
983                    Spliterator.SUBSIZED);
984        }
985    }
986
987    /**
988     * Returns a {@link Spliterator} over the elements in this queue.
989     * The spliterator does not traverse elements in any particular order
990     * (the {@link Spliterator#ORDERED ORDERED} characteristic is not reported).
991     *
992     * <p>The returned spliterator is
993     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
994     *
995     * <p>The {@code Spliterator} reports {@link Spliterator#SIZED} and
996     * {@link Spliterator#NONNULL}.
997     *
998     * @implNote
999     * The {@code Spliterator} additionally reports {@link Spliterator#SUBSIZED}.
1000     *
1001     * @return a {@code Spliterator} over the elements in this queue
1002     * @since 1.8
1003     */
1004    public Spliterator<E> spliterator() {
1005        return new PBQSpliterator();
1006    }
1007
1008    // VarHandle mechanics
1009    private static final VarHandle ALLOCATIONSPINLOCK;
1010    static {
1011        try {
1012            MethodHandles.Lookup l = MethodHandles.lookup();
1013            ALLOCATIONSPINLOCK = l.findVarHandle(PriorityBlockingQueue.class,
1014                                                 "allocationSpinLock",
1015                                                 int.class);
1016        } catch (ReflectiveOperationException e) {
1017            throw new Error(e);
1018        }
1019    }
1020}
1021