Propchange: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ConcurrentSkipListMap.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ConcurrentSkipListSet.java
URL: http://svn.apache.org/viewvc/harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ConcurrentSkipListSet.java?rev=800934&view=auto
==============================================================================
--- harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ConcurrentSkipListSet.java (added)
+++ harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ConcurrentSkipListSet.java Tue Aug 4 19:39:24 2009
@@ -0,0 +1,456 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+import java.util.*;
+import sun.misc.Unsafe;
+
+/**
+ * A scalable concurrent {@link NavigableSet} implementation based on
+ * a {@link ConcurrentSkipListMap}. The elements of the set are kept
+ * sorted according to their {@linkplain Comparable natural ordering},
+ * or by a {@link Comparator} provided at set creation time, depending
+ * on which constructor is used.
+ *
+ * <p>This implementation provides expected average <i>log(n)</i> time
+ * cost for the <tt>contains</tt>, <tt>add</tt>, and <tt>remove</tt>
+ * operations and their variants. Insertion, removal, and access
+ * operations safely execute concurrently by multiple threads.
+ * Iterators are <i>weakly consistent</i>, returning elements
+ * reflecting the state of the set at some point at or since the
+ * creation of the iterator. They do <em>not</em> throw {@link
+ * ConcurrentModificationException}, and may proceed concurrently with
+ * other operations. Ascending ordered views and their iterators are
+ * faster than descending ones.
+ *
+ * <p>Beware that, unlike in most collections, the <tt>size</tt>
+ * method is <em>not</em> a constant-time operation. Because of the
+ * asynchronous nature of these sets, determining the current number
+ * of elements requires a traversal of the elements. Additionally, the
+ * bulk operations <tt>addAll</tt>, <tt>removeAll</tt>,
+ * <tt>retainAll</tt>, and <tt>containsAll</tt> are <em>not</em>
+ * guaranteed to be performed atomically. For example, an iterator
+ * operating concurrently with an <tt>addAll</tt> operation might view
+ * only some of the added elements.
+ *
+ * <p>This class and its iterators implement all of the
+ * <em>optional</em> methods of the {@link Set} and {@link Iterator}
+ * interfaces. Like most other concurrent collection implementations,
+ * this class does not permit the use of <tt>null</tt> elements,
+ * because <tt>null</tt> arguments and return values cannot be reliably
+ * distinguished from the absence of elements.
+ *
+ * <p>This class is a member of the
+ * <a href="{@docRoot}/../technotes/guides/collections/index.html">
+ * Java Collections Framework</a>.
+ *
+ * @author Doug Lea
+ * @param <E> the type of elements maintained by this set
+ * @since 1.6
+ */
+public class ConcurrentSkipListSet<E>
+ extends AbstractSet<E>
+ implements NavigableSet<E>, Cloneable, java.io.Serializable {
+
+ private static final long serialVersionUID = -2479143111061671589L;
+
+ /**
+ * The underlying map. Uses Boolean.TRUE as value for each
+ * element. This field is declared final for the sake of thread
+ * safety, which entails some ugliness in clone()
+ */
+ private final ConcurrentNavigableMap<E,Object> m;
+
+ /**
+ * Constructs a new, empty set that orders its elements according to
+ * their {@linkplain Comparable natural ordering}.
+ */
+ public ConcurrentSkipListSet() {
+ m = new ConcurrentSkipListMap<E,Object>();
+ }
+
+ /**
+ * Constructs a new, empty set that orders its elements according to
+ * the specified comparator.
+ *
+ * @param comparator the comparator that will be used to order this set.
+ * If <tt>null</tt>, the {@linkplain Comparable natural
+ * ordering} of the elements will be used.
+ */
+ public ConcurrentSkipListSet(Comparator<? super E> comparator) {
+ m = new ConcurrentSkipListMap<E,Object>(comparator);
+ }
+
+ /**
+ * Constructs a new set containing the elements in the specified
+ * collection, that orders its elements according to their
+ * {@linkplain Comparable natural ordering}.
+ *
+ * @param c The elements that will comprise the new set
+ * @throws ClassCastException if the elements in <tt>c</tt> are
+ * not {@link Comparable}, or are not mutually comparable
+ * @throws NullPointerException if the specified collection or any
+ * of its elements are null
+ */
+ public ConcurrentSkipListSet(Collection<? extends E> c) {
+ m = new ConcurrentSkipListMap<E,Object>();
+ addAll(c);
+ }
+
+ /**
+ * Constructs a new set containing the same elements and using the
+ * same ordering as the specified sorted set.
+ *
+ * @param s sorted set whose elements will comprise the new set
+ * @throws NullPointerException if the specified sorted set or any
+ * of its elements are null
+ */
+ public ConcurrentSkipListSet(SortedSet<E> s) {
+ m = new ConcurrentSkipListMap<E,Object>(s.comparator());
+ addAll(s);
+ }
+
+ /**
+ * For use by submaps
+ */
+ ConcurrentSkipListSet(ConcurrentNavigableMap<E,Object> m) {
+ this.m = m;
+ }
+
+ /**
+ * Returns a shallow copy of this <tt>ConcurrentSkipListSet</tt>
+ * instance. (The elements themselves are not cloned.)
+ *
+ * @return a shallow copy of this set
+ */
+ public ConcurrentSkipListSet<E> clone() {
+ ConcurrentSkipListSet<E> clone = null;
+ try {
+ clone = (ConcurrentSkipListSet<E>) super.clone();
+ clone.setMap(new ConcurrentSkipListMap(m));
+ } catch (CloneNotSupportedException e) {
+ throw new InternalError();
+ }
+
+ return clone;
+ }
+
+ /* ---------------- Set operations -------------- */
+
+ /**
+ * Returns the number of elements in this set. If this set
+ * contains more than <tt>Integer.MAX_VALUE</tt> elements, it
+ * returns <tt>Integer.MAX_VALUE</tt>.
+ *
+ * <p>Beware that, unlike in most collections, this method is
+ * <em>NOT</em> a constant-time operation. Because of the
+ * asynchronous nature of these sets, determining the current
+ * number of elements requires traversing them all to count them.
+ * Additionally, it is possible for the size to change during
+ * execution of this method, in which case the returned result
+ * will be inaccurate. Thus, this method is typically not very
+ * useful in concurrent applications.
+ *
+ * @return the number of elements in this set
+ */
+ public int size() {
+ return m.size();
+ }
+
+ /**
+ * Returns <tt>true</tt> if this set contains no elements.
+ * @return <tt>true</tt> if this set contains no elements
+ */
+ public boolean isEmpty() {
+ return m.isEmpty();
+ }
+
+ /**
+ * Returns <tt>true</tt> if this set contains the specified element.
+ * More formally, returns <tt>true</tt> if and only if this set
+ * contains an element <tt>e</tt> such that <tt>o.equals(e)</tt>.
+ *
+ * @param o object to be checked for containment in this set
+ * @return <tt>true</tt> if this set contains the specified element
+ * @throws ClassCastException if the specified element cannot be
+ * compared with the elements currently in this set
+ * @throws NullPointerException if the specified element is null
+ */
+ public boolean contains(Object o) {
+ return m.containsKey(o);
+ }
+
+ /**
+ * Adds the specified element to this set if it is not already present.
+ * More formally, adds the specified element <tt>e</tt> to this set if
+ * the set contains no element <tt>e2</tt> such that <tt>e.equals(e2)</tt>.
+ * If this set already contains the element, the call leaves the set
+ * unchanged and returns <tt>false</tt>.
+ *
+ * @param e element to be added to this set
+ * @return <tt>true</tt> if this set did not already contain the
+ * specified element
+ * @throws ClassCastException if <tt>e</tt> cannot be compared
+ * with the elements currently in this set
+ * @throws NullPointerException if the specified element is null
+ */
+ public boolean add(E e) {
+ return m.putIfAbsent(e, Boolean.TRUE) == null;
+ }
+
+ /**
+ * Removes the specified element from this set if it is present.
+ * More formally, removes an element <tt>e</tt> such that
+ * <tt>o.equals(e)</tt>, if this set contains such an element.
+ * Returns <tt>true</tt> if this set contained the element (or
+ * equivalently, if this set changed as a result of the call).
+ * (This set will not contain the element once the call returns.)
+ *
+ * @param o object to be removed from this set, if present
+ * @return <tt>true</tt> if this set contained the specified element
+ * @throws ClassCastException if <tt>o</tt> cannot be compared
+ * with the elements currently in this set
+ * @throws NullPointerException if the specified element is null
+ */
+ public boolean remove(Object o) {
+ return m.remove(o, Boolean.TRUE);
+ }
+
+ /**
+ * Removes all of the elements from this set.
+ */
+ public void clear() {
+ m.clear();
+ }
+
+ /**
+ * Returns an iterator over the elements in this set in ascending order.
+ *
+ * @return an iterator over the elements in this set in ascending order
+ */
+ public Iterator<E> iterator() {
+ return m.navigableKeySet().iterator();
+ }
+
+ /**
+ * Returns an iterator over the elements in this set in descending order.
+ *
+ * @return an iterator over the elements in this set in descending order
+ */
+ public Iterator<E> descendingIterator() {
+ return m.descendingKeySet().iterator();
+ }
+
+
+ /* ---------------- AbstractSet Overrides -------------- */
+
+ /**
+ * Compares the specified object with this set for equality. Returns
+ * <tt>true</tt> if the specified object is also a set, the two sets
+ * have the same size, and every member of the specified set is
+ * contained in this set (or equivalently, every member of this set is
+ * contained in the specified set). This definition ensures that the
+ * equals method works properly across different implementations of the
+ * set interface.
+ *
+ * @param o the object to be compared for equality with this set
+ * @return <tt>true</tt> if the specified object is equal to this set
+ */
+ public boolean equals(Object o) {
+ // Override AbstractSet version to avoid calling size()
+ if (o == this)
+ return true;
+ if (!(o instanceof Set))
+ return false;
+ Collection<?> c = (Collection<?>) o;
+ try {
+ return containsAll(c) && c.containsAll(this);
+ } catch (ClassCastException unused) {
+ return false;
+ } catch (NullPointerException unused) {
+ return false;
+ }
+ }
+
+ /**
+ * Removes from this set all of its elements that are contained in
+ * the specified collection. If the specified collection is also
+ * a set, this operation effectively modifies this set so that its
+ * value is the <i>asymmetric set difference</i> of the two sets.
+ *
+ * @param c collection containing elements to be removed from this set
+ * @return <tt>true</tt> if this set changed as a result of the call
+ * @throws ClassCastException if the types of one or more elements in this
+ * set are incompatible with the specified collection
+ * @throws NullPointerException if the specified collection or any
+ * of its elements are null
+ */
+ public boolean removeAll(Collection<?> c) {
+ // Override AbstractSet version to avoid unnecessary call to size()
+ boolean modified = false;
+ for (Iterator<?> i = c.iterator(); i.hasNext(); )
+ if (remove(i.next()))
+ modified = true;
+ return modified;
+ }
+
+ /* ---------------- Relational operations -------------- */
+
+ /**
+ * @throws ClassCastException {@inheritDoc}
+ * @throws NullPointerException if the specified element is null
+ */
+ public E lower(E e) {
+ return m.lowerKey(e);
+ }
+
+ /**
+ * @throws ClassCastException {@inheritDoc}
+ * @throws NullPointerException if the specified element is null
+ */
+ public E floor(E e) {
+ return m.floorKey(e);
+ }
+
+ /**
+ * @throws ClassCastException {@inheritDoc}
+ * @throws NullPointerException if the specified element is null
+ */
+ public E ceiling(E e) {
+ return m.ceilingKey(e);
+ }
+
+ /**
+ * @throws ClassCastException {@inheritDoc}
+ * @throws NullPointerException if the specified element is null
+ */
+ public E higher(E e) {
+ return m.higherKey(e);
+ }
+
+ public E pollFirst() {
+ Map.Entry<E,Object> e = m.pollFirstEntry();
+ return e == null? null : e.getKey();
+ }
+
+ public E pollLast() {
+ Map.Entry<E,Object> e = m.pollLastEntry();
+ return e == null? null : e.getKey();
+ }
+
+
+ /* ---------------- SortedSet operations -------------- */
+
+
+ public Comparator<? super E> comparator() {
+ return m.comparator();
+ }
+
+ /**
+ * @throws NoSuchElementException {@inheritDoc}
+ */
+ public E first() {
+ return m.firstKey();
+ }
+
+ /**
+ * @throws NoSuchElementException {@inheritDoc}
+ */
+ public E last() {
+ return m.lastKey();
+ }
+
+ /**
+ * @throws ClassCastException {@inheritDoc}
+ * @throws NullPointerException if {@code fromElement} or
+ * {@code toElement} is null
+ * @throws IllegalArgumentException {@inheritDoc}
+ */
+ public NavigableSet<E> subSet(E fromElement,
+ boolean fromInclusive,
+ E toElement,
+ boolean toInclusive) {
+ return new ConcurrentSkipListSet<E>
+ (m.subMap(fromElement, fromInclusive,
+ toElement, toInclusive));
+ }
+
+ /**
+ * @throws ClassCastException {@inheritDoc}
+ * @throws NullPointerException if {@code toElement} is null
+ * @throws IllegalArgumentException {@inheritDoc}
+ */
+ public NavigableSet<E> headSet(E toElement, boolean inclusive) {
+ return new ConcurrentSkipListSet<E>(m.headMap(toElement, inclusive));
+ }
+
+ /**
+ * @throws ClassCastException {@inheritDoc}
+ * @throws NullPointerException if {@code fromElement} is null
+ * @throws IllegalArgumentException {@inheritDoc}
+ */
+ public NavigableSet<E> tailSet(E fromElement, boolean inclusive) {
+ return new ConcurrentSkipListSet<E>(m.tailMap(fromElement, inclusive));
+ }
+
+ /**
+ * @throws ClassCastException {@inheritDoc}
+ * @throws NullPointerException if {@code fromElement} or
+ * {@code toElement} is null
+ * @throws IllegalArgumentException {@inheritDoc}
+ */
+ public NavigableSet<E> subSet(E fromElement, E toElement) {
+ return subSet(fromElement, true, toElement, false);
+ }
+
+ /**
+ * @throws ClassCastException {@inheritDoc}
+ * @throws NullPointerException if {@code toElement} is null
+ * @throws IllegalArgumentException {@inheritDoc}
+ */
+ public NavigableSet<E> headSet(E toElement) {
+ return headSet(toElement, false);
+ }
+
+ /**
+ * @throws ClassCastException {@inheritDoc}
+ * @throws NullPointerException if {@code fromElement} is null
+ * @throws IllegalArgumentException {@inheritDoc}
+ */
+ public NavigableSet<E> tailSet(E fromElement) {
+ return tailSet(fromElement, true);
+ }
+
+ /**
+ * Returns a reverse order view of the elements contained in this set.
+ * The descending set is backed by this set, so changes to the set are
+ * reflected in the descending set, and vice-versa.
+ *
+ * <p>The returned set has an ordering equivalent to
+ * <tt>{@link Collections#reverseOrder(Comparator) Collections.reverseOrder}(comparator())</tt>.
+ * The expression {@code s.descendingSet().descendingSet()} returns a
+ * view of {@code s} essentially equivalent to {@code s}.
+ *
+ * @return a reverse order view of this set
+ */
+ public NavigableSet<E> descendingSet() {
+ return new ConcurrentSkipListSet(m.descendingMap());
+ }
+
+ // Support for resetting map in clone
+ private static final Unsafe unsafe = Unsafe.getUnsafe();
+ private static final long mapOffset;
+ static {
+ try {
+ mapOffset = unsafe.objectFieldOffset
+ (ConcurrentSkipListSet.class.getDeclaredField("m"));
+ } catch (Exception ex) { throw new Error(ex); }
+ }
+ private void setMap(ConcurrentNavigableMap<E,Object> map) {
+ unsafe.putObjectVolatile(this, mapOffset, map);
+ }
+
+}
Propchange: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ConcurrentSkipListSet.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/LinkedBlockingDeque.java
URL: http://svn.apache.org/viewvc/harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/LinkedBlockingDeque.java?rev=800934&view=auto
==============================================================================
--- harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/LinkedBlockingDeque.java (added)
+++ harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/LinkedBlockingDeque.java Tue Aug 4 19:39:24 2009
@@ -0,0 +1,1137 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * An optionally-bounded {@linkplain BlockingDeque blocking deque} based on
+ * linked nodes.
+ *
+ * <p> The optional capacity bound constructor argument serves as a
+ * way to prevent excessive expansion. The capacity, if unspecified,
+ * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
+ * dynamically created upon each insertion unless this would bring the
+ * deque above capacity.
+ *
+ * <p>Most operations run in constant time (ignoring time spent
+ * blocking). Exceptions include {@link #remove(Object) remove},
+ * {@link #removeFirstOccurrence removeFirstOccurrence}, {@link
+ * #removeLastOccurrence removeLastOccurrence}, {@link #contains
+ * contains}, {@link #iterator iterator.remove()}, and the bulk
+ * operations, all of which run in linear time.
+ *
+ * <p>This class and its iterator implement all of the
+ * <em>optional</em> methods of the {@link Collection} and {@link
+ * Iterator} interfaces.
+ *
+ * <p>This class is a member of the
+ * <a href="{@docRoot}/../technotes/guides/collections/index.html">
+ * Java Collections Framework</a>.
+ *
+ * @since 1.6
+ * @author Doug Lea
+ * @param <E> the type of elements held in this collection
+ */
+public class LinkedBlockingDeque<E>
+ extends AbstractQueue<E>
+ implements BlockingDeque<E>, java.io.Serializable {
+
+ /*
+ * Implemented as a simple doubly-linked list protected by a
+ * single lock and using conditions to manage blocking.
+ *
+ * To implement weakly consistent iterators, it appears we need to
+ * keep all Nodes GC-reachable from a predecessor dequeued Node.
+ * That would cause two problems:
+ * - allow a rogue Iterator to cause unbounded memory retention
+ * - cause cross-generational linking of old Nodes to new Nodes if
+ * a Node was tenured while live, which generational GCs have a
+ * hard time dealing with, causing repeated major collections.
+ * However, only non-deleted Nodes need to be reachable from
+ * dequeued Nodes, and reachability does not necessarily have to
+ * be of the kind understood by the GC. We use the trick of
+ * linking a Node that has just been dequeued to itself. Such a
+ * self-link implicitly means to jump to "first" (for next links)
+ * or "last" (for prev links).
+ */
+
+ /*
+ * We have "diamond" multiple interface/abstract class inheritance
+ * here, and that introduces ambiguities. Often we want the
+ * BlockingDeque javadoc combined with the AbstractQueue
+ * implementation, so a lot of method specs are duplicated here.
+ */
+
+ private static final long serialVersionUID = -387911632671998426L;
+
+ /** Doubly-linked list node class */
+ static final class Node<E> {
+ /**
+ * The item, or null if this node has been removed.
+ */
+ E item;
+
+ /**
+ * One of:
+ * - the real predecessor Node
+ * - this Node, meaning the predecessor is tail
+ * - null, meaning there is no predecessor
+ */
+ Node<E> prev;
+
+ /**
+ * One of:
+ * - the real successor Node
+ * - this Node, meaning the successor is head
+ * - null, meaning there is no successor
+ */
+ Node<E> next;
+
+ Node(E x, Node<E> p, Node<E> n) {
+ item = x;
+ prev = p;
+ next = n;
+ }
+ }
+
+ /**
+ * Pointer to first node.
+ * Invariant: (first == null && last == null) ||
+ * (first.prev == null && first.item != null)
+ */
+ transient Node<E> first;
+
+ /**
+ * Pointer to last node.
+ * Invariant: (first == null && last == null) ||
+ * (last.next == null && last.item != null)
+ */
+ transient Node<E> last;
+
+ /** Number of items in the deque */
+ private transient int count;
+
+ /** Maximum number of items in the deque */
+ private final int capacity;
+
+ /** Main lock guarding all access */
+ final ReentrantLock lock = new ReentrantLock();
+
+ /** Condition for waiting takes */
+ private final Condition notEmpty = lock.newCondition();
+
+ /** Condition for waiting puts */
+ private final Condition notFull = lock.newCondition();
+
+ /**
+ * Creates a {@code LinkedBlockingDeque} with a capacity of
+ * {@link Integer#MAX_VALUE}.
+ */
+ public LinkedBlockingDeque() {
+ this(Integer.MAX_VALUE);
+ }
+
+ /**
+ * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity.
+ *
+ * @param capacity the capacity of this deque
+ * @throws IllegalArgumentException if {@code capacity} is less than 1
+ */
+ public LinkedBlockingDeque(int capacity) {
+ if (capacity <= 0) throw new IllegalArgumentException();
+ this.capacity = capacity;
+ }
+
+ /**
+ * Creates a {@code LinkedBlockingDeque} with a capacity of
+ * {@link Integer#MAX_VALUE}, initially containing the elements of
+ * the given collection, added in traversal order of the
+ * collection's iterator.
+ *
+ * @param c the collection of elements to initially contain
+ * @throws NullPointerException if the specified collection or any
+ * of its elements are null
+ */
+ public LinkedBlockingDeque(Collection<? extends E> c) {
+ this(Integer.MAX_VALUE);
+ final ReentrantLock lock = this.lock;
+ lock.lock(); // Never contended, but necessary for visibility
+ try {
+ for (E e : c) {
+ if (e == null)
+ throw new NullPointerException();
+ if (!linkLast(e))
+ throw new IllegalStateException("Deque full");
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+
+ // Basic linking and unlinking operations, called only while holding lock
+
+ /**
+ * Links e as first element, or returns false if full.
+ */
+ private boolean linkFirst(E e) {
+ // assert lock.isHeldByCurrentThread();
+ if (count >= capacity)
+ return false;
+ Node<E> f = first;
+ Node<E> x = new Node<E>(e, null, f);
+ first = x;
+ if (last == null)
+ last = x;
+ else
+ f.prev = x;
+ ++count;
+ notEmpty.signal();
+ return true;
+ }
+
+ /**
+ * Links e as last element, or returns false if full.
+ */
+ private boolean linkLast(E e) {
+ // assert lock.isHeldByCurrentThread();
+ if (count >= capacity)
+ return false;
+ Node<E> l = last;
+ Node<E> x = new Node<E>(e, l, null);
+ last = x;
+ if (first == null)
+ first = x;
+ else
+ l.next = x;
+ ++count;
+ notEmpty.signal();
+ return true;
+ }
+
+ /**
+ * Removes and returns first element, or null if empty.
+ */
+ private E unlinkFirst() {
+ // assert lock.isHeldByCurrentThread();
+ Node<E> f = first;
+ if (f == null)
+ return null;
+ Node<E> n = f.next;
+ E item = f.item;
+ f.item = null;
+ f.next = f; // help GC
+ first = n;
+ if (n == null)
+ last = null;
+ else
+ n.prev = null;
+ --count;
+ notFull.signal();
+ return item;
+ }
+
+ /**
+ * Removes and returns last element, or null if empty.
+ */
+ private E unlinkLast() {
+ // assert lock.isHeldByCurrentThread();
+ Node<E> l = last;
+ if (l == null)
+ return null;
+ Node<E> p = l.prev;
+ E item = l.item;
+ l.item = null;
+ l.prev = l; // help GC
+ last = p;
+ if (p == null)
+ first = null;
+ else
+ p.next = null;
+ --count;
+ notFull.signal();
+ return item;
+ }
+
+ /**
+ * Unlinks x.
+ */
+ void unlink(Node<E> x) {
+ // assert lock.isHeldByCurrentThread();
+ Node<E> p = x.prev;
+ Node<E> n = x.next;
+ if (p == null) {
+ unlinkFirst();
+ } else if (n == null) {
+ unlinkLast();
+ } else {
+ p.next = n;
+ n.prev = p;
+ x.item = null;
+ // Don't mess with x's links. They may still be in use by
+ // an iterator.
+ --count;
+ notFull.signal();
+ }
+ }
+
+ // BlockingDeque methods
+
+ /**
+ * @throws IllegalStateException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ */
+ public void addFirst(E e) {
+ if (!offerFirst(e))
+ throw new IllegalStateException("Deque full");
+ }
+
+ /**
+ * @throws IllegalStateException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ */
+ public void addLast(E e) {
+ if (!offerLast(e))
+ throw new IllegalStateException("Deque full");
+ }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ */
+ public boolean offerFirst(E e) {
+ if (e == null) throw new NullPointerException();
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ return linkFirst(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ */
+ public boolean offerLast(E e) {
+ if (e == null) throw new NullPointerException();
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ return linkLast(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ * @throws InterruptedException {@inheritDoc}
+ */
+ public void putFirst(E e) throws InterruptedException {
+ if (e == null) throw new NullPointerException();
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ while (!linkFirst(e))
+ notFull.await();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ * @throws InterruptedException {@inheritDoc}
+ */
+ public void putLast(E e) throws InterruptedException {
+ if (e == null) throw new NullPointerException();
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ while (!linkLast(e))
+ notFull.await();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ * @throws InterruptedException {@inheritDoc}
+ */
+ public boolean offerFirst(E e, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ if (e == null) throw new NullPointerException();
+ long nanos = unit.toNanos(timeout);
+ final ReentrantLock lock = this.lock;
+ lock.lockInterruptibly();
+ try {
+ while (!linkFirst(e)) {
+ if (nanos <= 0)
+ return false;
+ nanos = notFull.awaitNanos(nanos);
+ }
+ return true;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ * @throws InterruptedException {@inheritDoc}
+ */
+ public boolean offerLast(E e, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ if (e == null) throw new NullPointerException();
+ long nanos = unit.toNanos(timeout);
+ final ReentrantLock lock = this.lock;
+ lock.lockInterruptibly();
+ try {
+ while (!linkLast(e)) {
+ if (nanos <= 0)
+ return false;
+ nanos = notFull.awaitNanos(nanos);
+ }
+ return true;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * @throws NoSuchElementException {@inheritDoc}
+ */
+ public E removeFirst() {
+ E x = pollFirst();
+ if (x == null) throw new NoSuchElementException();
+ return x;
+ }
+
+ /**
+ * @throws NoSuchElementException {@inheritDoc}
+ */
+ public E removeLast() {
+ E x = pollLast();
+ if (x == null) throw new NoSuchElementException();
+ return x;
+ }
+
+ public E pollFirst() {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ return unlinkFirst();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E pollLast() {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ return unlinkLast();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E takeFirst() throws InterruptedException {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ E x;
+ while ( (x = unlinkFirst()) == null)
+ notEmpty.await();
+ return x;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E takeLast() throws InterruptedException {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ E x;
+ while ( (x = unlinkLast()) == null)
+ notEmpty.await();
+ return x;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E pollFirst(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ long nanos = unit.toNanos(timeout);
+ final ReentrantLock lock = this.lock;
+ lock.lockInterruptibly();
+ try {
+ E x;
+ while ( (x = unlinkFirst()) == null) {
+ if (nanos <= 0)
+ return null;
+ nanos = notEmpty.awaitNanos(nanos);
+ }
+ return x;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E pollLast(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ long nanos = unit.toNanos(timeout);
+ final ReentrantLock lock = this.lock;
+ lock.lockInterruptibly();
+ try {
+ E x;
+ while ( (x = unlinkLast()) == null) {
+ if (nanos <= 0)
+ return null;
+ nanos = notEmpty.awaitNanos(nanos);
+ }
+ return x;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * @throws NoSuchElementException {@inheritDoc}
+ */
+ public E getFirst() {
+ E x = peekFirst();
+ if (x == null) throw new NoSuchElementException();
+ return x;
+ }
+
+ /**
+ * @throws NoSuchElementException {@inheritDoc}
+ */
+ public E getLast() {
+ E x = peekLast();
+ if (x == null) throw new NoSuchElementException();
+ return x;
+ }
+
+ public E peekFirst() {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ return (first == null) ? null : first.item;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E peekLast() {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ return (last == null) ? null : last.item;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean removeFirstOccurrence(Object o) {
+ if (o == null) return false;
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ for (Node<E> p = first; p != null; p = p.next) {
+ if (o.equals(p.item)) {
+ unlink(p);
+ return true;
+ }
+ }
+ return false;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean removeLastOccurrence(Object o) {
+ if (o == null) return false;
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ for (Node<E> p = last; p != null; p = p.prev) {
+ if (o.equals(p.item)) {
+ unlink(p);
+ return true;
+ }
+ }
+ return false;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ // BlockingQueue methods
+
+ /**
+ * Inserts the specified element at the end of this deque unless it would
+ * violate capacity restrictions. When using a capacity-restricted deque,
+ * it is generally preferable to use method {@link #offer(Object) offer}.
+ *
+ * <p>This method is equivalent to {@link #addLast}.
+ *
+ * @throws IllegalStateException if the element cannot be added at this
+ * time due to capacity restrictions
+ * @throws NullPointerException if the specified element is null
+ */
+ public boolean add(E e) {
+ addLast(e);
+ return true;
+ }
+
+ /**
+ * @throws NullPointerException if the specified element is null
+ */
+ public boolean offer(E e) {
+ return offerLast(e);
+ }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ * @throws InterruptedException {@inheritDoc}
+ */
+ public void put(E e) throws InterruptedException {
+ putLast(e);
+ }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ * @throws InterruptedException {@inheritDoc}
+ */
+ public boolean offer(E e, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ return offerLast(e, timeout, unit);
+ }
+
+ /**
+ * Retrieves and removes the head of the queue represented by this deque.
+ * This method differs from {@link #poll poll} only in that it throws an
+ * exception if this deque is empty.
+ *
+ * <p>This method is equivalent to {@link #removeFirst() removeFirst}.
+ *
+ * @return the head of the queue represented by this deque
+ * @throws NoSuchElementException if this deque is empty
+ */
+ public E remove() {
+ return removeFirst();
+ }
+
+ public E poll() {
+ return pollFirst();
+ }
+
+ public E take() throws InterruptedException {
+ return takeFirst();
+ }
+
+ public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+ return pollFirst(timeout, unit);
+ }
+
+ /**
+ * Retrieves, but does not remove, the head of the queue represented by
+ * this deque. This method differs from {@link #peek peek} only in that
+ * it throws an exception if this deque is empty.
+ *
+ * <p>This method is equivalent to {@link #getFirst() getFirst}.
+ *
+ * @return the head of the queue represented by this deque
+ * @throws NoSuchElementException if this deque is empty
+ */
+ public E element() {
+ return getFirst();
+ }
+
+ public E peek() {
+ return peekFirst();
+ }
+
+ /**
+ * Returns the number of additional elements that this deque can ideally
+ * (in the absence of memory or resource constraints) accept without
+ * blocking. This is always equal to the initial capacity of this deque
+ * less the current {@code size} of this deque.
+ *
+ * <p>Note that you <em>cannot</em> always tell if an attempt to insert
+ * an element will succeed by inspecting {@code remainingCapacity}
+ * because it may be the case that another thread is about to
+ * insert or remove an element.
+ */
+ public int remainingCapacity() {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ return capacity - count;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * @throws UnsupportedOperationException {@inheritDoc}
+ * @throws ClassCastException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ * @throws IllegalArgumentException {@inheritDoc}
+ */
+ public int drainTo(Collection<? super E> c) {
+ return drainTo(c, Integer.MAX_VALUE);
+ }
+
+ /**
+ * @throws UnsupportedOperationException {@inheritDoc}
+ * @throws ClassCastException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ * @throws IllegalArgumentException {@inheritDoc}
+ */
+ public int drainTo(Collection<? super E> c, int maxElements) {
+ if (c == null)
+ throw new NullPointerException();
+ if (c == this)
+ throw new IllegalArgumentException();
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ int n = Math.min(maxElements, count);
+ for (int i = 0; i < n; i++) {
+ c.add(first.item); // In this order, in case add() throws.
+ unlinkFirst();
+ }
+ return n;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ // Stack methods
+
+ /**
+ * @throws IllegalStateException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ */
+ public void push(E e) {
+ addFirst(e);
+ }
+
+ /**
+ * @throws NoSuchElementException {@inheritDoc}
+ */
+ public E pop() {
+ return removeFirst();
+ }
+
+ // Collection methods
+
+ /**
+ * Removes the first occurrence of the specified element from this deque.
+ * If the deque does not contain the element, it is unchanged.
+ * More formally, removes the first element {@code e} such that
+ * {@code o.equals(e)} (if such an element exists).
+ * Returns {@code true} if this deque contained the specified element
+ * (or equivalently, if this deque changed as a result of the call).
+ *
+ * <p>This method is equivalent to
+ * {@link #removeFirstOccurrence(Object) removeFirstOccurrence}.
+ *
+ * @param o element to be removed from this deque, if present
+ * @return {@code true} if this deque changed as a result of the call
+ */
+ public boolean remove(Object o) {
+ return removeFirstOccurrence(o);
+ }
+
+ /**
+ * Returns the number of elements in this deque.
+ *
+ * @return the number of elements in this deque
+ */
+ public int size() {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ return count;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns {@code true} if this deque contains the specified element.
+ * More formally, returns {@code true} if and only if this deque contains
+ * at least one element {@code e} such that {@code o.equals(e)}.
+ *
+ * @param o object to be checked for containment in this deque
+ * @return {@code true} if this deque contains the specified element
+ */
+ public boolean contains(Object o) {
+ if (o == null) return false;
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ for (Node<E> p = first; p != null; p = p.next)
+ if (o.equals(p.item))
+ return true;
+ return false;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /*
+ * TODO: Add support for more efficient bulk operations.
+ *
+ * We don't want to acquire the lock for every iteration, but we
+ * also want other threads a chance to interact with the
+ * collection, especially when count is close to capacity.
+ */
+
+// /**
+// * Adds all of the elements in the specified collection to this
+// * queue. Attempts to addAll of a queue to itself result in
+// * {@code IllegalArgumentException}. Further, the behavior of
+// * this operation is undefined if the specified collection is
+// * modified while the operation is in progress.
+// *
+// * @param c collection containing elements to be added to this queue
+// * @return {@code true} if this queue changed as a result of the call
+// * @throws ClassCastException {@inheritDoc}
+// * @throws NullPointerException {@inheritDoc}
+// * @throws IllegalArgumentException {@inheritDoc}
+// * @throws IllegalStateException {@inheritDoc}
+// * @see #add(Object)
+// */
+// public boolean addAll(Collection<? extends E> c) {
+// if (c == null)
+// throw new NullPointerException();
+// if (c == this)
+// throw new IllegalArgumentException();
+// final ReentrantLock lock = this.lock;
+// lock.lock();
+// try {
+// boolean modified = false;
+// for (E e : c)
+// if (linkLast(e))
+// modified = true;
+// return modified;
+// } finally {
+// lock.unlock();
+// }
+// }
+
+ /**
+ * Returns an array containing all of the elements in this deque, in
+ * proper sequence (from first to last element).
+ *
+ * <p>The returned array will be "safe" in that no references to it are
+ * maintained by this deque. (In other words, this method must allocate
+ * a new array). The caller is thus free to modify the returned array.
+ *
+ * <p>This method acts as bridge between array-based and collection-based
+ * APIs.
+ *
+ * @return an array containing all of the elements in this deque
+ */
+ @SuppressWarnings("unchecked")
+ public Object[] toArray() {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ Object[] a = new Object[count];
+ int k = 0;
+ for (Node<E> p = first; p != null; p = p.next)
+ a[k++] = p.item;
+ return a;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns an array containing all of the elements in this deque, in
+ * proper sequence; the runtime type of the returned array is that of
+ * the specified array. If the deque fits in the specified array, it
+ * is returned therein. Otherwise, a new array is allocated with the
+ * runtime type of the specified array and the size of this deque.
+ *
+ * <p>If this deque fits in the specified array with room to spare
+ * (i.e., the array has more elements than this deque), the element in
+ * the array immediately following the end of the deque is set to
+ * {@code null}.
+ *
+ * <p>Like the {@link #toArray()} method, this method acts as bridge between
+ * array-based and collection-based APIs. Further, this method allows
+ * precise control over the runtime type of the output array, and may,
+ * under certain circumstances, be used to save allocation costs.
+ *
+ * <p>Suppose {@code x} is a deque known to contain only strings.
+ * The following code can be used to dump the deque into a newly
+ * allocated array of {@code String}:
+ *
+ * <pre>
+ * String[] y = x.toArray(new String[0]);</pre>
+ *
+ * Note that {@code toArray(new Object[0])} is identical in function to
+ * {@code toArray()}.
+ *
+ * @param a the array into which the elements of the deque are to
+ * be stored, if it is big enough; otherwise, a new array of the
+ * same runtime type is allocated for this purpose
+ * @return an array containing all of the elements in this deque
+ * @throws ArrayStoreException if the runtime type of the specified array
+ * is not a supertype of the runtime type of every element in
+ * this deque
+ * @throws NullPointerException if the specified array is null
+ */
+ @SuppressWarnings("unchecked")
+ public <T> T[] toArray(T[] a) {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ if (a.length < count)
+ a = (T[])java.lang.reflect.Array.newInstance
+ (a.getClass().getComponentType(), count);
+
+ int k = 0;
+ for (Node<E> p = first; p != null; p = p.next)
+ a[k++] = (T)p.item;
+ if (a.length > k)
+ a[k] = null;
+ return a;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public String toString() {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ return super.toString();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Atomically removes all of the elements from this deque.
+ * The deque will be empty after this call returns.
+ */
+ public void clear() {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ for (Node<E> f = first; f != null; ) {
+ f.item = null;
+ Node<E> n = f.next;
+ f.prev = null;
+ f.next = null;
+ f = n;
+ }
+ first = last = null;
+ count = 0;
+ notFull.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns an iterator over the elements in this deque in proper sequence.
+ * The elements will be returned in order from first (head) to last (tail).
+ * The returned {@code Iterator} is a "weakly consistent" iterator that
+ * will never throw {@link java.util.ConcurrentModificationException
+ * ConcurrentModificationException},
+ * and guarantees to traverse elements as they existed upon
+ * construction of the iterator, and may (but is not guaranteed to)
+ * reflect any modifications subsequent to construction.
+ *
+ * @return an iterator over the elements in this deque in proper sequence
+ */
+ public Iterator<E> iterator() {
+ return new Itr();
+ }
+
+ /**
+ * Returns an iterator over the elements in this deque in reverse
+ * sequential order. The elements will be returned in order from
+ * last (tail) to first (head).
+ * The returned {@code Iterator} is a "weakly consistent" iterator that
+ * will never throw {@link java.util.ConcurrentModificationException
+ * ConcurrentModificationException},
+ * and guarantees to traverse elements as they existed upon
+ * construction of the iterator, and may (but is not guaranteed to)
+ * reflect any modifications subsequent to construction.
+ */
+ public Iterator<E> descendingIterator() {
+ return new DescendingItr();
+ }
+
+ /**
+ * Base class for Iterators for LinkedBlockingDeque
+ */
+ private abstract class AbstractItr implements Iterator<E> {
+ /**
+ * The next node to return in next()
+ */
+ Node<E> next;
+
+ /**
+ * nextItem holds on to item fields because once we claim that
+ * an element exists in hasNext(), we must return item read
+ * under lock (in advance()) even if it was in the process of
+ * being removed when hasNext() was called.
+ */
+ E nextItem;
+
+ /**
+ * Node returned by most recent call to next. Needed by remove.
+ * Reset to null if this element is deleted by a call to remove.
+ */
+ private Node<E> lastRet;
+
+ abstract Node<E> firstNode();
+ abstract Node<E> nextNode(Node<E> n);
+
+ AbstractItr() {
+ // set to initial position
+ final ReentrantLock lock = LinkedBlockingDeque.this.lock;
+ lock.lock();
+ try {
+ next = firstNode();
+ nextItem = (next == null) ? null : next.item;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Advances next.
+ */
+ void advance() {
+ final ReentrantLock lock = LinkedBlockingDeque.this.lock;
+ lock.lock();
+ try {
+ // assert next != null;
+ Node<E> s = nextNode(next);
+ if (s == next) {
+ next = firstNode();
+ } else {
+ // Skip over removed nodes.
+ // May be necessary if multiple interior Nodes are removed.
+ while (s != null && s.item == null)
+ s = nextNode(s);
+ next = s;
+ }
+ nextItem = (next == null) ? null : next.item;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean hasNext() {
+ return next != null;
+ }
+
+ public E next() {
+ if (next == null)
+ throw new NoSuchElementException();
+ lastRet = next;
+ E x = nextItem;
+ advance();
+ return x;
+ }
+
+ public void remove() {
+ Node<E> n = lastRet;
+ if (n == null)
+ throw new IllegalStateException();
+ lastRet = null;
+ final ReentrantLock lock = LinkedBlockingDeque.this.lock;
+ lock.lock();
+ try {
+ if (n.item != null)
+ unlink(n);
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ /** Forward iterator */
+ private class Itr extends AbstractItr {
+ Node<E> firstNode() { return first; }
+ Node<E> nextNode(Node<E> n) { return n.next; }
+ }
+
+ /** Descending iterator */
+ private class DescendingItr extends AbstractItr {
+ Node<E> firstNode() { return last; }
+ Node<E> nextNode(Node<E> n) { return n.prev; }
+ }
+
+ /**
+ * Save the state of this deque to a stream (that is, serialize it).
+ *
+ * @serialData The capacity (int), followed by elements (each an
+ * {@code Object}) in the proper order, followed by a null
+ * @param s the stream
+ */
+ private void writeObject(java.io.ObjectOutputStream s)
+ throws java.io.IOException {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ // Write out capacity and any hidden stuff
+ s.defaultWriteObject();
+ // Write out all elements in the proper order.
+ for (Node<E> p = first; p != null; p = p.next)
+ s.writeObject(p.item);
+ // Use trailing null as sentinel
+ s.writeObject(null);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Reconstitute this deque from a stream (that is,
+ * deserialize it).
+ * @param s the stream
+ */
+ private void readObject(java.io.ObjectInputStream s)
+ throws java.io.IOException, ClassNotFoundException {
+ s.defaultReadObject();
+ count = 0;
+ first = null;
+ last = null;
+ // Read in all elements and place in queue
+ for (;;) {
+ @SuppressWarnings("unchecked")
+ E item = (E)s.readObject();
+ if (item == null)
+ break;
+ add(item);
+ }
+ }
+
+}
Propchange: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/LinkedBlockingDeque.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/LinkedBlockingQueue.java
URL: http://svn.apache.org/viewvc/harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/LinkedBlockingQueue.java?rev=800934&r1=800933&r2=800934&view=diff
==============================================================================
--- harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/LinkedBlockingQueue.java (original)
+++ harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/LinkedBlockingQueue.java Tue Aug 4 19:39:24 2009
@@ -5,9 +5,14 @@
*/
package java.util.concurrent;
-import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
-import java.util.*;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
/**
* An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
@@ -57,15 +62,43 @@
* items have been entered since the signal. And symmetrically for
* takes signalling puts. Operations such as remove(Object) and
* iterators acquire both locks.
+ *
+ * Visibility between writers and readers is provided as follows:
+ *
+ * Whenever an element is enqueued, the putLock is acquired and
+ * count updated. A subsequent reader guarantees visibility to the
+ * enqueued Node by either acquiring the putLock (via fullyLock)
+ * or by acquiring the takeLock, and then reading n = count.get();
+ * this gives visibility to the first n items.
+ *
+ * To implement weakly consistent iterators, it appears we need to
+ * keep all Nodes GC-reachable from a predecessor dequeued Node.
+ * That would cause two problems:
+ * - allow a rogue Iterator to cause unbounded memory retention
+ * - cause cross-generational linking of old Nodes to new Nodes if
+ * a Node was tenured while live, which generational GCs have a
+ * hard time dealing with, causing repeated major collections.
+ * However, only non-deleted Nodes need to be reachable from
+ * dequeued Nodes, and reachability does not necessarily have to
+ * be of the kind understood by the GC. We use the trick of
+ * linking a Node that has just been dequeued to itself. Such a
+ * self-link implicitly means to advance to head.next.
*/
/**
* Linked list node class
*/
static class Node<E> {
- /** The item, volatile to ensure barrier separating write and read */
- volatile E item;
+ E item;
+
+ /**
+ * One of:
+ * - the real successor Node
+ * - this Node, meaning the successor is head.next
+ * - null, meaning there is no successor (this is the last node)
+ */
Node<E> next;
+
Node(E x) { item = x; }
}
@@ -75,10 +108,16 @@
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger(0);
- /** Head of linked list */
+ /**
+ * Head of linked list.
+ * Invariant: head.item == null
+ */
private transient Node<E> head;
- /** Tail of linked list */
+ /**
+ * Tail of linked list.
+ * Invariant: last.next == null
+ */
private transient Node<E> last;
/** Lock held by take, poll, etc */
@@ -122,20 +161,26 @@
/**
* Creates a node and links it at end of queue.
+ *
* @param x the item
*/
- private void insert(E x) {
+ private void enqueue(E x) {
+ // assert putLock.isHeldByCurrentThread();
+ // assert last.next == null;
last = last.next = new Node<E>(x);
}
/**
- * Removes a node from head of queue,
+ * Removes a node from head of queue.
+ *
* @return the node
*/
- private E extract() {
+ private E dequeue() {
+ // assert takeLock.isHeldByCurrentThread();
+ // assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
- h.next = null; // help GC
+ h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
@@ -145,7 +190,7 @@
/**
* Lock to prevent both puts and takes.
*/
- private void fullyLock() {
+ void fullyLock() {
putLock.lock();
takeLock.lock();
}
@@ -153,14 +198,21 @@
/**
* Unlock to allow both puts and takes.
*/
- private void fullyUnlock() {
+ void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
+// /**
+// * Tells whether both locks are held by current thread.
+// */
+// boolean isFullyLocked() {
+// return (putLock.isHeldByCurrentThread() &&
+// takeLock.isHeldByCurrentThread());
+// }
/**
- * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
+ * Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
@@ -168,10 +220,10 @@
}
/**
- * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
+ * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
- * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
+ * @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
public LinkedBlockingQueue(int capacity) {
@@ -181,7 +233,7 @@
}
/**
- * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
+ * Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}, initially containing the elements of the
* given collection,
* added in traversal order of the collection's iterator.
@@ -192,8 +244,22 @@
*/
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
- for (E e : c)
- add(e);
+ final ReentrantLock putLock = this.putLock;
+ putLock.lock(); // Never contended, but necessary for visibility
+ try {
+ int n = 0;
+ for (E e : c) {
+ if (e == null)
+ throw new NullPointerException();
+ if (n == capacity)
+ throw new IllegalStateException("Queue full");
+ enqueue(e);
+ ++n;
+ }
+ count.set(n);
+ } finally {
+ putLock.unlock();
+ }
}
@@ -214,10 +280,10 @@
* Returns the number of additional elements that this queue can ideally
* (in the absence of memory or resource constraints) accept without
* blocking. This is always equal to the initial capacity of this queue
- * less the current <tt>size</tt> of this queue.
+ * less the current {@code size} of this queue.
*
* <p>Note that you <em>cannot</em> always tell if an attempt to insert
- * an element will succeed by inspecting <tt>remainingCapacity</tt>
+ * an element will succeed by inspecting {@code remainingCapacity}
* because it may be the case that another thread is about to
* insert or remove an element.
*/
@@ -234,8 +300,8 @@
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
- // Note: convention in all put/take/etc is to preset
- // local var holding count negative to indicate failure unless set.
+ // Note: convention in all put/take/etc is to preset local var
+ // holding count negative to indicate failure unless set.
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
@@ -246,18 +312,13 @@
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
- * signalled if it ever changes from
- * capacity. Similarly for all other uses of count in
- * other wait guards.
+ * signalled if it ever changes from capacity. Similarly
+ * for all other uses of count in other wait guards.
*/
- try {
- while (count.get() == capacity)
+ while (count.get() == capacity) {
notFull.await();
- } catch (InterruptedException ie) {
- notFull.signal(); // propagate to a non-interrupted thread
- throw ie;
}
- insert(e);
+ enqueue(e);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
@@ -272,7 +333,7 @@
* Inserts the specified element at the tail of this queue, waiting if
* necessary up to the specified wait time for space to become available.
*
- * @return <tt>true</tt> if successful, or <tt>false</tt> if
+ * @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before space is available.
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
@@ -287,23 +348,15 @@
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
- for (;;) {
- if (count.get() < capacity) {
- insert(e);
- c = count.getAndIncrement();
- if (c + 1 < capacity)
- notFull.signal();
- break;
- }
+ while (count.get() == capacity) {
if (nanos <= 0)
return false;
- try {
nanos = notFull.awaitNanos(nanos);
- } catch (InterruptedException ie) {
- notFull.signal(); // propagate to a non-interrupted thread
- throw ie;
- }
}
+ enqueue(e);
+ c = count.getAndIncrement();
+ if (c + 1 < capacity)
+ notFull.signal();
} finally {
putLock.unlock();
}
@@ -315,7 +368,7 @@
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
- * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
+ * returning {@code true} upon success and {@code false} if this queue
* is full.
* When using a capacity-restricted queue, this method is generally
* preferable to method {@link BlockingQueue#add add}, which can fail to
@@ -333,7 +386,7 @@
putLock.lock();
try {
if (count.get() < capacity) {
- insert(e);
+ enqueue(e);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
@@ -354,15 +407,10 @@
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
- try {
- while (count.get() == 0)
+ while (count.get() == 0) {
notEmpty.await();
- } catch (InterruptedException ie) {
- notEmpty.signal(); // propagate to a non-interrupted thread
- throw ie;
}
-
- x = extract();
+ x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
@@ -382,23 +430,15 @@
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
- for (;;) {
- if (count.get() > 0) {
- x = extract();
- c = count.getAndDecrement();
- if (c > 1)
- notEmpty.signal();
- break;
- }
+ while (count.get() == 0) {
if (nanos <= 0)
return null;
- try {
nanos = notEmpty.awaitNanos(nanos);
- } catch (InterruptedException ie) {
- notEmpty.signal(); // propagate to a non-interrupted thread
- throw ie;
- }
}
+ x = dequeue();
+ c = count.getAndDecrement();
+ if (c > 1)
+ notEmpty.signal();
} finally {
takeLock.unlock();
}
@@ -417,7 +457,7 @@
takeLock.lock();
try {
if (count.get() > 0) {
- x = extract();
+ x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
@@ -448,43 +488,47 @@
}
/**
+ * Unlinks interior Node p with predecessor trail.
+ */
+ void unlink(Node<E> p, Node<E> trail) {
+ // assert isFullyLocked();
+ // p.next is not changed, to allow iterators that are
+ // traversing p to maintain their weak-consistency guarantee.
+ p.item = null;
+ trail.next = p.next;
+ if (last == p)
+ last = trail;
+ if (count.getAndDecrement() == capacity)
+ notFull.signal();
+ }
+
+ /**
* Removes a single instance of the specified element from this queue,
- * if it is present. More formally, removes an element <tt>e</tt> such
- * that <tt>o.equals(e)</tt>, if this queue contains one or more such
+ * if it is present. More formally, removes an element {@code e} such
+ * that {@code o.equals(e)}, if this queue contains one or more such
* elements.
- * Returns <tt>true</tt> if this queue contained the specified element
+ * Returns {@code true} if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
* @param o element to be removed from this queue, if present
- * @return <tt>true</tt> if this queue changed as a result of the call
+ * @return {@code true} if this queue changed as a result of the call
*/
public boolean remove(Object o) {
if (o == null) return false;
- boolean removed = false;
fullyLock();
try {
- Node<E> trail = head;
- Node<E> p = head.next;
- while (p != null) {
+ for (Node<E> trail = head, p = trail.next;
+ p != null;
+ trail = p, p = p.next) {
if (o.equals(p.item)) {
- removed = true;
- break;
+ unlink(p, trail);
+ return true;
}
- trail = p;
- p = p.next;
- }
- if (removed) {
- p.item = null;
- trail.next = p.next;
- if (last == p)
- last = trail;
- if (count.getAndDecrement() == capacity)
- notFull.signalAll();
}
+ return false;
} finally {
fullyUnlock();
}
- return removed;
}
/**
@@ -524,22 +568,22 @@
* <p>If this queue fits in the specified array with room to spare
* (i.e., the array has more elements than this queue), the element in
* the array immediately following the end of the queue is set to
- * <tt>null</tt>.
+ * {@code null}.
*
* <p>Like the {@link #toArray()} method, this method acts as bridge between
* array-based and collection-based APIs. Further, this method allows
* precise control over the runtime type of the output array, and may,
* under certain circumstances, be used to save allocation costs.
*
- * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
+ * <p>Suppose {@code x} is a queue known to contain only strings.
* The following code can be used to dump the queue into a newly
- * allocated array of <tt>String</tt>:
+ * allocated array of {@code String}:
*
* <pre>
* String[] y = x.toArray(new String[0]);</pre>
*
- * Note that <tt>toArray(new Object[0])</tt> is identical in function to
- * <tt>toArray()</tt>.
+ * Note that {@code toArray(new Object[0])} is identical in function to
+ * {@code toArray()}.
*
* @param a the array into which the elements of the queue are to
* be stored, if it is big enough; otherwise, a new array of the
@@ -550,6 +594,7 @@
* this queue
* @throws NullPointerException if the specified array is null
*/
+ @SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
fullyLock();
try {
@@ -559,7 +604,7 @@
(a.getClass().getComponentType(), size);
int k = 0;
- for (Node p = head.next; p != null; p = p.next)
+ for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = (T)p.item;
if (a.length > k)
a[k] = null;
@@ -585,11 +630,14 @@
public void clear() {
fullyLock();
try {
- head.next = null;
- assert head.item == null;
- last = head;
+ for (Node<E> p, h = head; (p = h.next) != null; h = p) {
+ h.next = h;
+ p.item = null;
+ }
+ head = last;
+ // assert head.item == null && head.next == null;
if (count.getAndSet(0) == capacity)
- notFull.signalAll();
+ notFull.signal();
} finally {
fullyUnlock();
}
@@ -602,30 +650,7 @@
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection<? super E> c) {
- if (c == null)
- throw new NullPointerException();
- if (c == this)
- throw new IllegalArgumentException();
- Node<E> first;
- fullyLock();
- try {
- first = head.next;
- head.next = null;
- assert head.item == null;
- last = head;
- if (count.getAndSet(0) == capacity)
- notFull.signalAll();
- } finally {
- fullyUnlock();
- }
- // Transfer the elements outside of locks
- int n = 0;
- for (Node<E> p = first; p != null; p = p.next) {
- c.add(p.item);
- p.item = null;
- ++n;
- }
- return n;
+ return drainTo(c, Integer.MAX_VALUE);
}
/**
@@ -639,34 +664,44 @@
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
- fullyLock();
+ boolean signalNotFull = false;
+ final ReentrantLock takeLock = this.takeLock;
+ takeLock.lock();
try {
- int n = 0;
- Node<E> p = head.next;
- while (p != null && n < maxElements) {
+ int n = Math.min(maxElements, count.get());
+ // count.get provides visibility to first n Nodes
+ Node<E> h = head;
+ int i = 0;
+ try {
+ while (i < n) {
+ Node<E> p = h.next;
c.add(p.item);
p.item = null;
- p = p.next;
- ++n;
- }
- if (n != 0) {
- head.next = p;
- assert head.item == null;
- if (p == null)
- last = head;
- if (count.getAndAdd(-n) == capacity)
- notFull.signalAll();
+ h.next = h;
+ h = p;
+ ++i;
}
return n;
} finally {
- fullyUnlock();
+ // Restore invariants even if c.add() threw
+ if (i > 0) {
+ // assert h.item == null;
+ head = h;
+ signalNotFull = (count.getAndAdd(-i) == capacity);
+ }
+ }
+ } finally {
+ takeLock.unlock();
+ if (signalNotFull)
+ signalNotFull();
}
}
/**
* Returns an iterator over the elements in this queue in proper sequence.
- * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
- * will never throw {@link ConcurrentModificationException},
+ * The returned {@code Iterator} is a "weakly consistent" iterator that
+ * will never throw {@link java.util.ConcurrentModificationException
+ * ConcurrentModificationException},
* and guarantees to traverse elements as they existed upon
* construction of the iterator, and may (but is not guaranteed to)
* reflect any modifications subsequent to construction.
@@ -679,7 +714,7 @@
private class Itr implements Iterator<E> {
/*
- * Basic weak-consistent iterator. At all times hold the next
+ * Basic weakly-consistent iterator. At all times hold the next
* item to hand out so that if hasNext() reports true, we will
* still have it to return even if lost race with a take etc.
*/
@@ -688,17 +723,13 @@
private E currentElement;
Itr() {
- final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
- final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
- putLock.lock();
- takeLock.lock();
+ fullyLock();
try {
current = head.next;
if (current != null)
currentElement = current.item;
} finally {
- takeLock.unlock();
- putLock.unlock();
+ fullyUnlock();
}
}
@@ -706,54 +737,54 @@
return current != null;
}
+ /**
+ * Unlike other traversal methods, iterators need to handle:
+ * - dequeued nodes (p.next == p)
+ * - interior removed nodes (p.item == null)
+ */
+ private Node<E> nextNode(Node<E> p) {
+ Node<E> s = p.next;
+ if (p == s)
+ return head.next;
+ // Skip over removed nodes.
+ // May be necessary if multiple interior Nodes are removed.
+ while (s != null && s.item == null)
+ s = s.next;
+ return s;
+ }
+
public E next() {
- final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
- final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
- putLock.lock();
- takeLock.lock();
+ fullyLock();
try {
if (current == null)
throw new NoSuchElementException();
E x = currentElement;
lastRet = current;
- current = current.next;
- if (current != null)
- currentElement = current.item;
+ current = nextNode(current);
+ currentElement = (current == null) ? null : current.item;
return x;
} finally {
- takeLock.unlock();
- putLock.unlock();
+ fullyUnlock();
}
}
public void remove() {
if (lastRet == null)
throw new IllegalStateException();
- final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
- final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
- putLock.lock();
- takeLock.lock();
+ fullyLock();
try {
Node<E> node = lastRet;
lastRet = null;
- Node<E> trail = head;
- Node<E> p = head.next;
- while (p != null && p != node) {
- trail = p;
- p = p.next;
- }
+ for (Node<E> trail = head, p = trail.next;
+ p != null;
+ trail = p, p = p.next) {
if (p == node) {
- p.item = null;
- trail.next = p.next;
- if (last == p)
- last = trail;
- int c = count.getAndDecrement();
- if (c == capacity)
- notFull.signalAll();
+ unlink(p, trail);
+ break;
+ }
}
} finally {
- takeLock.unlock();
- putLock.unlock();
+ fullyUnlock();
}
}
}
@@ -762,7 +793,7 @@
* Save the state to a stream (that is, serialize it).
*
* @serialData The capacity is emitted (int), followed by all of
- * its elements (each an <tt>Object</tt>) in the proper order,
+ * its elements (each an {@code Object}) in the proper order,
* followed by a null
* @param s the stream
*/
@@ -788,6 +819,7 @@
/**
* Reconstitute this queue instance from a stream (that is,
* deserialize it).
+ *
* @param s the stream
*/
private void readObject(java.io.ObjectInputStream s)
@@ -800,6 +832,7 @@
// Read in all elements and place in queue
for (;;) {
+ @SuppressWarnings("unchecked")
E item = (E)s.readObject();
if (item == null)
break;
|