Return-Path: Delivered-To: apmail-incubator-harmony-commits-archive@www.apache.org Received: (qmail 2394 invoked from network); 12 Jul 2006 04:12:51 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 12 Jul 2006 04:12:51 -0000 Received: (qmail 48113 invoked by uid 500); 12 Jul 2006 04:12:50 -0000 Delivered-To: apmail-incubator-harmony-commits-archive@incubator.apache.org Received: (qmail 48083 invoked by uid 500); 12 Jul 2006 04:12:49 -0000 Mailing-List: contact harmony-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: harmony-dev@incubator.apache.org Delivered-To: mailing list harmony-commits@incubator.apache.org Received: (qmail 48071 invoked by uid 99); 12 Jul 2006 04:12:49 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Jul 2006 21:12:49 -0700 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received-SPF: pass (asf.osuosl.org: local policy) Received: from [140.211.166.113] (HELO eris.apache.org) (140.211.166.113) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Jul 2006 21:12:41 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 21B6D1A9825; Tue, 11 Jul 2006 21:12:21 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r421111 [5/11] - in /incubator/harmony/enhanced/classlib/trunk/sandbox: ./ juc-proposal/ juc-proposal/concurrent/ juc-proposal/concurrent/.settings/ juc-proposal/concurrent/META-INF/ juc-proposal/concurrent/src/ juc-proposal/concurrent/src/... 
Date: Wed, 12 Jul 2006 04:12:08 -0000 To: harmony-commits@incubator.apache.org From: ndbeyer@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20060712041221.21B6D1A9825@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Added: incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/LinkedBlockingQueue.java URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/LinkedBlockingQueue.java?rev=421111&view=auto ============================================================================== --- incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/LinkedBlockingQueue.java (added) +++ incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/LinkedBlockingQueue.java Tue Jul 11 21:12:04 2006 @@ -0,0 +1,720 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; +import java.util.*; + +/** + * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on + * linked nodes. + * This queue orders elements FIFO (first-in-first-out). + * The head of the queue is that element that has been on the + * queue the longest time. + * The tail of the queue is that element that has been on the + * queue the shortest time. New elements + * are inserted at the tail of the queue, and the queue retrieval + * operations obtain elements at the head of the queue. + * Linked queues typically have higher throughput than array-based queues but + * less predictable performance in most concurrent applications. + * + *

The optional capacity bound constructor argument serves as a + * way to prevent excessive queue expansion. The capacity, if unspecified, + * is equal to {@link Integer#MAX_VALUE}. Linked nodes are + * dynamically created upon each insertion unless this would bring the + * queue above capacity. + * + *

This class implements all of the optional methods + * of the {@link Collection} and {@link Iterator} interfaces. + * + *

/**
 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
 * linked nodes that orders its elements FIFO (first-in-first-out).
 *
 * <p>This class is a member of the Java Collections Framework.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm. The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts. Similarly for the takeLock. The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * Invariant relied on by insert(): "last" is always the final node
     * reachable from "head" (and is "head" itself when the queue is
     * empty). Every operation that unlinks the tail node must therefore
     * move "last" back, otherwise later insertions are appended to a
     * detached node and become unreachable.
     */

    /**
     * Linked list node class.
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list; a dummy node whose item is always null */
    private transient Node<E> head;

    /** Tail of linked list; equals head when the queue is empty */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Creates a node and links it at the end of the queue.
     * Caller must hold putLock.
     *
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Removes the first real node by promoting it to be the new dummy head.
     * Caller must hold takeLock and have checked that the queue is non-empty.
     *
     * @return the item that was stored in the removed node
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Locks to prevent both puts and takes.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlocks to allow both puts and takes (reverse order of fullyLock).
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

    /**
     * Unlinks node {@code p}, whose predecessor is {@code trail}, and
     * repairs {@code last} when {@code p} was the tail. Caller must hold
     * both locks.
     */
    private void unlink(Node<E> p, Node<E> trail) {
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail; // keep the tail reachable from head
        if (count.getAndDecrement() == capacity)
            notFull.signalAll();
    }

    /**
     * Creates a LinkedBlockingQueue with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a LinkedBlockingQueue with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue.
     * @throws IllegalArgumentException if capacity is not greater
     *         than zero.
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a LinkedBlockingQueue with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection, added in traversal order of the collection's
     * iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if c or any element within it
     *         is null
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue.
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current size of this queue.
     *
     * <p>Note that you cannot always tell if an attempt to add an element
     * will succeed by inspecting remainingCapacity because it may be the
     * case that a waiting consumer is ready to take an element out of an
     * otherwise full queue.
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is null.
     */
    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count negative to indicate failure unless set.
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(o);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @param o the element to add
     * @param timeout how long to wait before giving up, in units of
     *        unit
     * @param unit a TimeUnit determining how to interpret the
     *        timeout parameter
     * @return true if successful, or false if
     *         the specified waiting time elapses before space is available.
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is null.
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (o == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(o);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                    break;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @param o the element to add.
     * @return true if it was possible to add the element to
     *         this queue, else false
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                insert(o);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * given time for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of unit
     * @param unit a TimeUnit determining how to interpret timeout
     * @return the head of this queue, or null if the wait time elapsed
     * @throws InterruptedException if interrupted while waiting.
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, or returns null
     * immediately if the queue is empty.
     */
    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or returns
     * null if the queue is empty.
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            return (first == null) ? null : first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Removes the first node whose item equals {@code o}, if any.
     *
     * @param o element to remove; null is never contained
     * @return true if an element was removed
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            Node<E> trail = head;
            for (Node<E> p = head.next; p != null; trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns a snapshot array of the queue's elements in FIFO order.
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Copies the queue's elements, in FIFO order, into {@code a} (or into a
     * new array of the same runtime type if {@code a} is too small). When
     * {@code a} has room to spare, the slot right after the last element is
     * set to null, as the {@link Collection#toArray(Object[])} contract
     * requires.
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[]) java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T) p.item;
            if (a.length > k)
                a[k] = null; // null-terminate per Collection.toArray(T[])
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns the inherited string form, computed while holding both locks
     * so that the traversal sees one consistent state.
     */
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     */
    public void clear() {
        fullyLock();
        try {
            head.next = null;
            last = head; // tail must follow, or later inserts are lost
            if (count.getAndSet(0) == capacity)
                notFull.signalAll();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Removes all elements from this queue and adds them to {@code c}.
     * The chain is detached atomically; the transfer into {@code c}
     * happens outside the locks.
     *
     * @throws NullPointerException if c is null
     * @throws IllegalArgumentException if c is this queue
     * @return the number of elements transferred
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        Node<E> first;
        fullyLock();
        try {
            first = head.next;
            head.next = null;
            last = head; // tail must follow, or later inserts are lost
            if (count.getAndSet(0) == capacity)
                notFull.signalAll();
        } finally {
            fullyUnlock();
        }
        // Transfer the elements outside of locks
        int n = 0;
        for (Node<E> p = first; p != null; p = p.next) {
            c.add(p.item);
            p.item = null;
            ++n;
        }
        return n;
    }

    /**
     * Removes at most {@code maxElements} elements from this queue and adds
     * them to {@code c}. If {@code c.add} throws, the elements already
     * transferred stay removed and the rest remain queued.
     *
     * @throws NullPointerException if c is null
     * @throws IllegalArgumentException if c is this queue
     * @return the number of elements transferred
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        fullyLock();
        try {
            int n = 0;
            Node<E> p = head.next;
            try {
                while (p != null && n < maxElements) {
                    c.add(p.item);
                    p.item = null;
                    p = p.next;
                    ++n;
                }
                return n;
            } finally {
                // Restore invariants even if c.add threw part-way through.
                if (n != 0) {
                    head.next = p;
                    if (p == null)
                        last = head; // drained everything: reset the tail
                    if (count.getAndAdd(-n) == capacity)
                        notFull.signalAll();
                }
            }
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned Iterator is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence.
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weak-consistent iterator. At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        private Node<E> current;
        private Node<E> lastRet;
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            } finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return current != null;
        }

        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = current.next;
                if (current != null)
                    currentElement = current.item;
                return x;
            } finally {
                fullyUnlock();
            }
        }

        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                Node<E> trail = head;
                Node<E> p = head.next;
                while (p != null && p != node) {
                    trail = p;
                    p = p.next;
                }
                // p is null when the node was already removed by someone else
                if (p == node && p != null)
                    unlink(p, trail);
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an Object) in the proper order,
     * followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
            throws java.io.IOException {
        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     *
     * @param s the stream
     */
    @SuppressWarnings("unchecked")
    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            E item = (E) s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}
