Subject: svn commit: r442038 [3/19] - in /incubator/harmony/standard/classlib/trunk/modules/concurrent: ./ src/ src/main/ src/main/java/ src/main/java/java/ src/main/java/java/util/ src/main/java/java/util/concurrent/ src/main/java/java/util/concurrent/atomic/ ... 
Date: Sun, 10 Sep 2006 23:59:35 -0000 To: harmony-commits@incubator.apache.org From: ndbeyer@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20060910235943.8DCBB1A9820@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Added: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CountDownLatch.java URL: http://svn.apache.org/viewvc/incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CountDownLatch.java?view=auto&rev=442038 ============================================================================== --- incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CountDownLatch.java (added) +++ incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CountDownLatch.java Sun Sep 10 16:59:30 2006 @@ -0,0 +1,280 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; +import java.util.concurrent.locks.*; +import java.util.concurrent.atomic.*; + +/** + * A synchronization aid that allows one or more threads to wait until + * a set of operations being performed in other threads completes. + * + *

A CountDownLatch is initialized with a given + * count. The {@link #await await} methods block until the current + * {@link #getCount count} reaches zero due to invocations of the + * {@link #countDown} method, after which all waiting threads are + * released and any subsequent invocations of {@link #await await} return + * immediately. This is a one-shot phenomenon -- the count cannot be + * reset. If you need a version that resets the count, consider using + * a {@link CyclicBarrier}. + * + *

A CountDownLatch is a versatile synchronization tool + * and can be used for a number of purposes. A + * CountDownLatch initialized with a count of one serves as a + * simple on/off latch, or gate: all threads invoking {@link #await await} + * wait at the gate until it is opened by a thread invoking {@link + * #countDown}. A CountDownLatch initialized to N + * can be used to make one thread wait until N threads have + * completed some action, or some action has been completed N times. + *

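The gate usage described above can be sketched as follows. This is only an illustrative sketch: the class `GateDemo` and the method `runThroughGate` are hypothetical names, not part of this commit, and only `CountDownLatch` itself comes from the source. A latch with a count of one holds every worker at `await` until a single `countDown` opens it; a second latch with a count of N lets the caller wait for all N workers.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; only CountDownLatch comes from the source.
final class GateDemo {
    // Starts `workers` threads that all wait at a one-count gate, opens the
    // gate, and returns how many workers got past it.
    static int runThroughGate(int workers) {
        final CountDownLatch gate = new CountDownLatch(1);       // on/off latch
        final CountDownLatch done = new CountDownLatch(workers); // N completions
        final AtomicInteger passed = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        gate.await();             // blocks until the gate opens
                        passed.incrementAndGet();
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    } finally {
                        done.countDown();         // always report completion
                    }
                }
            }).start();
        }

        gate.countDown();                         // count 1 -> 0 opens the gate
        try {
            done.await();                         // wait for all workers
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", ex);
        }
        return passed.get();
    }
}
```

Calling `countDown` in a `finally` block is a design choice of this sketch: it keeps the caller from waiting forever if a worker fails before finishing.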
A useful property of a CountDownLatch is that it + * doesn't require that threads calling countDown wait for + * the count to reach zero before proceeding; it simply prevents any + * thread from proceeding past an {@link #await await} until all + * threads could pass. + * + *

Sample usage: Here is a pair of classes in which a group + * of worker threads use two countdown latches: + *

    + *
  • The first is a start signal that prevents any worker from proceeding + * until the driver is ready for them to proceed; + *
  • The second is a completion signal that allows the driver to wait + * until all workers have completed. + *
+ * + *
+ * class Driver { // ...
+ *   void main() throws InterruptedException {
+ *     CountDownLatch startSignal = new CountDownLatch(1);
+ *     CountDownLatch doneSignal = new CountDownLatch(N);
+ *
+ *     for (int i = 0; i < N; ++i) // create and start threads
+ *       new Thread(new Worker(startSignal, doneSignal)).start();
+ *
+ *     doSomethingElse();            // don't let run yet
+ *     startSignal.countDown();      // let all threads proceed
+ *     doSomethingElse();
+ *     doneSignal.await();           // wait for all to finish
+ *   }
+ * }
+ *
+ * class Worker implements Runnable {
+ *   private final CountDownLatch startSignal;
+ *   private final CountDownLatch doneSignal;
+ *   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
+ *      this.startSignal = startSignal;
+ *      this.doneSignal = doneSignal;
+ *   }
+ *   public void run() {
+ *      try {
+ *        startSignal.await();
+ *        doWork();
+ *        doneSignal.countDown();
+ *      } catch (InterruptedException ex) {} // return;
+ *   }
+ *
+ *   void doWork() { ... }
+ * }
+ *
+ * 
+ * + *

Another typical usage would be to divide a problem into N parts, + * describe each part with a Runnable that executes that portion and + * counts down on the latch, and queue all the Runnables to an + * Executor. When all sub-parts are complete, the coordinating thread + * will be able to pass through await. (When threads must repeatedly + * count down in this way, instead use a {@link CyclicBarrier}.) + * + *

+ * class Driver2 { // ...
+ *   void main() throws InterruptedException {
+ *     CountDownLatch doneSignal = new CountDownLatch(N);
+ *     Executor e = ...
+ *
+ *     for (int i = 0; i < N; ++i) // create and start threads
+ *       e.execute(new WorkerRunnable(doneSignal, i));
+ *
+ *     doneSignal.await();           // wait for all to finish
+ *   }
+ * }
+ *
+ * class WorkerRunnable implements Runnable {
+ *   private final CountDownLatch doneSignal;
+ *   private final int i;
+ *   WorkerRunnable(CountDownLatch doneSignal, int i) {
+ *      this.doneSignal = doneSignal;
+ *      this.i = i;
+ *   }
+ *   public void run() {
+ *      doWork(i);
+ *      doneSignal.countDown();
+ *   }
+ *
+ *   void doWork(int i) { ... }
+ * }
+ *
+ * 
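A compilable variant of the divide-into-N-parts pattern might look like the sketch below. The class `PartitionedSum`, its `sum` method, and the choice of a fixed thread pool are assumptions made for illustration; the source only says that the Runnables are queued to an Executor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical example: sums 0 .. parts*chunk-1 in `parts` independent tasks.
final class PartitionedSum {
    static long sum(final int parts, final int chunk) {
        final long[] partial = new long[parts];
        final CountDownLatch doneSignal = new CountDownLatch(parts);
        // Pool size is an arbitrary choice for this sketch; parts must be >= 1.
        ExecutorService pool = Executors.newFixedThreadPool(Math.min(parts, 4));
        try {
            for (int p = 0; p < parts; p++) {
                final int part = p;
                pool.execute(new Runnable() {
                    public void run() {
                        try {
                            long s = 0;
                            for (int k = part * chunk; k < (part + 1) * chunk; k++) {
                                s += k;
                            }
                            partial[part] = s;      // each task owns one slot
                        } finally {
                            doneSignal.countDown(); // one count per sub-part
                        }
                    }
                });
            }
            doneSignal.await();                     // passes once all parts are done
            long total = 0;
            for (long s : partial) {
                total += s;
            }
            return total;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", ex);
        } finally {
            pool.shutdown();
        }
    }
}
```

The coordinating thread reads `partial` only after `await` returns, so the writes made before each `countDown` are visible to it.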
+ * + * @since 1.5 + * @author Doug Lea + */ +public class CountDownLatch { + /** + * Synchronization control For CountDownLatch. + * Uses AQS state to represent count. + */ + private static final class Sync extends AbstractQueuedSynchronizer { + Sync(int count) { + setState(count); + } + + int getCount() { + return getState(); + } + + public int tryAcquireShared(int acquires) { + return getState() == 0? 1 : -1; + } + + public boolean tryReleaseShared(int releases) { + // Decrement count; signal when transition to zero + for (;;) { + int c = getState(); + if (c == 0) + return false; + int nextc = c-1; + if (compareAndSetState(c, nextc)) + return nextc == 0; + } + } + } + + private final Sync sync; + /** + * Constructs a CountDownLatch initialized with the given + * count. + * + * @param count the number of times {@link #countDown} must be invoked + * before threads can pass through {@link #await}. + * + * @throws IllegalArgumentException if count is less than zero. + */ + public CountDownLatch(int count) { + if (count < 0) throw new IllegalArgumentException("count < 0"); + this.sync = new Sync(count); + } + + /** + * Causes the current thread to wait until the latch has counted down to + * zero, unless the thread is {@link Thread#interrupt interrupted}. + * + *

If the current {@link #getCount count} is zero then this method + * returns immediately. + *

If the current {@link #getCount count} is greater than zero then + * the current thread becomes disabled for thread scheduling + * purposes and lies dormant until one of two things happens: + *

    + *
  • The count reaches zero due to invocations of the + * {@link #countDown} method; or + *
  • Some other thread {@link Thread#interrupt interrupts} the current + * thread. + *
+ *

If the current thread: + *

    + *
  • has its interrupted status set on entry to this method; or + *
  • is {@link Thread#interrupt interrupted} while waiting, + *
+ * then {@link InterruptedException} is thrown and the current thread's + * interrupted status is cleared. + * + * @throws InterruptedException if the current thread is interrupted + * while waiting. + */ + public void await() throws InterruptedException { + sync.acquireSharedInterruptibly(1); + } + + /** + * Causes the current thread to wait until the latch has counted down to + * zero, unless the thread is {@link Thread#interrupt interrupted}, + * or the specified waiting time elapses. + * + *

If the current {@link #getCount count} is zero then this method + * returns immediately with the value true. + * + *

If the current {@link #getCount count} is greater than zero then + * the current thread becomes disabled for thread scheduling + * purposes and lies dormant until one of three things happens: + *

    + *
  • The count reaches zero due to invocations of the + * {@link #countDown} method; or + *
  • Some other thread {@link Thread#interrupt interrupts} the current + * thread; or + *
  • The specified waiting time elapses. + *
+ *

If the count reaches zero then the method returns with the + * value true. + *

If the current thread: + *

    + *
  • has its interrupted status set on entry to this method; or + *
  • is {@link Thread#interrupt interrupted} while waiting, + *
+ * then {@link InterruptedException} is thrown and the current thread's + * interrupted status is cleared. + * + *

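The return values of the timed await can be illustrated with a small sketch. `TimedAwaitDemo` and `awaitAfter` are hypothetical names introduced here; the calls on `CountDownLatch` and `TimeUnit` are the ones documented in this file.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing what await(timeout, unit) returns.
final class TimedAwaitDemo {
    // Creates a latch with `count`, decrements it `countDowns` times, then
    // waits at most `timeoutMillis` and reports the result of the timed await.
    static boolean awaitAfter(int count, int countDowns, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(count);
        for (int i = 0; i < countDowns; i++) {
            latch.countDown();   // extra calls at zero do nothing
        }
        try {
            // true: the count reached zero; false: the waiting time elapsed
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", ex);
        }
    }
}
```

With a count that is already zero, even a zero timeout returns true, because the method does not need to wait at all.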
If the specified waiting time elapses then the value false + * is returned. + * If the time is + * less than or equal to zero, the method will not wait at all. + * + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument. + * @return true if the count reached zero and false + * if the waiting time elapsed before the count reached zero. + * + * @throws InterruptedException if the current thread is interrupted + * while waiting. + */ + public boolean await(long timeout, TimeUnit unit) + throws InterruptedException { + return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); + } + + /** + * Decrements the count of the latch, releasing all waiting threads if + * the count reaches zero. + *

If the current {@link #getCount count} is greater than zero then + * it is decremented. If the new count is zero then all waiting threads + * are re-enabled for thread scheduling purposes. + *

If the current {@link #getCount count} equals zero then nothing + * happens. + */ + public void countDown() { + sync.releaseShared(1); + } + + /** + * Returns the current count. + *

This method is typically used for debugging and testing purposes. + * @return the current count. + */ + public long getCount() { + return sync.getCount(); + } + + /** + * Returns a string identifying this latch, as well as its state. + * The state, in brackets, includes the String + * "Count =" followed by the current count. + * @return a string identifying this latch, as well as its + * state + */ + public String toString() { + return super.toString() + "[Count = " + sync.getCount() + "]"; + } + +} Propchange: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CountDownLatch.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CyclicBarrier.java URL: http://svn.apache.org/viewvc/incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CyclicBarrier.java?view=auto&rev=442038 ============================================================================== --- incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CyclicBarrier.java (added) +++ incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CyclicBarrier.java Sun Sep 10 16:59:30 2006 @@ -0,0 +1,430 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; +import java.util.concurrent.locks.*; + +/** + * A synchronization aid that allows a set of threads to all wait for + * each other to reach a common barrier point. CyclicBarriers are + * useful in programs involving a fixed sized party of threads that + * must occasionally wait for each other. 
The barrier is called + * cyclic because it can be re-used after the waiting threads + * are released. + * + *

A CyclicBarrier supports an optional {@link Runnable} command + * that is run once per barrier point, after the last thread in the party + * arrives, but before any threads are released. + * This barrier action is useful + * for updating shared-state before any of the parties continue. + * + *

Sample usage: Here is an example of + * using a barrier in a parallel decomposition design: + *

+ * class Solver {
+ *   final int N;
+ *   final float[][] data;
+ *   final CyclicBarrier barrier;
+ *   
+ *   class Worker implements Runnable {
+ *     int myRow;
+ *     Worker(int row) { myRow = row; }
+ *     public void run() {
+ *       while (!done()) {
+ *         processRow(myRow);
+ *
+ *         try {
+ *           barrier.await(); 
+ *         } catch (InterruptedException ex) { 
+ *           return; 
+ *         } catch (BrokenBarrierException ex) { 
+ *           return; 
+ *         }
+ *       }
+ *     }
+ *   }
+ *
+ *   public Solver(float[][] matrix) {
+ *     data = matrix;
+ *     N = matrix.length;
+ *     barrier = new CyclicBarrier(N, 
+ *                                 new Runnable() {
+ *                                   public void run() { 
+ *                                     mergeRows(...); 
+ *                                   }
+ *                                 });
+ *     for (int i = 0; i < N; ++i) 
+ *       new Thread(new Worker(i)).start();
+ *
+ *     waitUntilDone();
+ *   }
+ * }
+ * 
+ * Here, each worker thread processes a row of the matrix then waits at the + * barrier until all rows have been processed. When all rows are processed + * the supplied {@link Runnable} barrier action is executed and merges the + * rows. If the merger + * determines that a solution has been found then done() will return + * true and each worker will terminate. + * + *

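As a smaller, self-contained counterpart to the Solver outline, the sketch below runs a single barrier cycle. `RowSummer` and `sumAll` are hypothetical names, and summing rows stands in for the unspecified `processRow` and `mergeRows` steps; the matrix is assumed to have at least one row.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical example: one worker per row, merged once by the barrier action.
final class RowSummer {
    static long sumAll(final int[][] matrix) {
        final int n = matrix.length;          // assumed to be >= 1
        final long[] rowSums = new long[n];
        final long[] total = new long[1];
        final CyclicBarrier barrier = new CyclicBarrier(n, new Runnable() {
            public void run() {
                // Runs once, in the last thread to arrive, before any release.
                for (long s : rowSums) {
                    total[0] += s;
                }
            }
        });

        Thread[] workers = new Thread[n];
        for (int i = 0; i < n; i++) {
            final int row = i;
            workers[i] = new Thread(new Runnable() {
                public void run() {
                    for (int v : matrix[row]) {
                        rowSums[row] += v;    // "process" this worker's row
                    }
                    try {
                        barrier.await();      // wait until every row is done
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    } catch (BrokenBarrierException ex) {
                        // another party failed; nothing more to do here
                    }
                }
            });
            workers[i].start();
        }
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", ex);
        }
        return total[0];
    }
}
```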
If the barrier action does not rely on the parties being suspended when + * it is executed, then any of the threads in the party could execute that + * action when it is released. To facilitate this, each invocation of + * {@link #await} returns the arrival index of that thread at the barrier. + * You can then choose which thread should execute the barrier action, for + * example: + *

  if (barrier.await() == 0) {
+ *     // log the completion of this iteration
+ *   }
+ * + *

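To make the arrival index concrete, the following sketch counts how many parties observe index zero in one cycle. `ArrivalIndexDemo` and `countLastArrivals` are hypothetical names used only for this illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example: exactly one party should see arrival index 0.
final class ArrivalIndexDemo {
    static int countLastArrivals(int parties) {
        final CyclicBarrier barrier = new CyclicBarrier(parties);
        final AtomicInteger sawZero = new AtomicInteger();
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        if (barrier.await() == 0) {   // last thread to arrive
                            sawZero.incrementAndGet();
                        }
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    } catch (BrokenBarrierException ex) {
                        // not expected in this sketch
                    }
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", ex);
        }
        return sawZero.get();
    }
}
```

Because each cycle hands out the indices `parties - 1` down to `0` once, testing for zero selects a single thread to do the per-iteration work.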
The CyclicBarrier uses a fast-fail all-or-none breakage + * model for failed synchronization attempts: If a thread leaves a + * barrier point prematurely because of interruption, failure, or + * timeout, all other threads, even those that have not yet resumed + * from a previous {@link #await}, will also leave abnormally via + * {@link BrokenBarrierException} (or InterruptedException if + * they too were interrupted at about the same time). + * + * @since 1.5 + * @see CountDownLatch + * + * @author Doug Lea + */ +public class CyclicBarrier { + /** The lock for guarding barrier entry */ + private final ReentrantLock lock = new ReentrantLock(); + /** Condition to wait on until tripped */ + private final Condition trip = lock.newCondition(); + /** The number of parties */ + private final int parties; + /* The command to run when tripped */ + private final Runnable barrierCommand; + + /** + * The generation number. Incremented upon barrier trip. + * Retracted upon reset. + */ + private long generation; + + /** + * Breakage indicator. + */ + private boolean broken; + + /** + * Number of parties still waiting. Counts down from parties to 0 + * on each cycle. + */ + private int count; + + /** + * Updates state on barrier trip and wakes up everyone. + */ + private void nextGeneration() { + count = parties; + ++generation; + trip.signalAll(); + } + + /** + * Sets barrier as broken and wakes up everyone. + */ + private void breakBarrier() { + broken = true; + trip.signalAll(); + } + + /** + * Main barrier code, covering the various policies. 
+ */ + private int dowait(boolean timed, long nanos) + throws InterruptedException, BrokenBarrierException, TimeoutException { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + int index = --count; + long g = generation; + + if (broken) + throw new BrokenBarrierException(); + + if (Thread.interrupted()) { + breakBarrier(); + throw new InterruptedException(); + } + + if (index == 0) { // tripped + nextGeneration(); + boolean ranAction = false; + try { + Runnable command = barrierCommand; + if (command != null) + command.run(); + ranAction = true; + return 0; + } finally { + if (!ranAction) + breakBarrier(); + } + } + + for (;;) { + try { + if (!timed) + trip.await(); + else if (nanos > 0L) + nanos = trip.awaitNanos(nanos); + } catch (InterruptedException ie) { + breakBarrier(); + throw ie; + } + + if (broken || + g > generation) // true if a reset occurred while waiting + throw new BrokenBarrierException(); + + if (g < generation) + return index; + + if (timed && nanos <= 0L) { + breakBarrier(); + throw new TimeoutException(); + } + } + + } finally { + lock.unlock(); + } + } + + /** + * Creates a new CyclicBarrier that will trip when the + * given number of parties (threads) are waiting upon it, and which + * will execute the given barrier action when the barrier is tripped, + * performed by the last thread entering the barrier. + * + * @param parties the number of threads that must invoke {@link #await} + * before the barrier is tripped. + * @param barrierAction the command to execute when the barrier is + * tripped, or null if there is no action. + * + * @throws IllegalArgumentException if parties is less than 1. 
+ */ + public CyclicBarrier(int parties, Runnable barrierAction) { + if (parties <= 0) throw new IllegalArgumentException(); + this.parties = parties; + this.count = parties; + this.barrierCommand = barrierAction; + } + + /** + * Creates a new CyclicBarrier that will trip when the + * given number of parties (threads) are waiting upon it, and + * does not perform a predefined action upon each barrier. + * + * @param parties the number of threads that must invoke {@link #await} + * before the barrier is tripped. + * + * @throws IllegalArgumentException if parties is less than 1. + */ + public CyclicBarrier(int parties) { + this(parties, null); + } + + /** + * Returns the number of parties required to trip this barrier. + * @return the number of parties required to trip this barrier. + **/ + public int getParties() { + return parties; + } + + /** + * Waits until all {@link #getParties parties} have invoked await + * on this barrier. + * + *

If the current thread is not the last to arrive then it is + * disabled for thread scheduling purposes and lies dormant until + * one of the following things happens: + *

    + *
  • The last thread arrives; or + *
  • Some other thread {@link Thread#interrupt interrupts} the current + * thread; or + *
  • Some other thread {@link Thread#interrupt interrupts} one of the + * other waiting threads; or + *
  • Some other thread times out while waiting for barrier; or + *
  • Some other thread invokes {@link #reset} on this barrier. + *
+ *

If the current thread: + *

    + *
  • has its interrupted status set on entry to this method; or + *
  • is {@link Thread#interrupt interrupted} while waiting + *
+ * then {@link InterruptedException} is thrown and the current thread's + * interrupted status is cleared. + * + *

If the barrier is {@link #reset} while any thread is waiting, or if + * the barrier {@link #isBroken is broken} when await is invoked, + * or while any thread is waiting, + * then {@link BrokenBarrierException} is thrown. + * + *

If any thread is {@link Thread#interrupt interrupted} while waiting, + * then all other waiting threads will throw + * {@link BrokenBarrierException} and the barrier is placed in the broken + * state. + * + *

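The effect of interrupting one waiting party can be sketched as below. `BreakageDemo` and `interruptOneParty` are hypothetical names. The barrier is created for three parties but only two threads ever arrive, so neither can be released normally; the first thread is then interrupted.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical example of the all-or-none breakage model.
final class BreakageDemo {
    // Returns { outcome of thread 0, outcome of thread 1, isBroken() }.
    static String[] interruptOneParty() {
        final CyclicBarrier barrier = new CyclicBarrier(3); // third party never arrives
        final String[] outcome = new String[3];
        Thread[] threads = new Thread[2];
        for (int i = 0; i < 2; i++) {
            final int id = i;
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        barrier.await();
                        outcome[id] = "tripped";
                    } catch (InterruptedException ex) {
                        outcome[id] = "interrupted";   // the interrupted party
                    } catch (BrokenBarrierException ex) {
                        outcome[id] = "broken";        // every other party
                    }
                }
            });
            threads[i].start();
        }
        threads[0].interrupt();   // breaks the barrier for both threads
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", ex);
        }
        outcome[2] = String.valueOf(barrier.isBroken());
        return outcome;
    }
}
```

The outcome does not depend on whether the interrupt lands before or during the wait: a thread whose interrupted status is set on entry also throws InterruptedException and breaks the barrier.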
If the current thread is the last thread to arrive, and a + * non-null barrier action was supplied in the constructor, then the + * current thread runs the action before allowing the other threads to + * continue. + * If an exception occurs during the barrier action then that exception + * will be propagated in the current thread and the barrier is placed in + * the broken state. + * + * @return the arrival index of the current thread, where index + * {@link #getParties()} - 1 indicates the first to arrive and + * zero indicates the last to arrive. + * + * @throws InterruptedException if the current thread was interrupted + * while waiting + * @throws BrokenBarrierException if another thread was + * interrupted while the current thread was waiting, or the barrier was + * reset, or the barrier was broken when await was called, + * or the barrier action (if present) failed due to an exception. + */ + public int await() throws InterruptedException, BrokenBarrierException { + try { + return dowait(false, 0L); + } catch (TimeoutException toe) { + throw new Error(toe); // cannot happen; + } + } + + /** + * Waits until all {@link #getParties parties} have invoked await + * on this barrier. + * + *

If the current thread is not the last to arrive then it is + * disabled for thread scheduling purposes and lies dormant until + * one of the following things happens: + *

    + *
  • The last thread arrives; or + *
  • The specified timeout elapses; or + *
  • Some other thread {@link Thread#interrupt interrupts} the current + * thread; or + *
  • Some other thread {@link Thread#interrupt interrupts} one of the + * other waiting threads; or + *
  • Some other thread times out while waiting for barrier; or + *
  • Some other thread invokes {@link #reset} on this barrier. + *
+ *

If the current thread: + *

    + *
  • has its interrupted status set on entry to this method; or + *
  • is {@link Thread#interrupt interrupted} while waiting + *
+ * then {@link InterruptedException} is thrown and the current thread's + * interrupted status is cleared. + * + *

If the barrier is {@link #reset} while any thread is waiting, or if + * the barrier {@link #isBroken is broken} when await is invoked, + * or while any thread is waiting, + * then {@link BrokenBarrierException} is thrown. + * + *

If any thread is {@link Thread#interrupt interrupted} while waiting, + * then all other waiting threads will throw + * {@link BrokenBarrierException} and the barrier is placed in the broken + * state. + * + *

If the current thread is the last thread to arrive, and a + * non-null barrier action was supplied in the constructor, then the + * current thread runs the action before allowing the other threads to + * continue. + * If an exception occurs during the barrier action then that exception + * will be propagated in the current thread and the barrier is placed in + * the broken state. + * + * @param timeout the time to wait for the barrier + * @param unit the time unit of the timeout parameter + * @return the arrival index of the current thread, where index + * {@link #getParties()} - 1 indicates the first to arrive and + * zero indicates the last to arrive. + * + * @throws InterruptedException if the current thread was interrupted + * while waiting + * @throws TimeoutException if the specified timeout elapses. + * @throws BrokenBarrierException if another thread was + * interrupted while the current thread was waiting, or the barrier was + * reset, or the barrier was broken when await was called, + * or the barrier action (if present) failed due to an exception. + */ + public int await(long timeout, TimeUnit unit) + throws InterruptedException, + BrokenBarrierException, + TimeoutException { + return dowait(true, unit.toNanos(timeout)); + } + + /** + * Queries if this barrier is in a broken state. + * @return true if one or more parties broke out of this + * barrier due to interruption or timeout since construction or + * the last reset, or a barrier action failed due to an exception; + * and false otherwise. + */ + public boolean isBroken() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return broken; + } finally { + lock.unlock(); + } + } + + /** + * Resets the barrier to its initial state. If any parties are + * currently waiting at the barrier, they will return with a + * {@link BrokenBarrierException}. 
Note that resets after + * a breakage has occurred for other reasons can be complicated to + * carry out; threads need to re-synchronize in some other way, + * and choose one to perform the reset. It may be preferable to + * instead create a new barrier for subsequent use. + */ + public void reset() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + /* + * Retract generation number enough to cover threads + * currently waiting on current and still resuming from + * previous generation, plus similarly accommodating spans + * after the reset. + */ + generation -= 4; + broken = false; + trip.signalAll(); + } finally { + lock.unlock(); + } + } + + /** + * Returns the number of parties currently waiting at the barrier. + * This method is primarily useful for debugging and assertions. + * + * @return the number of parties currently blocked in {@link #await} + **/ + public int getNumberWaiting() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return parties - count; + } finally { + lock.unlock(); + } + } + +} Propchange: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/CyclicBarrier.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/DelayQueue.java URL: http://svn.apache.org/viewvc/incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/DelayQueue.java?view=auto&rev=442038 ============================================================================== --- incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/DelayQueue.java (added) +++ incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/DelayQueue.java Sun Sep 10 16:59:30 2006 @@ -0,0 +1,366 @@ +/* + * Written by Doug Lea with assistance from members of JCP 
JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + + +package java.util.concurrent; +import java.util.concurrent.locks.*; +import java.util.*; + +/** + * An unbounded {@linkplain BlockingQueue blocking queue} of Delayed + * elements, in which an element can only be taken when its delay has expired. + * The head of the queue is that Delayed element whose delay + * expired furthest in the past - if no delay has expired there is no head and + * poll will return null. + * This queue does not permit null elements. + *

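The behaviour described above (elements become available only once their delay has expired, and poll returns null before then) can be sketched as follows. `DelayedTask` and `DelayQueueDemo` are hypothetical classes written for this illustration; the delay values are arbitrary.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical Delayed element that becomes available after a fixed delay.
final class DelayedTask implements Delayed {
    final String name;
    private final long readyAtNanos;

    DelayedTask(String name, long delayMillis) {
        this.name = name;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    public long getDelay(TimeUnit unit) {
        // Remaining delay; zero or negative means the element has expired.
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    public int compareTo(Delayed other) {
        long diff = getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
        return diff < 0 ? -1 : (diff > 0 ? 1 : 0);
    }
}

final class DelayQueueDemo {
    // poll() does not wait: with no expired element it returns null.
    static boolean pollsNullBeforeExpiry() {
        DelayQueue<DelayedTask> q = new DelayQueue<DelayedTask>();
        q.add(new DelayedTask("later", 10000));
        return q.poll() == null && q.size() == 1;
    }

    // take() blocks until the head expires, so elements come out by expiry.
    static String takeOrder() {
        DelayQueue<DelayedTask> q = new DelayQueue<DelayedTask>();
        q.add(new DelayedTask("slow", 100));
        q.add(new DelayedTask("fast", 50));
        try {
            String first = q.take().name;
            String second = q.take().name;
            return first + "," + second;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while taking", ex);
        }
    }
}
```

Insertion order does not matter here: the element whose delay expires first is the head of the queue.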
This class implements all of the optional methods + * of the {@link Collection} and {@link Iterator} interfaces. + * + *

This class is a member of the + * Java Collections Framework. + * + * @since 1.5 + * @author Doug Lea + * @param <E> the type of elements held in this collection + */ + +public class DelayQueue<E extends Delayed> extends AbstractQueue<E> + implements BlockingQueue<E> { + + private transient final ReentrantLock lock = new ReentrantLock(); + private transient final Condition available = lock.newCondition(); + private final PriorityQueue<E> q = new PriorityQueue<E>(); + + /** + * Creates a new DelayQueue that is initially empty. + */ + public DelayQueue() {} + + /** + * Creates a DelayQueue initially containing the elements of the + * given collection of {@link Delayed} instances. + * + * @param c the collection + * @throws NullPointerException if c or any element within it + * is null + * + */ + public DelayQueue(Collection<? extends E> c) { + this.addAll(c); + } + + /** + * Inserts the specified element into this delay queue. + * + * @param o the element to add + * @return true + * @throws NullPointerException if the specified element is null. + */ + public boolean offer(E o) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + E first = q.peek(); + q.offer(o); + if (first == null || o.compareTo(first) < 0) + available.signalAll(); + return true; + } finally { + lock.unlock(); + } + } + + + /** + * Adds the specified element to this delay queue. As the queue is + * unbounded this method will never block. + * @param o the element to add + * @throws NullPointerException if the specified element is null. + */ + public void put(E o) { + offer(o); + } + + /** + * Inserts the specified element into this delay queue. As the queue is + * unbounded this method will never block. + * @param o the element to add + * @param timeout This parameter is ignored as the method never blocks + * @param unit This parameter is ignored as the method never blocks + * @return true + * @throws NullPointerException if the specified element is null. 
+ */ + public boolean offer(E o, long timeout, TimeUnit unit) { + return offer(o); + } + + /** + * Adds the specified element to this queue. + * @param o the element to add + * @return true (as per the general contract of + * Collection.add). + * + * @throws NullPointerException if the specified element is null. + */ + public boolean add(E o) { + return offer(o); + } + + public E take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + for (;;) { + E first = q.peek(); + if (first == null) { + available.await(); + } else { + long delay = first.getDelay(TimeUnit.NANOSECONDS); + if (delay > 0) { + long tl = available.awaitNanos(delay); + } else { + E x = q.poll(); + assert x != null; + if (q.size() != 0) + available.signalAll(); // wake up other takers + return x; + + } + } + } + } finally { + lock.unlock(); + } + } + + public E poll(long time, TimeUnit unit) throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + long nanos = unit.toNanos(time); + try { + for (;;) { + E first = q.peek(); + if (first == null) { + if (nanos <= 0) + return null; + else + nanos = available.awaitNanos(nanos); + } else { + long delay = first.getDelay(TimeUnit.NANOSECONDS); + if (delay > 0) { + if (delay > nanos) + delay = nanos; + long timeLeft = available.awaitNanos(delay); + nanos -= delay - timeLeft; + } else { + E x = q.poll(); + assert x != null; + if (q.size() != 0) + available.signalAll(); + return x; + } + } + } + } finally { + lock.unlock(); + } + } + + + public E poll() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + E first = q.peek(); + if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) + return null; + else { + E x = q.poll(); + assert x != null; + if (q.size() != 0) + available.signalAll(); + return x; + } + } finally { + lock.unlock(); + } + } + + public E peek() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return 
q.peek(); + } finally { + lock.unlock(); + } + } + + public int size() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.size(); + } finally { + lock.unlock(); + } + } + + public int drainTo(Collection c) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + final ReentrantLock lock = this.lock; + lock.lock(); + try { + int n = 0; + for (;;) { + E first = q.peek(); + if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) + break; + c.add(q.poll()); + ++n; + } + if (n > 0) + available.signalAll(); + return n; + } finally { + lock.unlock(); + } + } + + public int drainTo(Collection c, int maxElements) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + if (maxElements <= 0) + return 0; + final ReentrantLock lock = this.lock; + lock.lock(); + try { + int n = 0; + while (n < maxElements) { + E first = q.peek(); + if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) + break; + c.add(q.poll()); + ++n; + } + if (n > 0) + available.signalAll(); + return n; + } finally { + lock.unlock(); + } + } + + /** + * Atomically removes all of the elements from this delay queue. + * The queue will be empty after this call returns. + */ + public void clear() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + q.clear(); + } finally { + lock.unlock(); + } + } + + /** + * Always returns Integer.MAX_VALUE because + * a DelayQueue is not capacity constrained. 
+ * @return Integer.MAX_VALUE + */ + public int remainingCapacity() { + return Integer.MAX_VALUE; + } + + public Object[] toArray() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.toArray(); + } finally { + lock.unlock(); + } + } + + public T[] toArray(T[] array) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.toArray(array); + } finally { + lock.unlock(); + } + } + + public boolean remove(Object o) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.remove(o); + } finally { + lock.unlock(); + } + } + + /** + * Returns an iterator over the elements in this queue. The iterator + * does not return the elements in any particular order. The + * returned iterator is a thread-safe "fast-fail" iterator that will + * throw {@link java.util.ConcurrentModificationException} + * upon detected interference. + * + * @return an iterator over the elements in this queue. + */ + public Iterator iterator() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return new Itr(q.iterator()); + } finally { + lock.unlock(); + } + } + + private class Itr implements Iterator { + private final Iterator iter; + Itr(Iterator i) { + iter = i; + } + + public boolean hasNext() { + return iter.hasNext(); + } + + public E next() { + final ReentrantLock lock = DelayQueue.this.lock; + lock.lock(); + try { + return iter.next(); + } finally { + lock.unlock(); + } + } + + public void remove() { + final ReentrantLock lock = DelayQueue.this.lock; + lock.lock(); + try { + iter.remove(); + } finally { + lock.unlock(); + } + } + } + +} Propchange: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/DelayQueue.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Delayed.java URL: 
http://svn.apache.org/viewvc/incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Delayed.java?view=auto&rev=442038 ============================================================================== --- incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Delayed.java (added) +++ incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Delayed.java Sun Sep 10 16:59:30 2006 @@ -0,0 +1,32 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; + +import java.util.*; + +/** + * A mix-in style interface for marking objects that should be + * acted upon after a given delay. + * + *

An implementation of this interface must define a + * compareTo method that provides an ordering consistent with + * its getDelay method. + * + * @since 1.5 + * @author Doug Lea + */ +public interface Delayed extends Comparable { + + /** + * Returns the delay associated with this object, in the given time unit. + * + * @param unit the time unit + * @return the delay; zero or negative values indicate that the + * delay has already elapsed + */ + long getDelay(TimeUnit unit); +} Propchange: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Delayed.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Exchanger.java URL: http://svn.apache.org/viewvc/incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Exchanger.java?view=auto&rev=442038 ============================================================================== --- incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Exchanger.java (added) +++ incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Exchanger.java Sun Sep 10 16:59:30 2006 @@ -0,0 +1,247 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; +import java.util.concurrent.locks.*; + +/** + * A synchronization point at which two threads can exchange objects. + * Each thread presents some object on entry to the {@link #exchange + * exchange} method, and receives the object presented by the other + * thread on return. + * + *

Sample Usage: + * Here are the highlights of a class that uses an Exchanger to + * swap buffers between threads so that the thread filling the + * buffer gets a freshly + * emptied one when it needs it, handing off the filled one to + * the thread emptying the buffer. + *

+ * class FillAndEmpty {
+ *   Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
+ *   DataBuffer initialEmptyBuffer = ... a made-up type
+ *   DataBuffer initialFullBuffer = ...
+ *
+ *   class FillingLoop implements Runnable {
+ *     public void run() {
+ *       DataBuffer currentBuffer = initialEmptyBuffer;
+ *       try {
+ *         while (currentBuffer != null) {
+ *           addToBuffer(currentBuffer);
+ *           if (currentBuffer.full())
+ *             currentBuffer = exchanger.exchange(currentBuffer);
+ *         }
+ *       } catch (InterruptedException ex) { ... handle ... }
+ *     }
+ *   }
+ *
+ *   class EmptyingLoop implements Runnable {
+ *     public void run() {
+ *       DataBuffer currentBuffer = initialFullBuffer;
+ *       try {
+ *         while (currentBuffer != null) {
+ *           takeFromBuffer(currentBuffer);
+ *           if (currentBuffer.empty())
+ *             currentBuffer = exchanger.exchange(currentBuffer);
+ *         }
+ *       } catch (InterruptedException ex) { ... handle ...}
+ *     }
+ *   }
+ *
+ *   void start() {
+ *     new Thread(new FillingLoop()).start();
+ *     new Thread(new EmptyingLoop()).start();
+ *   }
+ * }
+ * 
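The sample above elides DataBuffer and the fill/empty helpers. As a rough, self-contained sketch of the same hand-off (not part of the committed source; the class name ExchangerDemo, the method swapOnce, and the exchanged strings are made up for illustration), two threads can meet once at an Exchanger and each walk away with the other's object:

```java
import java.util.concurrent.Exchanger;

public class ExchangerDemo {

    /**
     * Hypothetical helper: the calling thread and one worker thread meet
     * once at an Exchanger. Returns {received by caller, received by worker}.
     */
    static String[] swapOnce() {
        final Exchanger<String> exchanger = new Exchanger<String>();
        final String[] received = new String[2];

        Thread worker = new Thread(new Runnable() {
            public void run() {
                try {
                    // Blocks until the caller arrives at the exchange point.
                    received[1] = exchanger.exchange("from-worker");
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        worker.start();

        try {
            received[0] = exchanger.exchange("from-caller");
            // join() makes the worker's write to received[1] visible here.
            worker.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during exchange", ex);
        }
        return received;
    }

    public static void main(String[] args) {
        String[] r = swapOnce();
        System.out.println("caller got " + r[0] + ", worker got " + r[1]);
    }
}
```

Whichever thread arrives first waits; the second arrival completes the swap, so the outcome does not depend on scheduling order.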
+ * + * @since 1.5 + * @author Doug Lea + * @param The type of objects that may be exchanged + */ +public class Exchanger { + private final ReentrantLock lock = new ReentrantLock(); + private final Condition taken = lock.newCondition(); + + /** Holder for the item being exchanged */ + private V item; + + /** + * Arrival count transitions from 0 to 1 to 2 then back to 0 + * during an exchange. + */ + private int arrivalCount; + + /** + * Main exchange function, handling the different policy variants. + */ + private V doExchange(V x, boolean timed, long nanos) throws InterruptedException, TimeoutException { + lock.lock(); + try { + V other; + + // If arrival count already at two, we must wait for + // a previous pair to finish and reset the count; + while (arrivalCount == 2) { + if (!timed) + taken.await(); + else if (nanos > 0) + nanos = taken.awaitNanos(nanos); + else + throw new TimeoutException(); + } + + int count = ++arrivalCount; + + // If item is already waiting, replace it and signal other thread + if (count == 2) { + other = item; + item = x; + taken.signal(); + return other; + } + + // Otherwise, set item and wait for another thread to + // replace it and signal us. + + item = x; + InterruptedException interrupted = null; + try { + while (arrivalCount != 2) { + if (!timed) + taken.await(); + else if (nanos > 0) + nanos = taken.awaitNanos(nanos); + else + break; // timed out + } + } catch (InterruptedException ie) { + interrupted = ie; + } + + // Get and reset item and count after the wait. + // (We need to do this even if wait was aborted.) + other = item; + item = null; + count = arrivalCount; + arrivalCount = 0; + taken.signal(); + + // If the other thread replaced item, then we must + // continue even if cancelled. 
+ if (count == 2) { + if (interrupted != null) + Thread.currentThread().interrupt(); + return other; + } + + // If no one is waiting for us, we can back out + if (interrupted != null) + throw interrupted; + else // must be timeout + throw new TimeoutException(); + } finally { + lock.unlock(); + } + } + + /** + * Create a new Exchanger. + **/ + public Exchanger() { + } + + /** + * Waits for another thread to arrive at this exchange point (unless + * it is {@link Thread#interrupt interrupted}), + * and then transfers the given object to it, receiving its object + * in return. + *

If another thread is already waiting at the exchange point then + * it is resumed for thread scheduling purposes and receives the object + * passed in by the current thread. The current thread returns immediately, + * receiving the object passed to the exchange by that other thread. + *

If no other thread is already waiting at the exchange then the + * current thread is disabled for thread scheduling purposes and lies + * dormant until one of two things happens: + *

    + *
  • Some other thread enters the exchange; or + *
  • Some other thread {@link Thread#interrupt interrupts} the current + * thread. + *
+ *

If the current thread: + *

    + *
  • has its interrupted status set on entry to this method; or + *
  • is {@link Thread#interrupt interrupted} while waiting + * for the exchange, + *
+ * then {@link InterruptedException} is thrown and the current thread's + * interrupted status is cleared. + * + * @param x the object to exchange + * @return the object provided by the other thread. + * @throws InterruptedException if current thread was interrupted + * while waiting + **/ + public V exchange(V x) throws InterruptedException { + try { + return doExchange(x, false, 0); + } catch (TimeoutException cannotHappen) { + throw new Error(cannotHappen); + } + } + + /** + * Waits for another thread to arrive at this exchange point (unless + * it is {@link Thread#interrupt interrupted}, or the specified waiting + * time elapses), + * and then transfers the given object to it, receiving its object + * in return. + * + *

If another thread is already waiting at the exchange point then + * it is resumed for thread scheduling purposes and receives the object + * passed in by the current thread. The current thread returns immediately, + * receiving the object passed to the exchange by that other thread. + * + *

If no other thread is already waiting at the exchange then the + * current thread is disabled for thread scheduling purposes and lies + * dormant until one of three things happens: + *

    + *
  • Some other thread enters the exchange; or + *
  • Some other thread {@link Thread#interrupt interrupts} the current + * thread; or + *
  • The specified waiting time elapses. + *
+ *

If the current thread: + *

    + *
  • has its interrupted status set on entry to this method; or + *
  • is {@link Thread#interrupt interrupted} while waiting + * for the exchange, + *
+ * then {@link InterruptedException} is thrown and the current thread's + * interrupted status is cleared. + * + *

If the specified waiting time elapses then {@link TimeoutException} + * is thrown. + * If the time is + * less than or equal to zero, the method will not wait at all. + * + * @param x the object to exchange + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument. + * @return the object provided by the other thread. + * @throws InterruptedException if current thread was interrupted + * while waiting + * @throws TimeoutException if the specified waiting time elapses before + * another thread enters the exchange. + **/ + public V exchange(V x, long timeout, TimeUnit unit) + throws InterruptedException, TimeoutException { + return doExchange(x, true, unit.toNanos(timeout)); + } + +} + + Propchange: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Exchanger.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutionException.java URL: http://svn.apache.org/viewvc/incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutionException.java?view=auto&rev=442038 ============================================================================== --- incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutionException.java (added) +++ incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutionException.java Sun Sep 10 16:59:30 2006 @@ -0,0 +1,65 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; + +/** + * Exception thrown when attempting to retrieve the result of a task + * that aborted by throwing an exception. 
This exception can be + * inspected using the {@link #getCause()} method. + * + * @see Future + * @since 1.5 + * @author Doug Lea + */ +public class ExecutionException extends Exception { + private static final long serialVersionUID = 7830266012832686185L; + + /** + * Constructs an ExecutionException with no detail message. + * The cause is not initialized, and may subsequently be + * initialized by a call to {@link #initCause(Throwable) initCause}. + */ + protected ExecutionException() { } + + /** + * Constructs an ExecutionException with the specified detail + * message. The cause is not initialized, and may subsequently be + * initialized by a call to {@link #initCause(Throwable) initCause}. + * + * @param message the detail message + */ + protected ExecutionException(String message) { + super(message); + } + + /** + * Constructs an ExecutionException with the specified detail + * message and cause. + * + * @param message the detail message + * @param cause the cause (which is saved for later retrieval by the + * {@link #getCause()} method) + */ + public ExecutionException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Constructs an ExecutionException with the specified cause. + * The detail message is set to: + *

+     *  (cause == null ? null : cause.toString())
+ * (which typically contains the class and detail message of + * cause). + * + * @param cause the cause (which is saved for later retrieval by the + * {@link #getCause()} method) + */ + public ExecutionException(Throwable cause) { + super(cause); + } +} Propchange: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutionException.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executor.java URL: http://svn.apache.org/viewvc/incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executor.java?view=auto&rev=442038 ============================================================================== --- incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executor.java (added) +++ incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executor.java Sun Sep 10 16:59:30 2006 @@ -0,0 +1,107 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; + +/** + * An object that executes submitted {@link Runnable} tasks. This + * interface provides a way of decoupling task submission from the + * mechanics of how each task will be run, including details of thread + * use, scheduling, etc. An Executor is normally used + * instead of explicitly creating threads. For example, rather than + * invoking new Thread(new RunnableTask()).start() for each + * of a set of tasks, you might use: + * + *
+ * Executor executor = anExecutor;
+ * executor.execute(new RunnableTask1());
+ * executor.execute(new RunnableTask2());
+ * ...
+ * 
+ * + * However, the Executor interface does not strictly + * require that execution be asynchronous. In the simplest case, an + * executor can run the submitted task immediately in the caller's + * thread: + * + *
+ * class DirectExecutor implements Executor {
+ *     public void execute(Runnable r) {
+ *         r.run();
+ *     }
+ * }
+ * + * More typically, tasks are executed in some thread other + * than the caller's thread. The executor below spawns a new thread + * for each task. + * + *
+ * class ThreadPerTaskExecutor implements Executor {
+ *     public void execute(Runnable r) {
+ *         new Thread(r).start();
+ *     }
+ * }
+ * + * Many Executor implementations impose some sort of + * limitation on how and when tasks are scheduled. The executor below + * serializes the submission of tasks to a second executor, + * illustrating a composite executor. + * + *
+ * class SerialExecutor implements Executor {
+ *     final Queue<Runnable> tasks = new LinkedBlockingQueue<Runnable>();
+ *     final Executor executor;
+ *     Runnable active;
+ *
+ *     SerialExecutor(Executor executor) {
+ *         this.executor = executor;
+ *     }
+ *
+ *     public synchronized void execute(final Runnable r) {
+ *         tasks.offer(new Runnable() {
+ *             public void run() {
+ *                 try {
+ *                     r.run();
+ *                 } finally {
+ *                     scheduleNext();
+ *                 }
+ *             }
+ *         });
+ *         if (active == null) {
+ *             scheduleNext();
+ *         }
+ *     }
+ *
+ *     protected synchronized void scheduleNext() {
+ *         if ((active = tasks.poll()) != null) {
+ *             executor.execute(active);
+ *         }
+ *     }
+ * }
+ * + * The Executor implementations provided in this package + * implement {@link ExecutorService}, which is a more extensive + * interface. The {@link ThreadPoolExecutor} class provides an + * extensible thread pool implementation. The {@link Executors} class + * provides convenient factory methods for these Executors. + * + * @since 1.5 + * @author Doug Lea + */ +public interface Executor { + + /** + * Executes the given command at some time in the future. The command + * may execute in a new thread, in a pooled thread, or in the calling + * thread, at the discretion of the Executor implementation. + * + * @param command the runnable task + * @throws RejectedExecutionException if this task cannot be + * accepted for execution. + * @throws NullPointerException if command is null + */ + void execute(Runnable command); +} Propchange: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executor.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorCompletionService.java URL: http://svn.apache.org/viewvc/incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorCompletionService.java?view=auto&rev=442038 ============================================================================== --- incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorCompletionService.java (added) +++ incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorCompletionService.java Sun Sep 10 16:59:30 2006 @@ -0,0 +1,148 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; + 
+ +/** + * A {@link CompletionService} that uses a supplied {@link Executor} + * to execute tasks. This class arranges that submitted tasks are, + * upon completion, placed on a queue accessible using take. + * The class is lightweight enough to be suitable for transient use + * when processing groups of tasks. + * + *

+ * + * Usage Examples. + * + * Suppose you have a set of solvers for a certain problem, each + * returning a value of some type Result, and would like to + * run them concurrently, processing the results of each of them that + * return a non-null value, in some method use(Result r). You + * could write this as: + * + *

+ *    void solve(Executor e, Collection<Callable<Result>> solvers)
+ *      throws InterruptedException, ExecutionException {
+ *        CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
+ *        for (Callable<Result> s : solvers)
+ *            ecs.submit(s);
+ *        int n = solvers.size();
+ *        for (int i = 0; i < n; ++i) {
+ *            Result r = ecs.take().get();
+ *            if (r != null) 
+ *                use(r);
+ *        }
+ *    }
+ * 
+ * + * Suppose instead that you would like to use the first non-null result + * of the set of tasks, ignoring any that encounter exceptions, + * and cancelling all other tasks when the first one is ready: + * + *
+ *    void solve(Executor e, Collection<Callable<Result>> solvers) 
+ *      throws InterruptedException {
+ *        CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
+ *        int n = solvers.size();
+ *        List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
+ *        Result result = null;
+ *        try {
+ *            for (Callable<Result> s : solvers)
+ *                futures.add(ecs.submit(s));
+ *            for (int i = 0; i < n; ++i) {
+ *                try {
+ *                    Result r = ecs.take().get();
+ *                    if (r != null) {
+ *                        result = r;
+ *                        break;
+ *                    }
+ *                } catch(ExecutionException ignore) {}
+ *            }
+ *        }
+ *        finally {
+ *            for (Future<Result> f : futures)
+ *                f.cancel(true);
+ *        }
+ *
+ *        if (result != null)
+ *            use(result);
+ *    }
+ * 
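The second pattern above (take the first non-null result, ignore failures, cancel the rest) can be tried out with the following self-contained sketch. It is an illustration only, not part of the committed source: the class FirstResultDemo, its methods, the String result type, and the three toy solvers are all hypothetical, and the fixed-size pool is just one convenient choice of Executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FirstResultDemo {

    /** Returns the first non-null result, or null if no solver produced one. */
    static String firstNonNull(List<Callable<String>> solvers) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, solvers.size()));
        CompletionService<String> ecs = new ExecutorCompletionService<String>(pool);
        List<Future<String>> futures = new ArrayList<Future<String>>(solvers.size());
        try {
            for (Callable<String> s : solvers)
                futures.add(ecs.submit(s));
            for (int i = 0; i < solvers.size(); ++i) {
                try {
                    // take() hands back futures in completion order.
                    String r = ecs.take().get();
                    if (r != null)
                        return r;
                } catch (ExecutionException ignore) {
                    // A failed solver is skipped, as in the pattern above.
                }
            }
            return null;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            // Cancel whatever is still running, then release the pool threads.
            for (Future<String> f : futures)
                f.cancel(true);
            pool.shutdownNow();
        }
    }

    /** Toy input: one solver fails, one returns null, one succeeds. */
    static String demo() {
        List<Callable<String>> solvers = new ArrayList<Callable<String>>();
        solvers.add(new Callable<String>() {
            public String call() { throw new IllegalStateException("no solution"); }
        });
        solvers.add(new Callable<String>() {
            public String call() { return null; }
        });
        solvers.add(new Callable<String>() {
            public String call() { return "ok"; }
        });
        return firstNonNull(solvers);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only one solver yields a non-null value, so the result does not depend on which task finishes first.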
+ */ +public class ExecutorCompletionService implements CompletionService { + private final Executor executor; + private final BlockingQueue> completionQueue; + + /** + * FutureTask extension to enqueue upon completion + */ + private class QueueingFuture extends FutureTask { + QueueingFuture(Callable c) { super(c); } + QueueingFuture(Runnable t, V r) { super(t, r); } + protected void done() { completionQueue.add(this); } + } + + /** + * Creates an ExecutorCompletionService using the supplied + * executor for base task execution and a + * {@link LinkedBlockingQueue} as a completion queue. + * @param executor the executor to use + * @throws NullPointerException if executor is null + */ + public ExecutorCompletionService(Executor executor) { + if (executor == null) + throw new NullPointerException(); + this.executor = executor; + this.completionQueue = new LinkedBlockingQueue>(); + } + + /** + * Creates an ExecutorCompletionService using the supplied + * executor for base task execution and the supplied queue as its + * completion queue. 
+ * @param executor the executor to use + * @param completionQueue the queue to use as the completion queue + * normally one dedicated for use by this service + * @throws NullPointerException if executor or completionQueue are null + */ + public ExecutorCompletionService(Executor executor, + BlockingQueue> completionQueue) { + if (executor == null || completionQueue == null) + throw new NullPointerException(); + this.executor = executor; + this.completionQueue = completionQueue; + } + + public Future submit(Callable task) { + if (task == null) throw new NullPointerException(); + QueueingFuture f = new QueueingFuture(task); + executor.execute(f); + return f; + } + + public Future submit(Runnable task, V result) { + if (task == null) throw new NullPointerException(); + QueueingFuture f = new QueueingFuture(task, result); + executor.execute(f); + return f; + } + + public Future take() throws InterruptedException { + return completionQueue.take(); + } + + public Future poll() { + return completionQueue.poll(); + } + + public Future poll(long timeout, TimeUnit unit) throws InterruptedException { + return completionQueue.poll(timeout, unit); + } + +} + + Propchange: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorCompletionService.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorService.java URL: http://svn.apache.org/viewvc/incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorService.java?view=auto&rev=442038 ============================================================================== --- incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorService.java (added) +++ 
incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorService.java Sun Sep 10 16:59:30 2006 @@ -0,0 +1,286 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; + +import java.util.List; +import java.util.Collection; +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; + +/** + * An {@link Executor} that provides methods to manage termination and + * methods that can produce a {@link Future} for tracking progress of + * one or more asynchronous tasks. + * + *

+ * An ExecutorService can be shut down, which will cause it + * to stop accepting new tasks. After being shut down, the executor + * will eventually terminate, at which point no tasks are actively + * executing, no tasks are awaiting execution, and no new tasks can be + * submitted. + * + *

Method submit extends base method {@link + * Executor#execute} by creating and returning a {@link Future} that + * can be used to cancel execution and/or wait for completion. + * Methods invokeAny and invokeAll perform the most + * commonly useful forms of bulk execution, executing a collection of + * tasks and then waiting for at least one, or all, to + * complete. (Class {@link ExecutorCompletionService} can be used to + * write customized variants of these methods.) + * + *

The {@link Executors} class provides factory methods for the + * executor services provided in this package. + * + *

Usage Example

+ * + * Here is a sketch of a network service in which threads in a thread + * pool service incoming requests. It uses the preconfigured {@link + * Executors#newFixedThreadPool} factory method: + * + *
+ * class NetworkService {
+ *    private final ServerSocket serverSocket;
+ *    private final ExecutorService pool;
+ *
+ *    public NetworkService(int port, int poolSize) throws IOException {
+ *      serverSocket = new ServerSocket(port);
+ *      pool = Executors.newFixedThreadPool(poolSize);
+ *    }
+ * 
+ *    public void serve() {
+ *      try {
+ *        for (;;) {
+ *          pool.execute(new Handler(serverSocket.accept()));
+ *        }
+ *      } catch (IOException ex) {
+ *        pool.shutdown();
+ *      }
+ *    }
+ *  }
+ *
+ *  class Handler implements Runnable {
+ *    private final Socket socket;
+ *    Handler(Socket socket) { this.socket = socket; }
+ *    public void run() {
+ *      // read and service request
+ *    }
+ * }
+ * 
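The sketch above only calls shutdown() when accept fails. As a minimal illustration of the shutdown lifecycle described earlier (submit work, stop accepting tasks, wait for termination), and not part of the committed source, the following uses only methods declared on ExecutorService; the class name ShutdownDemo, the method runAndShutdown, and the five-second timeout are assumptions made for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {

    /** Submits one task, shuts the pool down, and returns the task's result. */
    static int runAndShutdown() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Future<Integer> sum = pool.submit(new Callable<Integer>() {
            public Integer call() { return 1 + 2; }
        });

        // Orderly shutdown: the submitted task still runs, new tasks are refused.
        pool.shutdown();
        try {
            // The timeout is arbitrary; fall back to shutdownNow() if it elapses.
            if (!pool.awaitTermination(5, TimeUnit.SECONDS))
                pool.shutdownNow();
            return sum.get();
        } catch (InterruptedException ex) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException("task failed", ex.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndShutdown());
    }
}
```

Calling shutdown() before awaitTermination matters: awaitTermination by itself does not initiate a shutdown, it only waits for one to finish.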
+ * @since 1.5 + * @author Doug Lea + */ +public interface ExecutorService extends Executor { + + /** + * Initiates an orderly shutdown in which previously submitted + * tasks are executed, but no new tasks will be + * accepted. Invocation has no additional effect if already shut + * down. + * @throws SecurityException if a security manager exists and + * shutting down this ExecutorService may manipulate threads that + * the caller is not permitted to modify because it does not hold + * {@link java.lang.RuntimePermission}("modifyThread"), + * or the security manager's checkAccess method denies access. + */ + void shutdown(); + + /** + * Attempts to stop all actively executing tasks, halts the + * processing of waiting tasks, and returns a list of the tasks that were + * awaiting execution. + * + *

There are no guarantees beyond best-effort attempts to stop + * processing actively executing tasks. For example, typical + * implementations will cancel via {@link Thread#interrupt}, so if any + * tasks mask or fail to respond to interrupts, they may never terminate. + * + * @return list of tasks that never commenced execution + * @throws SecurityException if a security manager exists and + * shutting down this ExecutorService may manipulate threads that + * the caller is not permitted to modify because it does not hold + * {@link java.lang.RuntimePermission}("modifyThread"), + * or the security manager's checkAccess method denies access. + */ + List shutdownNow(); + + /** + * Returns true if this executor has been shut down. + * + * @return true if this executor has been shut down + */ + boolean isShutdown(); + + /** + * Returns true if all tasks have completed following shut down. + * Note that isTerminated is never true unless + * either shutdown or shutdownNow was called first. + * + * @return true if all tasks have completed following shut down + */ + boolean isTerminated(); + + /** + * Blocks until all tasks have completed execution after a shutdown + * request, or the timeout occurs, or the current thread is + * interrupted, whichever happens first. + * + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @return true if this executor terminated and false + * if the timeout elapsed before termination + * @throws InterruptedException if interrupted while waiting + */ + boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException; + + + /** + * Submits a value-returning task for execution and returns a Future + * representing the pending results of the task. + * + *

+ * If you would like to immediately block waiting + * for a task, you can use constructions of the form + * result = exec.submit(aCallable).get(); + * + *

Note: The {@link Executors} class includes a set of methods + * that can convert some other common closure-like objects, + * for example, {@link java.security.PrivilegedAction} to + * {@link Callable} form so they can be submitted. + * + * @param task the task to submit + * @return a Future representing pending completion of the task + * @throws RejectedExecutionException if task cannot be scheduled + * for execution + * @throws NullPointerException if task is null + */ + <T> Future<T> submit(Callable<T> task); + + /** + * Submits a Runnable task for execution and returns a Future + * representing that task that will upon completion return + * the given result. + * + * @param task the task to submit + * @param result the result to return + * @return a Future representing pending completion of the task, + * and whose get() method will return the given result + * upon completion. + * @throws RejectedExecutionException if task cannot be scheduled + * for execution + * @throws NullPointerException if task is null + */ + <T> Future<T> submit(Runnable task, T result); + + /** + * Submits a Runnable task for execution and returns a Future + * representing that task. + * + * @param task the task to submit + * @return a Future representing pending completion of the task, + * and whose get() method will return null + * upon completion. + * @throws RejectedExecutionException if task cannot be scheduled + * for execution + * @throws NullPointerException if task is null + */ + Future<?> submit(Runnable task); + + /** + * Executes the given tasks, returning their results + * when all complete. + * Note that a completed task could have + * terminated either normally or by throwing an exception. + * The results of this method are undefined if the given + * collection is modified while this operation is in progress. 
+ * @param tasks the collection of tasks + * @return A list of Futures representing the tasks, in the same + * sequential order as produced by the iterator for the given task + * list, each of which has completed. + * @throws InterruptedException if interrupted while waiting, in + * which case unfinished tasks are cancelled. + * @throws NullPointerException if tasks or any of its elements are null + * @throws RejectedExecutionException if any task cannot be scheduled + * for execution + */ + + <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks) + throws InterruptedException; + + /** + * Executes the given tasks, returning their results + * when all complete or the timeout expires, whichever happens first. + * Upon return, tasks that have not completed are cancelled. + * Note that a completed task could have + * terminated either normally or by throwing an exception. + * The results of this method are undefined if the given + * collection is modified while this operation is in progress. + * @param tasks the collection of tasks + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @return A list of Futures representing the tasks, in the same + * sequential order as produced by the iterator for the given + * task list. If the operation did not time out, each task will + * have completed. If it did time out, some of these tasks will + * not have completed. + * @throws InterruptedException if interrupted while waiting, in + * which case unfinished tasks are cancelled. + * @throws NullPointerException if tasks, any of its elements, or + * unit are null + * @throws RejectedExecutionException if any task cannot be scheduled + * for execution + */ + <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks, + long timeout, TimeUnit unit) + throws InterruptedException; + + /** + * Executes the given tasks, returning the result + * of one that has completed successfully (i.e., without throwing + * an exception), if any do. 
Upon normal or exceptional return, + * tasks that have not completed are cancelled. + * The results of this method are undefined if the given + * collection is modified while this operation is in progress. + * @param tasks the collection of tasks + * @return The result returned by one of the tasks. + * @throws InterruptedException if interrupted while waiting + * @throws NullPointerException if tasks or any of its elements + * are null + * @throws IllegalArgumentException if tasks is empty + * @throws ExecutionException if no task successfully completes + * @throws RejectedExecutionException if tasks cannot be scheduled + * for execution + */ + <T> T invokeAny(Collection<Callable<T>> tasks) + throws InterruptedException, ExecutionException; + + /** + * Executes the given tasks, returning the result + * of one that has completed successfully (i.e., without throwing + * an exception), if any do before the given timeout elapses. + * Upon normal or exceptional return, tasks that have not + * completed are cancelled. + * The results of this method are undefined if the given + * collection is modified while this operation is in progress. + * @param tasks the collection of tasks + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @return The result returned by one of the tasks. 
+ * @throws InterruptedException if interrupted while waiting + * @throws NullPointerException if tasks, any of its elements, or + * unit are null + * @throws TimeoutException if the given timeout elapses before + * any task successfully completes + * @throws ExecutionException if no task successfully completes + * @throws RejectedExecutionException if tasks cannot be scheduled + * for execution + */ + <T> T invokeAny(Collection<Callable<T>> tasks, + long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException; + +} Propchange: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ExecutorService.java ------------------------------------------------------------------------------ svn:eol-style = native
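For reference, a minimal usage sketch of the ExecutorService contract documented above: invokeAll returns its Futures in the iteration order of the task collection, and shutdown followed by awaitTermination gives an orderly stop, with shutdownNow as a fallback. This is not part of the commit. The class name ExecutorServiceSketch, the helper sumOfSquares, the pool size of 4, and the 5-second timeout are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical example class; not part of the committed sources.
public class ExecutorServiceSketch {

    // Hypothetical helper: sums the squares of 1..n, one Callable per value.
    static int sumOfSquares(int n) throws InterruptedException, ExecutionException {
        // Pool size is an arbitrary choice for this sketch.
        ExecutorService exec = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
            for (int i = 1; i <= n; i++) {
                final int value = i;
                tasks.add(new Callable<Integer>() {
                    public Integer call() {
                        return value * value;
                    }
                });
            }
            // invokeAll blocks until every task has completed (normally or
            // by throwing); the Futures follow the list's iteration order.
            int sum = 0;
            for (Future<Integer> f : exec.invokeAll(tasks)) {
                sum += f.get(); // rethrows a task failure as ExecutionException
            }
            return sum;
        } finally {
            // Orderly shutdown: no new tasks accepted, submitted ones finish.
            exec.shutdown();
            // Fall back to a best-effort stop if termination takes too long.
            if (!exec.awaitTermination(5, TimeUnit.SECONDS)) {
                exec.shutdownNow();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOfSquares(3));
    }
}
```

The shutdown sequence sits in a finally block so the worker threads are released even when a task fails. A long-lived application would more likely share one executor than create one per call.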