// svn commit-mail metadata that shared the final source line with this class (retained verbatim):
// Propchange: incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/LinkedBlockingQueue.java
// ------------------------------------------------------------------------------
//     svn:eol-style = native
// Added: incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/PriorityBlockingQueue.java
// URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/PriorityBlockingQueue.java?rev=421111&view=auto
// ==============================================================================
// --- incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/PriorityBlockingQueue.java (added)
// +++
incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/PriorityBlockingQueue.java Tue Jul 11 21:12:04 2006 @@ -0,0 +1,455 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; + +import java.util.concurrent.locks.*; +import java.util.*; + +/** + * An unbounded {@linkplain BlockingQueue blocking queue} that uses + * the same ordering rules as class {@link PriorityQueue} and supplies + * blocking retrieval operations. While this queue is logically + * unbounded, attempted additions may fail due to resource exhaustion + * (causing OutOfMemoryError). This class does not permit + * null elements. A priority queue relying on natural + * ordering also does not permit insertion of non-comparable objects + * (doing so results in ClassCastException). + * + *

This class implements all of the optional methods + * of the {@link Collection} and {@link Iterator} interfaces. + *

The Iterator provided in method {@link #iterator()} is + * not guaranteed to traverse the elements of the + * PriorityBlockingQueue in any particular order. If you need ordered + * traversal, consider using Arrays.sort(pq.toArray()). + * + *

This class is a member of the + * + * Java Collections Framework. + * + * @since 1.5 + * @author Doug Lea + * @param <E> the type of elements held in this collection + */ +public class PriorityBlockingQueue extends AbstractQueue + implements BlockingQueue, java.io.Serializable { + private static final long serialVersionUID = 5595510919245408276L; + + private final PriorityQueue q; + private final ReentrantLock lock = new ReentrantLock(true); + private final Condition notEmpty = lock.newCondition(); + + /** + * Creates a PriorityBlockingQueue with the default initial + * capacity + * (11) that orders its elements according to their natural + * ordering (using Comparable). + */ + public PriorityBlockingQueue() { + q = new PriorityQueue(); + } + + /** + * Creates a PriorityBlockingQueue with the specified initial + * capacity + * that orders its elements according to their natural ordering + * (using Comparable). + * + * @param initialCapacity the initial capacity for this priority queue. + * @throws IllegalArgumentException if initialCapacity is less + * than 1 + */ + public PriorityBlockingQueue(int initialCapacity) { + q = new PriorityQueue(initialCapacity, null); + } + + /** + * Creates a PriorityBlockingQueue with the specified initial + * capacity + * that orders its elements according to the specified comparator. + * + * @param initialCapacity the initial capacity for this priority queue. + * @param comparator the comparator used to order this priority queue. + * If null then the order depends on the elements' natural + * ordering. + * @throws IllegalArgumentException if initialCapacity is less + * than 1 + */ + public PriorityBlockingQueue(int initialCapacity, + Comparator comparator) { + q = new PriorityQueue(initialCapacity, comparator); + } + + /** + * Creates a PriorityBlockingQueue containing the elements + * in the specified collection. The priority queue has an initial + * capacity of 110% of the size of the specified collection. 
If + * the specified collection is a {@link SortedSet} or a {@link + * PriorityQueue}, this priority queue will be sorted according to + * the same comparator, or according to its elements' natural + * order if the collection is sorted according to its elements' + * natural order. Otherwise, this priority queue is ordered + * according to its elements' natural order. + * + * @param c the collection whose elements are to be placed + * into this priority queue. + * @throws ClassCastException if elements of the specified collection + * cannot be compared to one another according to the priority + * queue's ordering. + * @throws NullPointerException if c or any element within it + * is null + */ + public PriorityBlockingQueue(Collection c) { + q = new PriorityQueue(c); + } + + + // these first few override just to update doc comments + + /** + * Adds the specified element to this queue. + * @param o the element to add + * @return true (as per the general contract of + * Collection.add). + * + * @throws NullPointerException if the specified element is null. + * @throws ClassCastException if the specified element cannot be compared + * with elements currently in the priority queue according + * to the priority queue's ordering. + */ + public boolean add(E o) { + return super.add(o); + } + + /** + * Returns the comparator used to order this collection, or null + * if this collection is sorted according to its elements natural ordering + * (using Comparable). + * + * @return the comparator used to order this collection, or null + * if this collection is sorted according to its elements natural ordering. + */ + public Comparator comparator() { + return q.comparator(); + } + + /** + * Inserts the specified element into this priority queue. + * + * @param o the element to add + * @return true + * @throws ClassCastException if the specified element cannot be compared + * with elements currently in the priority queue according + * to the priority queue's ordering. 
+ * @throws NullPointerException if the specified element is null. + */ + public boolean offer(E o) { + if (o == null) throw new NullPointerException(); + final ReentrantLock lock = this.lock; + lock.lock(); + try { + boolean ok = q.offer(o); + assert ok; + notEmpty.signal(); + return true; + } finally { + lock.unlock(); + } + } + + /** + * Adds the specified element to this priority queue. As the queue is + * unbounded this method will never block. + * @param o the element to add + * @throws ClassCastException if the element cannot be compared + * with elements currently in the priority queue according + * to the priority queue's ordering. + * @throws NullPointerException if the specified element is null. + */ + public void put(E o) { + offer(o); // never need to block + } + + /** + * Inserts the specified element into this priority queue. As the queue is + * unbounded this method will never block. + * @param o the element to add + * @param timeout This parameter is ignored as the method never blocks + * @param unit This parameter is ignored as the method never blocks + * @return true + * @throws ClassCastException if the element cannot be compared + * with elements currently in the priority queue according + * to the priority queue's ordering. + * @throws NullPointerException if the specified element is null. 
+ */ + public boolean offer(E o, long timeout, TimeUnit unit) { + return offer(o); // never need to block + } + + /** + * Retrieves and removes the head of this queue, waiting on the + * notEmpty condition for as long as the queue is empty. + * @return the head of this queue + * @throws InterruptedException if interrupted while waiting. + */ + public E take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + try { + while (q.size() == 0) + notEmpty.await(); + } catch (InterruptedException ie) { + notEmpty.signal(); // propagate to non-interrupted thread + throw ie; + } + E x = q.poll(); + assert x != null; + return x; + } finally { + lock.unlock(); + } + } + + + public E poll() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.poll(); + } finally { + lock.unlock(); + } + } + + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + for (;;) { + E x = q.poll(); + if (x != null) + return x; + if (nanos <= 0) + return null; + try { + nanos = notEmpty.awaitNanos(nanos); + } catch (InterruptedException ie) { + notEmpty.signal(); // propagate to non-interrupted thread + throw ie; + } + } + } finally { + lock.unlock(); + } + } + + public E peek() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.peek(); + } finally { + lock.unlock(); + } + } + + public int size() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.size(); + } finally { + lock.unlock(); + } + } + + /** + * Always returns Integer.MAX_VALUE because + * a PriorityBlockingQueue is not capacity constrained. 
+ * @return Integer.MAX_VALUE + */ + public int remainingCapacity() { + return Integer.MAX_VALUE; + } + + public boolean remove(Object o) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.remove(o); + } finally { + lock.unlock(); + } + } + + public boolean contains(Object o) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.contains(o); + } finally { + lock.unlock(); + } + } + + public Object[] toArray() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.toArray(); + } finally { + lock.unlock(); + } + } + + + public String toString() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.toString(); + } finally { + lock.unlock(); + } + } + + public int drainTo(Collection c) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + final ReentrantLock lock = this.lock; + lock.lock(); + try { + int n = 0; + E e; + while ( (e = q.poll()) != null) { + c.add(e); + ++n; + } + return n; + } finally { + lock.unlock(); + } + } + + public int drainTo(Collection c, int maxElements) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + if (maxElements <= 0) + return 0; + final ReentrantLock lock = this.lock; + lock.lock(); + try { + int n = 0; + E e; + while (n < maxElements && (e = q.poll()) != null) { + c.add(e); + ++n; + } + return n; + } finally { + lock.unlock(); + } + } + + /** + * Atomically removes all of the elements from this priority queue. + * The queue will be empty after this call returns. + */ + public void clear() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + q.clear(); + } finally { + lock.unlock(); + } + } + + public T[] toArray(T[] a) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.toArray(a); + } finally { + lock.unlock(); + } + } + + /** + * Returns an iterator over the elements in this queue. 
The + * iterator does not return the elements in any particular order. + * The returned iterator is a thread-safe "fail-fast" iterator + * that will throw {@link + * java.util.ConcurrentModificationException} upon detected + * interference. + * + * @return an iterator over the elements in this queue. + */ + public Iterator iterator() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return new Itr(q.iterator()); + } finally { + lock.unlock(); + } + } + + private class Itr implements Iterator { + private final Iterator iter; + Itr(Iterator i) { + iter = i; + } + + public boolean hasNext() { + /* + * No sync -- we rely on underlying hasNext to be + * stateless, in which case we can return true by mistake + * only when next() will subsequently throw + * ConcurrentModificationException. + */ + return iter.hasNext(); + } + + public E next() { + ReentrantLock lock = PriorityBlockingQueue.this.lock; + lock.lock(); + try { + return iter.next(); + } finally { + lock.unlock(); + } + } + + public void remove() { + ReentrantLock lock = PriorityBlockingQueue.this.lock; + lock.lock(); + try { + iter.remove(); + } finally { + lock.unlock(); + } + } + } + + /** + * Save the state to a stream (that is, serialize it). This + * merely wraps default serialization within lock. The + * serialization strategy for items is left to underlying + * Queue. Note that locking is not needed on deserialization, so + * readObject is not defined, just relying on default. 
+ */ + private void writeObject(java.io.ObjectOutputStream s) + throws java.io.IOException { + lock.lock(); + try { + s.defaultWriteObject(); + } finally { + lock.unlock(); + } + } + +} Propchange: incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/PriorityBlockingQueue.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/RejectedExecutionException.java URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/RejectedExecutionException.java?rev=421111&view=auto ============================================================================== --- incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/RejectedExecutionException.java (added) +++ incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/RejectedExecutionException.java Tue Jul 11 21:12:04 2006 @@ -0,0 +1,62 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; + +/** + * Exception thrown by an {@link Executor} when a task cannot be + * accepted for execution. + * + * @since 1.5 + * @author Doug Lea + */ +public class RejectedExecutionException extends RuntimeException { + private static final long serialVersionUID = -375805702767069545L; + + /** + * Constructs a RejectedExecutionException with no detail message. + * The cause is not initialized, and may subsequently be + * initialized by a call to {@link #initCause(Throwable) initCause}. 
+ */ + public RejectedExecutionException() { } + + /** + * Constructs a RejectedExecutionException with the + * specified detail message. The cause is not initialized, and may + * subsequently be initialized by a call to {@link + * #initCause(Throwable) initCause}. + * + * @param message the detail message + */ + public RejectedExecutionException(String message) { + super(message); + } + + /** + * Constructs a RejectedExecutionException with the + * specified detail message and cause. + * + * @param message the detail message + * @param cause the cause (which is saved for later retrieval by the + * {@link #getCause()} method) + */ + public RejectedExecutionException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Constructs a RejectedExecutionException with the + * specified cause. The detail message is set to:

 (cause ==
+     * null ? null : cause.toString())
(which typically contains + * the class and detail message of cause). + * + * @param cause the cause (which is saved for later retrieval by the + * {@link #getCause()} method) + */ + public RejectedExecutionException(Throwable cause) { + super(cause); + } +} Propchange: incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/RejectedExecutionException.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/RejectedExecutionHandler.java URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/RejectedExecutionHandler.java?rev=421111&view=auto ============================================================================== --- incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/RejectedExecutionHandler.java (added) +++ incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/RejectedExecutionHandler.java Tue Jul 11 21:12:04 2006 @@ -0,0 +1,33 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; + +/** + * A handler for tasks that cannot be executed by a {@link + * ThreadPoolExecutor}. + * + * @since 1.5 + * @author Doug Lea + */ +public interface RejectedExecutionHandler { + + /** + * Method that may be invoked by a {@link ThreadPoolExecutor} when + * execute cannot accept a task. This may occur when no + * more threads or queue slots are available because their bounds + * would be exceeded, or upon shutdown of the Executor. 
+ * + * In the absence of other alternatives, the method may throw an + * unchecked {@link RejectedExecutionException}, which will be + * propagated to the caller of execute. + * + * @param r the runnable task requested to be executed + * @param executor the executor attempting to execute this task + * @throws RejectedExecutionException if there is no remedy + */ + void rejectedExecution(Runnable r, ThreadPoolExecutor executor); +} Propchange: incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/RejectedExecutionHandler.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/ScheduledExecutorService.java URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/ScheduledExecutorService.java?rev=421111&view=auto ============================================================================== --- incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/ScheduledExecutorService.java (added) +++ incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/ScheduledExecutorService.java Tue Jul 11 21:12:04 2006 @@ -0,0 +1,145 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; +import java.util.concurrent.atomic.*; +import java.util.*; + +/** + * An {@link ExecutorService} that can schedule commands to run after a given + * delay, or to execute periodically. + * + *

The schedule methods create tasks with various delays + * and return a task object that can be used to cancel or check + * execution. The scheduleAtFixedRate and + * scheduleWithFixedDelay methods create and execute tasks + * that run periodically until cancelled. + * + *

Commands submitted using the {@link Executor#execute} and + * {@link ExecutorService} submit methods are scheduled with + * a requested delay of zero. Zero and negative delays (but not + * periods) are also allowed in schedule methods, and are + * treated as requests for immediate execution. + * + *

All schedule methods accept relative delays and + * periods as arguments, not absolute times or dates. It is a simple + * matter to transform an absolute time represented as a {@link + * java.util.Date} to the required form. For example, to schedule at + * a certain future date, you can use: schedule(task, + * date.getTime() - System.currentTimeMillis(), + * TimeUnit.MILLISECONDS). Beware however that expiration of a + * relative delay need not coincide with the current Date at + * which the task is enabled due to network time synchronization + * protocols, clock drift, or other factors. + * + * The {@link Executors} class provides convenient factory methods for + * the ScheduledExecutorService implementations provided in this package. + * + *

Usage Example

+ * + * Here is a class with a method that sets up a ScheduledExecutorService + * to beep every ten seconds for an hour: + * + *
+ * import static java.util.concurrent.TimeUnit.*;
+ * class BeeperControl {
+ *    private final ScheduledExecutorService scheduler = 
+ *       Executors.newScheduledThreadPool(1);
+ *
+ *    public void beepForAnHour() {
+ *        final Runnable beeper = new Runnable() {
+ *                public void run() { System.out.println("beep"); }
+ *            };
+ *        final ScheduledFuture<?> beeperHandle = 
+ *            scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);
+ *        scheduler.schedule(new Runnable() {
+ *                public void run() { beeperHandle.cancel(true); }
+ *            }, 60 * 60, SECONDS);
+ *    }
+ * }
+ * 
+ * + * @since 1.5 + * @author Doug Lea + */ +public interface ScheduledExecutorService extends ExecutorService { + + /** + * Creates and executes a one-shot action that becomes enabled + * after the given delay. + * @param command the task to execute. + * @param delay the time from now to delay execution. + * @param unit the time unit of the delay parameter. + * @return a Future representing pending completion of the task, + * and whose get() method will return null + * upon completion. + * @throws RejectedExecutionException if task cannot be scheduled + * for execution. + * @throws NullPointerException if command is null + */ + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit); + + /** + * Creates and executes a ScheduledFuture that becomes enabled after the + * given delay. + * @param callable the function to execute. + * @param delay the time from now to delay execution. + * @param unit the time unit of the delay parameter. + * @return a ScheduledFuture that can be used to extract result or cancel. + * @throws RejectedExecutionException if task cannot be scheduled + * for execution. + * @throws NullPointerException if callable is null + */ + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit); + + /** + * Creates and executes a periodic action that becomes enabled first + * after the given initial delay, and subsequently with the given + * period; that is executions will commence after + * initialDelay then initialDelay+period, then + * initialDelay + 2 * period, and so on. + * If any execution of the task + * encounters an exception, subsequent executions are suppressed. + * Otherwise, the task will only terminate via cancellation or + * termination of the executor. + * @param command the task to execute. + * @param initialDelay the time to delay first execution. + * @param period the period between successive executions. 
+ * @param unit the time unit of the initialDelay and period parameters + * @return a Future representing pending completion of the task, + * and whose get() method will throw an exception upon + * cancellation. + * @throws RejectedExecutionException if task cannot be scheduled + * for execution. + * @throws NullPointerException if command is null + * @throws IllegalArgumentException if period less than or equal to zero. + */ + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); + + /** + * Creates and executes a periodic action that becomes enabled first + * after the given initial delay, and subsequently with the + * given delay between the termination of one execution and the + * commencement of the next. If any execution of the task + * encounters an exception, subsequent executions are suppressed. + * Otherwise, the task will only terminate via cancellation or + * termination of the executor. + * @param command the task to execute. + * @param initialDelay the time to delay first execution. + * @param delay the delay between the termination of one + * execution and the commencement of the next. + * @param unit the time unit of the initialDelay and delay parameters + * @return a Future representing pending completion of the task, + * and whose get() method will throw an exception upon + * cancellation. + * @throws RejectedExecutionException if task cannot be scheduled + * for execution. + * @throws NullPointerException if command is null + * @throws IllegalArgumentException if delay less than or equal to zero. 
+ */ + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); + +} Propchange: incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/ScheduledExecutorService.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/ScheduledFuture.java URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/ScheduledFuture.java?rev=421111&view=auto ============================================================================== --- incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/ScheduledFuture.java (added) +++ incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/ScheduledFuture.java Tue Jul 11 21:12:04 2006 @@ -0,0 +1,19 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; + +/** + * A delayed result-bearing action that can be cancelled. + * Usually a scheduled future is the result of scheduling + * a task with a {@link ScheduledExecutorService}. 
+ * + * @since 1.5 + * @author Doug Lea + * @param The result type returned by this Future + */ +public interface ScheduledFuture extends Delayed, Future { +} Propchange: incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/ScheduledFuture.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java?rev=421111&view=auto ============================================================================== --- incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java (added) +++ incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java Tue Jul 11 21:12:04 2006 @@ -0,0 +1,541 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; +import java.util.concurrent.atomic.*; +import java.util.*; + +/** + * A {@link ThreadPoolExecutor} that can additionally schedule + * commands to run after a given delay, or to execute + * periodically. This class is preferable to {@link java.util.Timer} + * when multiple worker threads are needed, or when the additional + * flexibility or capabilities of {@link ThreadPoolExecutor} (which + * this class extends) are required. + * + *

Delayed tasks execute no sooner than they are enabled, but + * without any real-time guarantees about when, after they are + * enabled, they will commence. Tasks scheduled for exactly the same + * execution time are enabled in first-in-first-out (FIFO) order of + * submission. + * + *

While this class inherits from {@link ThreadPoolExecutor}, a few + * of the inherited tuning methods are not useful for it. In + * particular, because it acts as a fixed-sized pool using + * corePoolSize threads and an unbounded queue, adjustments + * to maximumPoolSize have no useful effect. + * + * @since 1.5 + * @author Doug Lea + */ +public class ScheduledThreadPoolExecutor + extends ThreadPoolExecutor + implements ScheduledExecutorService { + + /** + * False if should cancel/suppress periodic tasks on shutdown. + */ + private volatile boolean continueExistingPeriodicTasksAfterShutdown; + + /** + * False if should cancel non-periodic tasks on shutdown. + */ + private volatile boolean executeExistingDelayedTasksAfterShutdown = true; + + /** + * Sequence number to break scheduling ties, and in turn to + * guarantee FIFO order among tied entries. + */ + private static final AtomicLong sequencer = new AtomicLong(0); + + /** Base of nanosecond timings, to avoid wrapping */ + private static final long NANO_ORIGIN = System.nanoTime(); + + /** + * Returns nanosecond time offset by origin + */ + final long now() { + return System.nanoTime() - NANO_ORIGIN; + } + + private class ScheduledFutureTask + extends FutureTask implements ScheduledFuture { + + /** Sequence number to break ties FIFO */ + private final long sequenceNumber; + /** The time the task is enabled to execute in nanoTime units */ + private long time; + /** + * Period in nanoseconds for repeating tasks. A positive + * value indicates fixed-rate execution. A negative value + * indicates fixed-delay execution. A value of 0 indicates a + * non-repeating task. 
+ */ + private final long period; + + /** + * Creates a one-shot action with given nanoTime-based trigger time + */ + ScheduledFutureTask(Runnable r, V result, long ns) { + super(r, result); + this.time = ns; + this.period = 0; + this.sequenceNumber = sequencer.getAndIncrement(); + } + + /** + * Creates a periodic action with given nano time and period + */ + ScheduledFutureTask(Runnable r, V result, long ns, long period) { + super(r, result); + this.time = ns; + this.period = period; + this.sequenceNumber = sequencer.getAndIncrement(); + } + + /** + * Creates a one-shot action with given nanoTime-based trigger + */ + ScheduledFutureTask(Callable callable, long ns) { + super(callable); + this.time = ns; + this.period = 0; + this.sequenceNumber = sequencer.getAndIncrement(); + } + + public long getDelay(TimeUnit unit) { + long d = unit.convert(time - now(), TimeUnit.NANOSECONDS); + return d; + } + + public int compareTo(Object other) { + if (other == this) // compare zero ONLY if same object + return 0; + ScheduledFutureTask x = (ScheduledFutureTask)other; + long diff = time - x.time; + if (diff < 0) + return -1; + else if (diff > 0) + return 1; + else if (sequenceNumber < x.sequenceNumber) + return -1; + else + return 1; + } + + /** + * Returns true if this is a periodic (not a one-shot) action. + * @return true if periodic + */ + boolean isPeriodic() { + return period != 0; + } + + /** + * Run a periodic task + */ + private void runPeriodic() { + boolean ok = ScheduledFutureTask.super.runAndReset(); + boolean down = isShutdown(); + // Reschedule if not cancelled and not shutdown or policy allows + if (ok && (!down || + (getContinueExistingPeriodicTasksAfterShutdownPolicy() && + !isTerminating()))) { + long p = period; + if (p > 0) + time += p; + else + time = now() - p; + ScheduledThreadPoolExecutor.super.getQueue().add(this); + } + // This might have been the final executed delayed + // task. Wake up threads to check. 
+ else if (down) + interruptIdleWorkers(); + } + + /** + * Overrides FutureTask version so as to reset/requeue if periodic. + */ + public void run() { + if (isPeriodic()) + runPeriodic(); + else + ScheduledFutureTask.super.run(); + } + } + + /** + * Specialized variant of ThreadPoolExecutor.execute for delayed tasks. + */ + private void delayedExecute(Runnable command) { + if (isShutdown()) { + reject(command); + return; + } + // Prestart a thread if necessary. We cannot prestart it + // running the task because the task (probably) shouldn't be + // run yet, so thread will just idle until delay elapses. + if (getPoolSize() < getCorePoolSize()) + prestartCoreThread(); + + super.getQueue().add(command); + } + + /** + * Cancel and clear the queue of all tasks that should not be run + * due to shutdown policy. + */ + private void cancelUnwantedTasks() { + boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); + boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); + if (!keepDelayed && !keepPeriodic) + super.getQueue().clear(); + else if (keepDelayed || keepPeriodic) { + Object[] entries = super.getQueue().toArray(); + for (int i = 0; i < entries.length; ++i) { + Object e = entries[i]; + if (e instanceof ScheduledFutureTask) { + ScheduledFutureTask t = (ScheduledFutureTask)e; + if (t.isPeriodic()? !keepPeriodic : !keepDelayed) + t.cancel(false); + } + } + entries = null; + purge(); + } + } + + public boolean remove(Runnable task) { + if (!(task instanceof ScheduledFutureTask)) + return false; + return getQueue().remove(task); + } + + /** + * Creates a new ScheduledThreadPoolExecutor with the given core + * pool size. + * + * @param corePoolSize the number of threads to keep in the pool, + * even if they are idle. 
+ * @throws IllegalArgumentException if corePoolSize less than or + * equal to zero + */ + public ScheduledThreadPoolExecutor(int corePoolSize) { + super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, + new DelayedWorkQueue()); + } + + /** + * Creates a new ScheduledThreadPoolExecutor with the given + * initial parameters. + * + * @param corePoolSize the number of threads to keep in the pool, + * even if they are idle. + * @param threadFactory the factory to use when the executor + * creates a new thread. + * @throws NullPointerException if threadFactory is null + */ + public ScheduledThreadPoolExecutor(int corePoolSize, + ThreadFactory threadFactory) { + super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, + new DelayedWorkQueue(), threadFactory); + } + + /** + * Creates a new ScheduledThreadPoolExecutor with the given + * initial parameters. + * + * @param corePoolSize the number of threads to keep in the pool, + * even if they are idle. + * @param handler the handler to use when execution is blocked + * because the thread bounds and queue capacities are reached. + * @throws NullPointerException if handler is null + */ + public ScheduledThreadPoolExecutor(int corePoolSize, + RejectedExecutionHandler handler) { + super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, + new DelayedWorkQueue(), handler); + } + + /** + * Creates a new ScheduledThreadPoolExecutor with the given + * initial parameters. + * + * @param corePoolSize the number of threads to keep in the pool, + * even if they are idle. + * @param threadFactory the factory to use when the executor + * creates a new thread. + * @param handler the handler to use when execution is blocked + * because the thread bounds and queue capacities are reached. 
+ * @throws NullPointerException if threadFactory or handler is null + */ + public ScheduledThreadPoolExecutor(int corePoolSize, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, + new DelayedWorkQueue(), threadFactory, handler); + } + + public ScheduledFuture schedule(Runnable command, + long delay, + TimeUnit unit) { + if (command == null || unit == null) + throw new NullPointerException(); + long triggerTime = now() + unit.toNanos(delay); + ScheduledFutureTask t = + new ScheduledFutureTask(command, null, triggerTime); + delayedExecute(t); + return t; + } + + public ScheduledFuture schedule(Callable callable, + long delay, + TimeUnit unit) { + if (callable == null || unit == null) + throw new NullPointerException(); + if (delay < 0) delay = 0; + long triggerTime = now() + unit.toNanos(delay); + ScheduledFutureTask t = + new ScheduledFutureTask(callable, triggerTime); + delayedExecute(t); + return t; + } + + public ScheduledFuture scheduleAtFixedRate(Runnable command, + long initialDelay, + long period, + TimeUnit unit) { + if (command == null || unit == null) + throw new NullPointerException(); + if (period <= 0) + throw new IllegalArgumentException(); + if (initialDelay < 0) initialDelay = 0; + long triggerTime = now() + unit.toNanos(initialDelay); + ScheduledFutureTask t = + new ScheduledFutureTask(command, + null, + triggerTime, + unit.toNanos(period)); + delayedExecute(t); + return t; + } + + public ScheduledFuture scheduleWithFixedDelay(Runnable command, + long initialDelay, + long delay, + TimeUnit unit) { + if (command == null || unit == null) + throw new NullPointerException(); + if (delay <= 0) + throw new IllegalArgumentException(); + if (initialDelay < 0) initialDelay = 0; + long triggerTime = now() + unit.toNanos(initialDelay); + ScheduledFutureTask t = + new ScheduledFutureTask(command, + null, + triggerTime, + unit.toNanos(-delay)); + delayedExecute(t); + return t; 
+ } + + + /** + * Execute command with zero required delay. This has effect + * equivalent to schedule(command, 0, anyUnit). Note + * that inspections of the queue and of the list returned by + * shutdownNow will access the zero-delayed + * {@link ScheduledFuture}, not the command itself. + * + * @param command the task to execute + * @throws RejectedExecutionException at discretion of + * RejectedExecutionHandler, if task cannot be accepted + * for execution because the executor has been shut down. + * @throws NullPointerException if command is null + */ + public void execute(Runnable command) { + if (command == null) + throw new NullPointerException(); + schedule(command, 0, TimeUnit.NANOSECONDS); + } + + // Override AbstractExecutorService methods + + public Future submit(Runnable task) { + return schedule(task, 0, TimeUnit.NANOSECONDS); + } + + public Future submit(Runnable task, T result) { + return schedule(Executors.callable(task, result), + 0, TimeUnit.NANOSECONDS); + } + + public Future submit(Callable task) { + return schedule(task, 0, TimeUnit.NANOSECONDS); + } + + /** + * Set policy on whether to continue executing existing periodic + * tasks even when this executor has been shutdown. In + * this case, these tasks will only terminate upon + * shutdownNow, or after setting the policy to + * false when already shutdown. This value is by default + * false. + * @param value if true, continue after shutdown, else don't. + * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy + */ + public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) { + continueExistingPeriodicTasksAfterShutdown = value; + if (!value && isShutdown()) + cancelUnwantedTasks(); + } + + /** + * Get the policy on whether to continue executing existing + * periodic tasks even when this executor has been + * shutdown. In this case, these tasks will only + * terminate upon shutdownNow or after setting the policy + * to false when already shutdown. 
This value is by + * default false. + * @return true if will continue after shutdown. + * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy + */ + public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() { + return continueExistingPeriodicTasksAfterShutdown; + } + + /** + * Set policy on whether to execute existing delayed + * tasks even when this executor has been shutdown. In + * this case, these tasks will only terminate upon + * shutdownNow, or after setting the policy to + * false when already shutdown. This value is by default + * true. + * @param value if true, execute after shutdown, else don't. + * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy + */ + public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) { + executeExistingDelayedTasksAfterShutdown = value; + if (!value && isShutdown()) + cancelUnwantedTasks(); + } + + /** + * Get policy on whether to execute existing delayed + * tasks even when this executor has been shutdown. In + * this case, these tasks will only terminate upon + * shutdownNow, or after setting the policy to + * false when already shutdown. This value is by default + * true. + * @return true if will execute after shutdown. + * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy + */ + public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() { + return executeExistingDelayedTasksAfterShutdown; + } + + + /** + * Initiates an orderly shutdown in which previously submitted + * tasks are executed, but no new tasks will be accepted. If the + * ExecuteExistingDelayedTasksAfterShutdownPolicy has + * been set false, existing delayed tasks whose delays + * have not yet elapsed are cancelled. And unless the + * ContinueExistingPeriodicTasksAfterShutdownPolicy has + * been set true, future executions of existing periodic + * tasks will be cancelled. 
+ */ + public void shutdown() { + cancelUnwantedTasks(); + super.shutdown(); + } + + /** + * Attempts to stop all actively executing tasks, halts the + * processing of waiting tasks, and returns a list of the tasks that were + * awaiting execution. + * + *

There are no guarantees beyond best-effort attempts to stop + * processing actively executing tasks. This implementation + * cancels tasks via {@link Thread#interrupt}, so if any tasks mask or + * fail to respond to interrupts, they may never terminate. + * + * @return list of tasks that never commenced execution. Each + * element of this list is a {@link ScheduledFuture}, + * including those tasks submitted using execute, which + * are for scheduling purposes used as the basis of a zero-delay + * ScheduledFuture. + */ + public List shutdownNow() { + return super.shutdownNow(); + } + + /** + * Returns the task queue used by this executor. Each element of + * this queue is a {@link ScheduledFuture}, including those + * tasks submitted using execute which are for scheduling + * purposes used as the basis of a zero-delay + * ScheduledFuture. Iteration over this queue is + * not guaranteed to traverse tasks in the order in + * which they will execute. + * + * @return the task queue + */ + public BlockingQueue getQueue() { + return super.getQueue(); + } + + /** + * An annoying wrapper class to convince generics compiler to + * use a DelayQueue as a BlockingQueue + */ + private static class DelayedWorkQueue + extends AbstractCollection + implements BlockingQueue { + + private final DelayQueue dq = new DelayQueue(); + public Runnable poll() { return dq.poll(); } + public Runnable peek() { return dq.peek(); } + public Runnable take() throws InterruptedException { return dq.take(); } + public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { + return dq.poll(timeout, unit); + } + + public boolean add(Runnable x) { return dq.add((ScheduledFutureTask)x); } + public boolean offer(Runnable x) { return dq.offer((ScheduledFutureTask)x); } + public void put(Runnable x) { + dq.put((ScheduledFutureTask)x); + } + public boolean offer(Runnable x, long timeout, TimeUnit unit) { + return dq.offer((ScheduledFutureTask)x, timeout, unit); + } + + public Runnable 
remove() { return dq.remove(); } + public Runnable element() { return dq.element(); } + public void clear() { dq.clear(); } + public int drainTo(Collection c) { return dq.drainTo(c); } + public int drainTo(Collection c, int maxElements) { + return dq.drainTo(c, maxElements); + } + + public int remainingCapacity() { return dq.remainingCapacity(); } + public boolean remove(Object x) { return dq.remove(x); } + public boolean contains(Object x) { return dq.contains(x); } + public int size() { return dq.size(); } + public boolean isEmpty() { return dq.isEmpty(); } + public Object[] toArray() { return dq.toArray(); } + public T[] toArray(T[] array) { return dq.toArray(array); } + public Iterator iterator() { + return new Iterator() { + private Iterator it = dq.iterator(); + public boolean hasNext() { return it.hasNext(); } + public Runnable next() { return it.next(); } + public void remove() { it.remove(); } + }; + } + } +} Propchange: incubator/harmony/enhanced/classlib/trunk/sandbox/juc-proposal/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java ------------------------------------------------------------------------------ svn:eol-style = native