harmony-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hinde...@apache.org
Subject svn commit: r798469 [9/28] - in /harmony/enhanced/classlib/branches/java6: ./ depends/build/platform/ depends/files/ depends/jars/ depends/manifests/icu4j_4.0/ depends/manifests/icu4j_4.2.1/ depends/manifests/icu4j_4.2.1/META-INF/ make/ modules/accessi...
Date Tue, 28 Jul 2009 09:30:48 GMT
Modified: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/Exchanger.java
URL: http://svn.apache.org/viewvc/harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/Exchanger.java?rev=798469&r1=798468&r2=798469&view=diff
==============================================================================
--- harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/Exchanger.java (original)
+++ harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/Exchanger.java Tue Jul 28 09:30:33 2009
@@ -1,27 +1,31 @@
 /*
- * Written by Doug Lea with assistance from members of JCP JSR-166
- * Expert Group and released to the public domain, as explained at
+ * Written by Doug Lea, Bill Scherer, and Michael Scott with
+ * assistance from members of JCP JSR-166 Expert Group and released to
+ * the public domain, as explained at
  * http://creativecommons.org/licenses/publicdomain
  */
 
 package java.util.concurrent;
-import java.util.concurrent.locks.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.LockSupport;
 
 /**
- * A synchronization point at which two threads can exchange objects.
- * Each thread presents some object on entry to the {@link #exchange
- * exchange} method, and receives the object presented by the other
- * thread on return.
+ * A synchronization point at which threads can pair and swap elements
+ * within pairs.  Each thread presents some object on entry to the
+ * {@link #exchange exchange} method, matches with a partner thread,
+ * and receives its partner's object on return.  An Exchanger may be
+ * viewed as a bidirectional form of a {@link SynchronousQueue}.
+ * Exchangers may be useful in applications such as genetic algorithms
+ * and pipeline designs.
  *
  * <p><b>Sample Usage:</b>
- * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
- * swap buffers between threads so that the thread filling the
- * buffer gets a freshly
- * emptied one when it needs it, handing off the filled one to
- * the thread emptying the buffer.
- * <pre>
+ * Here are the highlights of a class that uses an {@code Exchanger}
+ * to swap buffers between threads so that the thread filling the
+ * buffer gets a freshly emptied one when it needs it, handing off the
+ * filled one to the thread emptying the buffer.
+ * <pre>{@code
  * class FillAndEmpty {
- *   Exchanger&lt;DataBuffer&gt; exchanger = new Exchanger();
+ *   Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
  *   DataBuffer initialEmptyBuffer = ... a made-up type
  *   DataBuffer initialFullBuffer = ...
  *
@@ -31,7 +35,7 @@
  *       try {
  *         while (currentBuffer != null) {
  *           addToBuffer(currentBuffer);
- *           if (currentBuffer.full())
+ *           if (currentBuffer.isFull())
  *             currentBuffer = exchanger.exchange(currentBuffer);
  *         }
  *       } catch (InterruptedException ex) { ... handle ... }
@@ -44,7 +48,7 @@
  *       try {
  *         while (currentBuffer != null) {
  *           takeFromBuffer(currentBuffer);
- *           if (currentBuffer.empty())
+ *           if (currentBuffer.isEmpty())
  *             currentBuffer = exchanger.exchange(currentBuffer);
  *         }
  *       } catch (InterruptedException ex) { ... handle ...}
@@ -56,192 +60,597 @@
  *     new Thread(new EmptyingLoop()).start();
  *   }
  * }
- * </pre>
+ * }</pre>
+ *
+ * <p>Memory consistency effects: For each pair of threads that
+ * successfully exchange objects via an {@code Exchanger}, actions
+ * prior to the {@code exchange()} in each thread
+ * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
+ * those subsequent to a return from the corresponding {@code exchange()}
+ * in the other thread.
  *
  * @since 1.5
- * @author Doug Lea
+ * @author Doug Lea and Bill Scherer and Michael Scott
  * @param <V> The type of objects that may be exchanged
  */
 public class Exchanger<V> {
-    private final ReentrantLock lock = new ReentrantLock();
-    private final Condition taken = lock.newCondition();
+    /*
+     * Algorithm Description:
+     *
+     * The basic idea is to maintain a "slot", which is a reference to
+     * a Node containing both an Item to offer and a "hole" waiting to
+     * get filled in.  If an incoming "occupying" thread sees that the
+     * slot is null, it CAS'es (compareAndSets) a Node there and waits
+     * for another to invoke exchange.  That second "fulfilling" thread
+     * sees that the slot is non-null, and so CASes it back to null,
+     * also exchanging items by CASing the hole, plus waking up the
+     * occupying thread if it is blocked.  In each case CAS'es may
+     * fail because a slot at first appears non-null but is null upon
+     * CAS, or vice-versa.  So threads may need to retry these
+     * actions.
+     *
+     * This simple approach works great when there are only a few
+     * threads using an Exchanger, but performance rapidly
+     * deteriorates due to CAS contention on the single slot when
+     * there are lots of threads using an exchanger.  So instead we use
+     * an "arena"; basically a kind of hash table with a dynamically
+     * varying number of slots, any one of which can be used by
+     * threads performing an exchange.  Incoming threads pick slots
+     * based on a hash of their Thread ids.  If an incoming thread
+     * fails to CAS in its chosen slot, it picks an alternative slot
+     * instead.  And similarly from there.  If a thread successfully
+     * CASes into a slot but no other thread arrives, it tries
+     * another, heading toward the zero slot, which always exists even
+     * if the table shrinks.  The particular mechanics controlling this
+     * are as follows:
+     *
+     * Waiting: Slot zero is special in that it is the only slot that
+     * exists when there is no contention.  A thread occupying slot
+     * zero will block if no thread fulfills it after a short spin.
+     * In other cases, occupying threads eventually give up and try
+     * another slot.  Waiting threads spin for a while (a period that
+     * should be a little less than a typical context-switch time)
+     * before either blocking (if slot zero) or giving up (if other
+     * slots) and restarting.  There is no reason for threads to block
+     * unless there are unlikely to be any other threads present.
+     * Occupants are mainly avoiding memory contention so sit there
+     * quietly polling for a shorter period than it would take to
+     * block and then unblock them.  Non-slot-zero waits that elapse
+     * because of lack of other threads waste around one extra
+     * context-switch time per try, which is still on average much
+     * faster than alternative approaches.
+     *
+     * Sizing: Usually, using only a few slots suffices to reduce
+     * contention.  Especially with small numbers of threads, using
+     * too many slots can lead to just as poor performance as using
+     * too few of them, and there's not much room for error.  The
+     * variable "max" maintains the number of slots actually in
+     * use.  It is increased when a thread sees too many CAS
+     * failures.  (This is analogous to resizing a regular hash table
+     * based on a target load factor, except here, growth steps are
+     * just one-by-one rather than proportional.)  Growth requires
+     * contention failures in each of three tried slots.  Requiring
+     * multiple failures for expansion copes with the fact that some
+     * failed CASes are not due to contention but instead to simple
+     * races between two threads or thread pre-emptions occurring
+     * between reading and CASing.  Also, very transient peak
+     * contention can be much higher than the average sustainable
+     * levels.  The max limit is decreased on average 50% of the times
+     * that a non-slot-zero wait elapses without being fulfilled.
+     * Threads experiencing elapsed waits move closer to zero, so
+     * eventually find existing (or future) threads even if the table
+     * has been shrunk due to inactivity.  The chosen mechanics and
+     * thresholds for growing and shrinking are intrinsically
+     * entangled with indexing and hashing inside the exchange code,
+     * and can't be nicely abstracted out.
+     *
+     * Hashing: Each thread picks its initial slot to use in accord
+     * with a simple hashcode.  The sequence is the same on each
+     * encounter by any given thread, but effectively random across
+     * threads.  Using arenas encounters the classic cost vs quality
+     * tradeoffs of all hash tables.  Here, we use a one-step FNV-1a
+     * hash code based on the current thread's Thread.getId(), along
+     * with a cheap approximation to a mod operation to select an
+     * index.  The downside of optimizing index selection in this way
+     * is that the code is hardwired to use a maximum table size of
+     * 32.  But this value more than suffices for known platforms and
+     * applications.
+     *
+     * Probing: On sensed contention of a selected slot, we probe
+     * sequentially through the table, analogously to linear probing
+     * after collision in a hash table.  (We move circularly, in
+     * reverse order, to mesh best with table growth and shrinkage
+     * rules.)  Except that to minimize the effects of false-alarms
+     * and cache thrashing, we try the first selected slot twice
+     * before moving.
+     *
+     * Padding: Even with contention management, slots are heavily
+     * contended, so use cache-padding to avoid poor memory
+     * performance.  Because of this, slots are lazily constructed
+     * only when used, to avoid wasting this space unnecessarily.
+     * While isolation of locations is not much of an issue at first
+     * in an application, as time goes on and garbage-collectors
+     * perform compaction, slots are very likely to be moved adjacent
+     * to each other, which can cause much thrashing of cache lines on
+     * MPs unless padding is employed.
+     *
+     * This is an improvement of the algorithm described in the paper
+     * "A Scalable Elimination-based Exchange Channel" by William
+     * Scherer, Doug Lea, and Michael Scott in Proceedings of SCOOL05
+     * workshop.  Available at: http://hdl.handle.net/1802/2104
+     */
+
+    /** The number of CPUs, for sizing and spin control */
+    private static final int NCPU = Runtime.getRuntime().availableProcessors();
+
+    /**
+     * The capacity of the arena.  Set to a value that provides more
+     * than enough space to handle contention.  On small machines
+     * most slots won't be used, but it is still not wasted because
+     * the extra space provides some machine-level address padding
+     * to minimize interference with heavily CAS'ed Slot locations.
+     * And on very large machines, performance eventually becomes
+     * bounded by memory bandwidth, not numbers of threads/CPUs.
+     * This constant cannot be changed without also modifying
+     * indexing and hashing algorithms.
+     */
+    private static final int CAPACITY = 32;
+
+    /**
+     * The value of "max" that will hold all threads without
+     * contention.  When this value is less than CAPACITY, some
+     * otherwise wasted expansion can be avoided.
+     */
+    private static final int FULL =
+        Math.max(0, Math.min(CAPACITY, NCPU / 2) - 1);
+
+    /**
+     * The number of times to spin (doing nothing except polling a
+     * memory location) before blocking or giving up while waiting to
+     * be fulfilled.  Should be zero on uniprocessors.  On
+     * multiprocessors, this value should be large enough so that two
+     * threads exchanging items as fast as possible block only when
+     * one of them is stalled (due to GC or preemption), but not much
+     * longer, to avoid wasting CPU resources.  Seen differently, this
+     * value is a little over half the number of cycles of an average
+     * context switch time on most systems.  The value here is
+     * approximately the average of those across a range of tested
+     * systems.
+     */
+    private static final int SPINS = (NCPU == 1) ? 0 : 2000;
+
+    /**
+     * The number of times to spin before blocking in timed waits.
+     * Timed waits spin more slowly because checking the time takes
+     * time.  The best value relies mainly on the relative rate of
+     * System.nanoTime vs memory accesses.  The value is empirically
+     * derived to work well across a variety of systems.
+     */
+    private static final int TIMED_SPINS = SPINS / 20;
+
+    /**
+     * Sentinel item representing cancellation of a wait due to
+     * interruption, timeout, or elapsed spin-waits.  This value is
+     * placed in holes on cancellation, and used as a return value
+     * from waiting methods to indicate failure to set or get hole.
+     */
+    private static final Object CANCEL = new Object();
+
+    /**
+     * Value representing null arguments/returns from public
+     * methods.  This disambiguates from internal requirement that
+     * holes start out as null to mean they are not yet set.
+     */
+    private static final Object NULL_ITEM = new Object();
+
+    /**
+     * Nodes hold partially exchanged data.  This class
+     * opportunistically subclasses AtomicReference to represent the
+     * hole.  So get() returns hole, and compareAndSet CAS'es value
+     * into hole.  This class cannot be parameterized as "V" because
+     * of the use of non-V CANCEL sentinels.
+     */
+    private static final class Node extends AtomicReference<Object> {
+        /** The element offered by the Thread creating this node. */
+        public final Object item;
+
+        /** The Thread waiting to be signalled; null until waiting. */
+        public volatile Thread waiter;
+
+        /**
+         * Creates node with given item and empty hole.
+         * @param item the item
+         */
+        public Node(Object item) {
+            this.item = item;
+        }
+    }
+
+    /**
+     * A Slot is an AtomicReference with heuristic padding to lessen
+     * cache effects of this heavily CAS'ed location.  While the
+     * padding adds noticeable space, all slots are created only on
+     * demand, and there will be more than one of them only when it
+     * would improve throughput more than enough to outweigh using
+     * extra space.
+     */
+    private static final class Slot extends AtomicReference<Object> {
+        // Improve likelihood of isolation on <= 64 byte cache lines
+        long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
+    }
 
-    /** Holder for the item being exchanged */
-    private V item;
-    
     /**
-     * Arrival count transitions from 0 to 1 to 2 then back to 0
-     * during an exchange.
+     * Slot array.  Elements are lazily initialized when needed.
+     * Declared volatile to enable double-checked lazy construction.
      */
-    private int arrivalCount;
+    private volatile Slot[] arena = new Slot[CAPACITY];
+
+    /**
+     * The maximum slot index being used.  The value sometimes
+     * increases when a thread experiences too many CAS contentions,
+     * and sometimes decreases when a spin-wait elapses.  Changes
+     * are performed only via compareAndSet, to avoid stale values
+     * when a thread happens to stall right before setting.
+     */
+    private final AtomicInteger max = new AtomicInteger();
 
     /**
      * Main exchange function, handling the different policy variants.
+     * Uses Object, not "V" as argument and return value to simplify
+     * handling of sentinel values.  Callers from public methods decode
+     * and cast accordingly.
+     *
+     * @param item the (non-null) item to exchange
+     * @param timed true if the wait is timed
+     * @param nanos if timed, the maximum wait time
+     * @return the other thread's item, or CANCEL if interrupted or timed out
      */
-    private V doExchange(V x, boolean timed, long nanos) throws InterruptedException, TimeoutException {
-        lock.lock();
-        try {
-            V other;
-
-            // If arrival count already at two, we must wait for
-            // a previous pair to finish and reset the count;
-            while (arrivalCount == 2) {
-                if (!timed)
-                    taken.await();
-                else if (nanos > 0) 
-                    nanos = taken.awaitNanos(nanos);
-                else 
-                    throw new TimeoutException();
+    private Object doExchange(Object item, boolean timed, long nanos) {
+        Node me = new Node(item);                 // Create in case occupying
+        int index = hashIndex();                  // Index of current slot
+        int fails = 0;                            // Number of CAS failures
+
+        for (;;) {
+            Object y;                             // Contents of current slot
+            Slot slot = arena[index];
+            if (slot == null)                     // Lazily initialize slots
+                createSlot(index);                // Continue loop to reread
+            else if ((y = slot.get()) != null &&  // Try to fulfill
+                     slot.compareAndSet(y, null)) {
+                Node you = (Node)y;               // Transfer item
+                if (you.compareAndSet(null, item)) {
+                    LockSupport.unpark(you.waiter);
+                    return you.item;
+                }                                 // Else cancelled; continue
+            }
+            else if (y == null &&                 // Try to occupy
+                     slot.compareAndSet(null, me)) {
+                if (index == 0)                   // Blocking wait for slot 0
+                    return timed? awaitNanos(me, slot, nanos): await(me, slot);
+                Object v = spinWait(me, slot);    // Spin wait for non-0
+                if (v != CANCEL)
+                    return v;
+                me = new Node(item);              // Throw away cancelled node
+                int m = max.get();
+                if (m > (index >>>= 1))           // Decrease index
+                    max.compareAndSet(m, m - 1);  // Maybe shrink table
+            }
+            else if (++fails > 1) {               // Allow 2 fails on 1st slot
+                int m = max.get();
+                if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
+                    index = m + 1;                // Grow on 3rd failed slot
+                else if (--index < 0)
+                    index = m;                    // Circularly traverse
             }
+        }
+    }
+
+    /**
+     * Returns a hash index for the current thread.  Uses a one-step
+     * FNV-1a hash code (http://www.isthe.com/chongo/tech/comp/fnv/)
+     * based on the current thread's Thread.getId().  These hash codes
+     * have more uniform distribution properties with respect to small
+     * moduli (here 1-31) than do other simple hashing functions.
+     *
+     * <p>To return an index between 0 and max, we use a cheap
+     * approximation to a mod operation, that also corrects for bias
+     * due to non-power-of-2 remaindering (see {@link
+     * java.util.Random#nextInt}).  Bits of the hashcode are masked
+     * with "nbits", the ceiling power of two of table size (looked up
+     * in a table packed into three ints).  If too large, this is
+     * retried after rotating the hash by nbits bits, while forcing new
+     * top bit to 0, which guarantees eventual termination (although
+     * with a non-random-bias).  This requires an average of less than
+     * 2 tries for all table sizes, and has a maximum 2% difference
+     * from perfectly uniform slot probabilities when applied to all
+     * possible hash codes for sizes less than 32.
+     *
+     * @return a per-thread-random index, 0 <= index < max
+     */
+    private final int hashIndex() {
+        long id = Thread.currentThread().getId();
+        int hash = (((int)(id ^ (id >>> 32))) ^ 0x811c9dc5) * 0x01000193;
+
+        int m = max.get();
+        int nbits = (((0xfffffc00  >> m) & 4) | // Compute ceil(log2(m+1))
+                     ((0x000001f8 >>> m) & 2) | // The constants hold
+                     ((0xffff00f2 >>> m) & 1)); // a lookup table
+        int index;
+        while ((index = hash & ((1 << nbits) - 1)) > m)       // May retry on
+            hash = (hash >>> nbits) | (hash << (33 - nbits)); // non-power-2 m
+        return index;
+    }
 
-            int count = ++arrivalCount;
+    /**
+     * Creates a new slot at given index.  Called only when the slot
+     * appears to be null.  Relies on double-check using builtin
+     * locks, since they rarely contend.  This in turn relies on the
+     * arena array being declared volatile.
+     *
+     * @param index the index to add slot at
+     */
+    private void createSlot(int index) {
+        // Create slot outside of lock to narrow sync region
+        Slot newSlot = new Slot();
+        Slot[] a = arena;
+        synchronized (a) {
+            if (a[index] == null)
+                a[index] = newSlot;
+        }
+    }
 
-            // If item is already waiting, replace it and signal other thread
-            if (count == 2) { 
-                other = item;
-                item = x;
-                taken.signal();
-                return other;
-            }
+    /**
+     * Tries to cancel a wait for the given node waiting in the given
+     * slot, if so, helping clear the node from its slot to avoid
+     * garbage retention.
+     *
+     * @param node the waiting node
+     * @param slot the slot it is waiting in
+     * @return true if successfully cancelled
+     */
+    private static boolean tryCancel(Node node, Slot slot) {
+        if (!node.compareAndSet(null, CANCEL))
+            return false;
+        if (slot.get() == node) // pre-check to minimize contention
+            slot.compareAndSet(node, null);
+        return true;
+    }
 
-            // Otherwise, set item and wait for another thread to
-            // replace it and signal us.
+    // Three forms of waiting. Each just different enough not to merge
+    // code with others.
 
-            item = x;
-            InterruptedException interrupted = null;
-            try { 
-                while (arrivalCount != 2) {
-                    if (!timed)
-                        taken.await();
-                    else if (nanos > 0) 
-                        nanos = taken.awaitNanos(nanos);
-                    else 
-                        break; // timed out
-                }
-            } catch (InterruptedException ie) {
-                interrupted = ie;
-            }
+    /**
+     * Spin-waits for hole for a non-0 slot.  Fails if spin elapses
+     * before hole filled.  Does not check interrupt, relying on check
+     * in public exchange method to abort if interrupted on entry.
+     *
+     * @param node the waiting node
+     * @return on success, the hole; on failure, CANCEL
+     */
+    private static Object spinWait(Node node, Slot slot) {
+        int spins = SPINS;
+        for (;;) {
+            Object v = node.get();
+            if (v != null)
+                return v;
+            else if (spins > 0)
+                --spins;
+            else
+                tryCancel(node, slot);
+        }
+    }
+
+    /**
+     * Waits for (by spinning and/or blocking) and gets the hole
+     * filled in by another thread.  Fails if interrupted before
+     * hole filled.
+     *
+     * When a node/thread is about to block, it sets its waiter field
+     * and then rechecks state at least one more time before actually
+     * parking, thus covering race vs fulfiller noticing that waiter
+     * is non-null so should be woken.
+     *
+     * Thread interruption status is checked only surrounding calls to
+     * park.  The caller is assumed to have checked interrupt status
+     * on entry.
+     *
+     * @param node the waiting node
+     * @return on success, the hole; on failure, CANCEL
+     */
+    private static Object await(Node node, Slot slot) {
+        Thread w = Thread.currentThread();
+        int spins = SPINS;
+        for (;;) {
+            Object v = node.get();
+            if (v != null)
+                return v;
+            else if (spins > 0)                 // Spin-wait phase
+                --spins;
+            else if (node.waiter == null)       // Set up to block next
+                node.waiter = w;
+            else if (w.isInterrupted())         // Abort on interrupt
+                tryCancel(node, slot);
+            else                                // Block
+                LockSupport.park();
+        }
+    }
 
-            // Get and reset item and count after the wait.
-            // (We need to do this even if wait was aborted.)
-            other = item;
-            item = null;
-            count = arrivalCount;
-            arrivalCount = 0; 
-            taken.signal();
-            
-            // If the other thread replaced item, then we must
-            // continue even if cancelled.
-            if (count == 2) {
-                if (interrupted != null)
-                    Thread.currentThread().interrupt();
-                return other;
+    /**
+     * Waits for (at index 0) and gets the hole filled in by another
+     * thread.  Fails if timed out or interrupted before hole filled.
+     * Same basic logic as untimed version, but a bit messier.
+     *
+     * @param node the waiting node
+     * @param nanos the wait time
+     * @return on success, the hole; on failure, CANCEL
+     */
+    private Object awaitNanos(Node node, Slot slot, long nanos) {
+        int spins = TIMED_SPINS;
+        long lastTime = 0;
+        Thread w = null;
+        for (;;) {
+            Object v = node.get();
+            if (v != null)
+                return v;
+            long now = System.nanoTime();
+            if (w == null)
+                w = Thread.currentThread();
+            else
+                nanos -= now - lastTime;
+            lastTime = now;
+            if (nanos > 0) {
+                if (spins > 0)
+                    --spins;
+                else if (node.waiter == null)
+                    node.waiter = w;
+                else if (w.isInterrupted())
+                    tryCancel(node, slot);
+                else
+                    LockSupport.parkNanos(nanos);
             }
+            else if (tryCancel(node, slot) && !w.isInterrupted())
+                return scanOnTimeout(node);
+        }
+    }
 
-            // If no one is waiting for us, we can back out
-            if (interrupted != null) 
-                throw interrupted;
-            else  // must be timeout
-                throw new TimeoutException();
-        } finally {
-            lock.unlock();
+    /**
+     * Sweeps through arena checking for any waiting threads.  Called
+     * only upon return from timeout while waiting in slot 0.  When a
+     * thread gives up on a timed wait, it is possible that a
+     * previously-entered thread is still waiting in some other
+     * slot.  So we scan to check for any.  This is almost always
+     * overkill, but decreases the likelihood of timeouts when there
+     * are other threads present to far less than that in lock-based
+     * exchangers in which earlier-arriving threads may still be
+     * waiting on entry locks.
+     *
+     * @param node the waiting node
+     * @return another thread's item, or CANCEL
+     */
+    private Object scanOnTimeout(Node node) {
+        Object y;
+        for (int j = arena.length - 1; j >= 0; --j) {
+            Slot slot = arena[j];
+            if (slot != null) {
+                while ((y = slot.get()) != null) {
+                    if (slot.compareAndSet(y, null)) {
+                        Node you = (Node)y;
+                        if (you.compareAndSet(null, node.item)) {
+                            LockSupport.unpark(you.waiter);
+                            return you.item;
+                        }
+                    }
+                }
+            }
         }
+        return CANCEL;
     }
 
     /**
-     * Create a new Exchanger.
-     **/
+     * Creates a new Exchanger.
+     */
     public Exchanger() {
     }
 
     /**
      * Waits for another thread to arrive at this exchange point (unless
-     * it is {@link Thread#interrupt interrupted}),
+     * the current thread is {@linkplain Thread#interrupt interrupted}),
      * and then transfers the given object to it, receiving its object
      * in return.
+     *
      * <p>If another thread is already waiting at the exchange point then
      * it is resumed for thread scheduling purposes and receives the object
-     * passed in by the current thread. The current thread returns immediately,
+     * passed in by the current thread.  The current thread returns immediately,
      * receiving the object passed to the exchange by that other thread.
-     * <p>If no other thread is already waiting at the exchange then the 
+     *
+     * <p>If no other thread is already waiting at the exchange then the
      * current thread is disabled for thread scheduling purposes and lies
      * dormant until one of two things happens:
      * <ul>
      * <li>Some other thread enters the exchange; or
-     * <li>Some other thread {@link Thread#interrupt interrupts} the current
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts} the current
      * thread.
      * </ul>
      * <p>If the current thread:
      * <ul>
-     * <li>has its interrupted status set on entry to this method; or 
-     * <li>is {@link Thread#interrupt interrupted} while waiting
-     * for the exchange, 
+     * <li>has its interrupted status set on entry to this method; or
+     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
+     * for the exchange,
      * </ul>
-     * then {@link InterruptedException} is thrown and the current thread's 
-     * interrupted status is cleared. 
+     * then {@link InterruptedException} is thrown and the current thread's
+     * interrupted status is cleared.
      *
      * @param x the object to exchange
-     * @return the object provided by the other thread.
-     * @throws InterruptedException if current thread was interrupted 
-     * while waiting
-     **/
+     * @return the object provided by the other thread
+     * @throws InterruptedException if the current thread was
+     *         interrupted while waiting
+     */
     public V exchange(V x) throws InterruptedException {
-        try {
-            return doExchange(x, false, 0);
-        } catch (TimeoutException cannotHappen) { 
-            throw new Error(cannotHappen);
+        if (!Thread.interrupted()) {
+            Object v = doExchange(x == null? NULL_ITEM : x, false, 0);
+            if (v == NULL_ITEM)
+                return null;
+            if (v != CANCEL)
+                return (V)v;
+            Thread.interrupted(); // Clear interrupt status on IE throw
         }
+        throw new InterruptedException();
     }
 
     /**
      * Waits for another thread to arrive at this exchange point (unless
-     * it is {@link Thread#interrupt interrupted}, or the specified waiting
-     * time elapses),
-     * and then transfers the given object to it, receiving its object
-     * in return.
+     * the current thread is {@linkplain Thread#interrupt interrupted} or
+     * the specified waiting time elapses), and then transfers the given
+     * object to it, receiving its object in return.
      *
      * <p>If another thread is already waiting at the exchange point then
      * it is resumed for thread scheduling purposes and receives the object
-     * passed in by the current thread. The current thread returns immediately,
+     * passed in by the current thread.  The current thread returns immediately,
      * receiving the object passed to the exchange by that other thread.
      *
-     * <p>If no other thread is already waiting at the exchange then the 
+     * <p>If no other thread is already waiting at the exchange then the
      * current thread is disabled for thread scheduling purposes and lies
      * dormant until one of three things happens:
      * <ul>
      * <li>Some other thread enters the exchange; or
-     * <li>Some other thread {@link Thread#interrupt interrupts} the current
-     * thread; or
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+     * the current thread; or
      * <li>The specified waiting time elapses.
      * </ul>
      * <p>If the current thread:
      * <ul>
-     * <li>has its interrupted status set on entry to this method; or 
-     * <li>is {@link Thread#interrupt interrupted} while waiting
-     * for the exchange, 
+     * <li>has its interrupted status set on entry to this method; or
+     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
+     * for the exchange,
      * </ul>
-     * then {@link InterruptedException} is thrown and the current thread's 
-     * interrupted status is cleared. 
+     * then {@link InterruptedException} is thrown and the current thread's
+     * interrupted status is cleared.
      *
-     * <p>If the specified waiting time elapses then {@link TimeoutException}
-     * is thrown.
-     * If the time is 
-     * less than or equal to zero, the method will not wait at all.
+     * <p>If the specified waiting time elapses then {@link
+     * TimeoutException} is thrown.  If the time is less than or equal
+     * to zero, the method will not wait at all.
      *
      * @param x the object to exchange
      * @param timeout the maximum time to wait
-     * @param unit the time unit of the <tt>timeout</tt> argument.
-     * @return the object provided by the other thread.
-     * @throws InterruptedException if current thread was interrupted
-     * while waiting
-     * @throws TimeoutException if the specified waiting time elapses before
-     * another thread enters the exchange.
-     **/
-    public V exchange(V x, long timeout, TimeUnit unit) 
+     * @param unit the time unit of the <tt>timeout</tt> argument
+     * @return the object provided by the other thread
+     * @throws InterruptedException if the current thread was
+     *         interrupted while waiting
+     * @throws TimeoutException if the specified waiting time elapses
+     *         before another thread enters the exchange
+     */
+    public V exchange(V x, long timeout, TimeUnit unit)
         throws InterruptedException, TimeoutException {
-        return doExchange(x, true, unit.toNanos(timeout));
+        if (!Thread.interrupted()) {
+            Object v = doExchange(x == null? NULL_ITEM : x,
+                                  true, unit.toNanos(timeout));
+            if (v == NULL_ITEM)
+                return null;
+            if (v != CANCEL)
+                return (V)v;
+            if (!Thread.interrupted())
+                throw new TimeoutException();
+        }
+        throw new InterruptedException();
     }
-
 }
-
-

Modified: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutionException.java
URL: http://svn.apache.org/viewvc/harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutionException.java?rev=798469&r1=798468&r2=798469&view=diff
==============================================================================
--- harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutionException.java (original)
+++ harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutionException.java Tue Jul 28 09:30:33 2009
@@ -19,14 +19,14 @@
     private static final long serialVersionUID = 7830266012832686185L;
 
     /**
-     * Constructs a <tt>ExecutionException</tt> with no detail message.
+     * Constructs an <tt>ExecutionException</tt> with no detail message.
      * The cause is not initialized, and may subsequently be
      * initialized by a call to {@link #initCause(Throwable) initCause}.
      */
     protected ExecutionException() { }
 
     /**
-     * Constructs a <tt>ExecutionException</tt> with the specified detail
+     * Constructs an <tt>ExecutionException</tt> with the specified detail
      * message. The cause is not initialized, and may subsequently be
      * initialized by a call to {@link #initCause(Throwable) initCause}.
      *
@@ -37,7 +37,7 @@
     }
 
     /**
-     * Constructs a <tt>ExecutionException</tt> with the specified detail
+     * Constructs an <tt>ExecutionException</tt> with the specified detail
      * message and cause.
      *
      * @param  message the detail message
@@ -49,7 +49,7 @@
     }
 
     /**
-     * Constructs a <tt>ExecutionException</tt> with the specified cause.
+     * Constructs an <tt>ExecutionException</tt> with the specified cause.
      * The detail message is set to:
      * <pre>
      *  (cause == null ? null : cause.toString())</pre>

Modified: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/Executor.java
URL: http://svn.apache.org/viewvc/harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/Executor.java?rev=798469&r1=798468&r2=798469&view=diff
==============================================================================
--- harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/Executor.java (original)
+++ harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/Executor.java Tue Jul 28 09:30:33 2009
@@ -21,7 +21,7 @@
  * executor.execute(new RunnableTask2());
  * ...
  * </pre>
- * 
+ *
  * However, the <tt>Executor</tt> interface does not strictly
  * require that execution be asynchronous. In the simplest case, an
  * executor can run the submitted task immediately in the caller's
@@ -52,7 +52,7 @@
  *
  * <pre>
  * class SerialExecutor implements Executor {
- *     final Queue&lt;Runnable&gt; tasks = new LinkedBlockingQueue&lt;Runnable&gt;();
+ *     final Queue&lt;Runnable&gt; tasks = new ArrayDeque&lt;Runnable&gt;();
  *     final Executor executor;
  *     Runnable active;
  *
@@ -88,6 +88,11 @@
  * extensible thread pool implementation. The {@link Executors} class
  * provides convenient factory methods for these Executors.
  *
+ * <p>Memory consistency effects: Actions in a thread prior to
+ * submitting a {@code Runnable} object to an {@code Executor}
+ * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
+ * its execution begins, perhaps in another thread.
+ *
  * @since 1.5
  * @author Doug Lea
  */

Modified: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutorCompletionService.java
URL: http://svn.apache.org/viewvc/harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutorCompletionService.java?rev=798469&r1=798468&r2=798469&view=diff
==============================================================================
--- harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutorCompletionService.java (original)
+++ harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutorCompletionService.java Tue Jul 28 09:30:33 2009
@@ -6,11 +6,10 @@
 
 package java.util.concurrent;
 
-
 /**
  * A {@link CompletionService} that uses a supplied {@link Executor}
  * to execute tasks.  This class arranges that submitted tasks are,
- * upon completion, placed on a queue accessible using <tt>take</tt>.
+ * upon completion, placed on a queue accessible using {@code take}.
  * The class is lightweight enough to be suitable for transient use
  * when processing groups of tasks.
  *
@@ -19,84 +18,102 @@
  * <b>Usage Examples.</b>
  *
  * Suppose you have a set of solvers for a certain problem, each
- * returning a value of some type <tt>Result</tt>, and would like to
+ * returning a value of some type {@code Result}, and would like to
  * run them concurrently, processing the results of each of them that
- * return a non-null value, in some method <tt>use(Result r)</tt>. You
+ * return a non-null value, in some method {@code use(Result r)}. You
  * could write this as:
  *
- * <pre>
- *    void solve(Executor e, Collection&lt;Callable&lt;Result&gt;&gt; solvers)
- *      throws InterruptedException, ExecutionException {
- *        CompletionService&lt;Result&gt; ecs = new ExecutorCompletionService&lt;Result&gt;(e);
- *        for (Callable&lt;Result&gt; s : solvers)
- *            ecs.submit(s);
- *        int n = solvers.size();
- *        for (int i = 0; i &lt; n; ++i) {
- *            Result r = ecs.take().get();
- *            if (r != null) 
- *                use(r);
- *        }
- *    }
- * </pre>
+ * <pre> {@code
+ * void solve(Executor e,
+ *            Collection<Callable<Result>> solvers)
+ *     throws InterruptedException, ExecutionException {
+ *     CompletionService<Result> ecs
+ *         = new ExecutorCompletionService<Result>(e);
+ *     for (Callable<Result> s : solvers)
+ *         ecs.submit(s);
+ *     int n = solvers.size();
+ *     for (int i = 0; i < n; ++i) {
+ *         Result r = ecs.take().get();
+ *         if (r != null)
+ *             use(r);
+ *     }
+ * }}</pre>
  *
  * Suppose instead that you would like to use the first non-null result
  * of the set of tasks, ignoring any that encounter exceptions,
  * and cancelling all other tasks when the first one is ready:
  *
- * <pre>
- *    void solve(Executor e, Collection&lt;Callable&lt;Result&gt;&gt; solvers) 
- *      throws InterruptedException {
- *        CompletionService&lt;Result&gt; ecs = new ExecutorCompletionService&lt;Result&gt;(e);
- *        int n = solvers.size();
- *        List&lt;Future&lt;Result&gt;&gt; futures = new ArrayList&lt;Future&lt;Result&gt;&gt;(n);
- *        Result result = null;
- *        try {
- *            for (Callable&lt;Result&gt; s : solvers)
- *                futures.add(ecs.submit(s));
- *            for (int i = 0; i &lt; n; ++i) {
- *                try {
- *                    Result r = ecs.take().get();
- *                    if (r != null) {
- *                        result = r;
- *                        break;
- *                    }
- *                } catch(ExecutionException ignore) {}
- *            }
- *        }
- *        finally {
- *            for (Future&lt;Result&gt; f : futures)
- *                f.cancel(true);
- *        }
- *
- *        if (result != null)
- *            use(result);
- *    }
- * </pre>
+ * <pre> {@code
+ * void solve(Executor e,
+ *            Collection<Callable<Result>> solvers)
+ *     throws InterruptedException {
+ *     CompletionService<Result> ecs
+ *         = new ExecutorCompletionService<Result>(e);
+ *     int n = solvers.size();
+ *     List<Future<Result>> futures
+ *         = new ArrayList<Future<Result>>(n);
+ *     Result result = null;
+ *     try {
+ *         for (Callable<Result> s : solvers)
+ *             futures.add(ecs.submit(s));
+ *         for (int i = 0; i < n; ++i) {
+ *             try {
+ *                 Result r = ecs.take().get();
+ *                 if (r != null) {
+ *                     result = r;
+ *                     break;
+ *                 }
+ *             } catch (ExecutionException ignore) {}
+ *         }
+ *     }
+ *     finally {
+ *         for (Future<Result> f : futures)
+ *             f.cancel(true);
+ *     }
+ *
+ *     if (result != null)
+ *         use(result);
+ * }}</pre>
  */
 public class ExecutorCompletionService<V> implements CompletionService<V> {
     private final Executor executor;
+    private final AbstractExecutorService aes;
     private final BlockingQueue<Future<V>> completionQueue;
 
     /**
      * FutureTask extension to enqueue upon completion
      */
-    private class QueueingFuture extends FutureTask<V> {
-        QueueingFuture(Callable<V> c) { super(c); }
-        QueueingFuture(Runnable t, V r) { super(t, r); }
-        protected void done() { completionQueue.add(this); }
+    private class QueueingFuture extends FutureTask<Void> {
+        QueueingFuture(FutureTask<V> task) {
+            super(task, null);
+            this.task = task;
+        }
+        protected void done() { completionQueue.add(task); }
+        private final Future<V> task;
+    }
+
+    private FutureTask<V> newTaskFor(Callable<V> task) {
+        return new FutureTask<V>(task);
+    }
+
+    private FutureTask<V> newTaskFor(Runnable task, V result) {
+        return new FutureTask<V>(task, result);
     }
 
     /**
      * Creates an ExecutorCompletionService using the supplied
      * executor for base task execution and a
      * {@link LinkedBlockingQueue} as a completion queue.
+     *
      * @param executor the executor to use
-     * @throws NullPointerException if executor is <tt>null</tt>
+     * @throws NullPointerException if executor is {@code null}
      */
     public ExecutorCompletionService(Executor executor) {
-        if (executor == null) 
+        if (executor == null)
             throw new NullPointerException();
         this.executor = executor;
+        this.aes = (executor instanceof AbstractExecutorService) ?
+            (AbstractExecutorService) executor : null;
         this.completionQueue = new LinkedBlockingQueue<Future<V>>();
     }
 
@@ -104,30 +121,36 @@
      * Creates an ExecutorCompletionService using the supplied
      * executor for base task execution and the supplied queue as its
      * completion queue.
+     *
      * @param executor the executor to use
      * @param completionQueue the queue to use as the completion queue
-     * normally one dedicated for use by this service
-     * @throws NullPointerException if executor or completionQueue are <tt>null</tt>
+     *        normally one dedicated for use by this service. This
+     *        queue is treated as unbounded -- failed attempted
+     *        {@code Queue.add} operations for completed taskes cause
+     *        them not to be retrievable.
+     * @throws NullPointerException if executor or completionQueue are {@code null}
      */
     public ExecutorCompletionService(Executor executor,
                                      BlockingQueue<Future<V>> completionQueue) {
-        if (executor == null || completionQueue == null) 
+        if (executor == null || completionQueue == null)
             throw new NullPointerException();
         this.executor = executor;
+        this.aes = (executor instanceof AbstractExecutorService) ?
+            (AbstractExecutorService) executor : null;
         this.completionQueue = completionQueue;
     }
 
     public Future<V> submit(Callable<V> task) {
         if (task == null) throw new NullPointerException();
-        QueueingFuture f = new QueueingFuture(task);
-        executor.execute(f);
+        FutureTask<V> f = newTaskFor(task);
+        executor.execute(new QueueingFuture(f));
         return f;
     }
 
     public Future<V> submit(Runnable task, V result) {
         if (task == null) throw new NullPointerException();
-        QueueingFuture f = new QueueingFuture(task, result);
-        executor.execute(f);
+        FutureTask<V> f = newTaskFor(task, result);
+        executor.execute(new QueueingFuture(f));
         return f;
     }
 
@@ -144,5 +167,3 @@
     }
 
 }
-
-

Modified: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutorService.java
URL: http://svn.apache.org/viewvc/harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutorService.java?rev=798469&r1=798468&r2=798469&view=diff
==============================================================================
--- harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutorService.java (original)
+++ harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutorService.java Tue Jul 28 09:30:33 2009
@@ -5,7 +5,6 @@
  */
 
 package java.util.concurrent;
-
 import java.util.List;
 import java.util.Collection;
 import java.security.PrivilegedAction;
@@ -14,14 +13,18 @@
 /**
  * An {@link Executor} that provides methods to manage termination and
  * methods that can produce a {@link Future} for tracking progress of
- * one or more asynchronous tasks.  
+ * one or more asynchronous tasks.
  *
- * <p>
- * An <tt>ExecutorService</tt> can be shut down, which will cause it
- * to stop accepting new tasks.  After being shut down, the executor
- * will eventually terminate, at which point no tasks are actively
- * executing, no tasks are awaiting execution, and no new tasks can be
- * submitted.
+ * <p> An <tt>ExecutorService</tt> can be shut down, which will cause
+ * it to reject new tasks.  Two different methods are provided for
+ * shutting down an <tt>ExecutorService</tt>. The {@link #shutdown}
+ * method will allow previously submitted tasks to execute before
+ * terminating, while the {@link #shutdownNow} method prevents waiting
+ * tasks from starting and attempts to stop currently executing tasks.
+ * Upon termination, an executor has no tasks actively executing, no
+ * tasks awaiting execution, and no new tasks can be submitted.  An
+ * unused <tt>ExecutorService</tt> should be shut down to allow
+ * reclamation of its resources.
  *
  * <p> Method <tt>submit</tt> extends base method {@link
  * Executor#execute} by creating and returning a {@link Future} that
@@ -35,41 +38,74 @@
  * <p>The {@link Executors} class provides factory methods for the
  * executor services provided in this package.
  *
- * <h3>Usage Example</h3>
+ * <h3>Usage Examples</h3>
  *
  * Here is a sketch of a network service in which threads in a thread
  * pool service incoming requests. It uses the preconfigured {@link
  * Executors#newFixedThreadPool} factory method:
  *
  * <pre>
- * class NetworkService {
- *    private final ServerSocket serverSocket;
- *    private final ExecutorService pool;
- *
- *    public NetworkService(int port, int poolSize) throws IOException {
- *      serverSocket = new ServerSocket(port);
- *      pool = Executors.newFixedThreadPool(poolSize);
- *    }
- * 
- *    public void serve() {
- *      try {
- *        for (;;) {
- *          pool.execute(new Handler(serverSocket.accept()));
- *        }
- *      } catch (IOException ex) {
- *        pool.shutdown();
- *      }
- *    }
- *  }
- *
- *  class Handler implements Runnable {
- *    private final Socket socket;
- *    Handler(Socket socket) { this.socket = socket; }
- *    public void run() {
- *      // read and service request
- *    }
+ * class NetworkService implements Runnable {
+ *   private final ServerSocket serverSocket;
+ *   private final ExecutorService pool;
+ *
+ *   public NetworkService(int port, int poolSize)
+ *       throws IOException {
+ *     serverSocket = new ServerSocket(port);
+ *     pool = Executors.newFixedThreadPool(poolSize);
+ *   }
+ *
+ *   public void run() { // run the service
+ *     try {
+ *       for (;;) {
+ *         pool.execute(new Handler(serverSocket.accept()));
+ *       }
+ *     } catch (IOException ex) {
+ *       pool.shutdown();
+ *     }
+ *   }
+ * }
+ *
+ * class Handler implements Runnable {
+ *   private final Socket socket;
+ *   Handler(Socket socket) { this.socket = socket; }
+ *   public void run() {
+ *     // read and service request on socket
+ *   }
  * }
  * </pre>
+ *
+ * The following method shuts down an <tt>ExecutorService</tt> in two phases,
+ * first by calling <tt>shutdown</tt> to reject incoming tasks, and then
+ * calling <tt>shutdownNow</tt>, if necessary, to cancel any lingering tasks:
+ *
+ * <pre>
+ * void shutdownAndAwaitTermination(ExecutorService pool) {
+ *   pool.shutdown(); // Disable new tasks from being submitted
+ *   try {
+ *     // Wait a while for existing tasks to terminate
+ *     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
+ *       pool.shutdownNow(); // Cancel currently executing tasks
+ *       // Wait a while for tasks to respond to being cancelled
+ *       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
+ *           System.err.println("Pool did not terminate");
+ *     }
+ *   } catch (InterruptedException ie) {
+ *     // (Re-)Cancel if current thread also interrupted
+ *     pool.shutdownNow();
+ *     // Preserve interrupt status
+ *     Thread.currentThread().interrupt();
+ *   }
+ * }
+ * </pre>
+ *
+ * <p>Memory consistency effects: Actions in a thread prior to the
+ * submission of a {@code Runnable} or {@code Callable} task to an
+ * {@code ExecutorService}
+ * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
+ * any actions taken by that task, which in turn <i>happen-before</i> the
+ * result is retrieved via {@code Future.get()}.
+ *
  * @since 1.5
  * @author Doug Lea
  */
@@ -77,33 +113,45 @@
 
     /**
      * Initiates an orderly shutdown in which previously submitted
-     * tasks are executed, but no new tasks will be
-     * accepted. Invocation has no additional effect if already shut
-     * down.
+     * tasks are executed, but no new tasks will be accepted.
+     * Invocation has no additional effect if already shut down.
+     *
+     * <p>This method does not wait for previously submitted tasks to
+     * complete execution.  Use {@link #awaitTermination awaitTermination}
+     * to do that.
+     *
      * @throws SecurityException if a security manager exists and
-     * shutting down this ExecutorService may manipulate threads that
-     * the caller is not permitted to modify because it does not hold
-     * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
-     * or the security manager's <tt>checkAccess</tt>  method denies access.
+     *         shutting down this ExecutorService may manipulate
+     *         threads that the caller is not permitted to modify
+     *         because it does not hold {@link
+     *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
+     *         or the security manager's <tt>checkAccess</tt> method
+     *         denies access.
      */
     void shutdown();
 
     /**
      * Attempts to stop all actively executing tasks, halts the
-     * processing of waiting tasks, and returns a list of the tasks that were
-     * awaiting execution. 
-     *  
+     * processing of waiting tasks, and returns a list of the tasks
+     * that were awaiting execution.
+     *
+     * <p>This method does not wait for actively executing tasks to
+     * terminate.  Use {@link #awaitTermination awaitTermination} to
+     * do that.
+     *
      * <p>There are no guarantees beyond best-effort attempts to stop
      * processing actively executing tasks.  For example, typical
-     * implementations will cancel via {@link Thread#interrupt}, so if any
-     * tasks mask or fail to respond to interrupts, they may never terminate.
+     * implementations will cancel via {@link Thread#interrupt}, so any
+     * task that fails to respond to interrupts may never terminate.
      *
      * @return list of tasks that never commenced execution
      * @throws SecurityException if a security manager exists and
-     * shutting down this ExecutorService may manipulate threads that
-     * the caller is not permitted to modify because it does not hold
-     * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
-     * or the security manager's <tt>checkAccess</tt> method denies access.
+     *         shutting down this ExecutorService may manipulate
+     *         threads that the caller is not permitted to modify
+     *         because it does not hold {@link
+     *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
+     *         or the security manager's <tt>checkAccess</tt> method
+     *         denies access.
      */
     List<Runnable> shutdownNow();
 
@@ -130,8 +178,8 @@
      *
      * @param timeout the maximum time to wait
      * @param unit the time unit of the timeout argument
-     * @return <tt>true</tt> if this executor terminated and <tt>false</tt>
-     * if the timeout elapsed before termination
+     * @return <tt>true</tt> if this executor terminated and
+     *         <tt>false</tt> if the timeout elapsed before termination
      * @throws InterruptedException if interrupted while waiting
      */
     boolean awaitTermination(long timeout, TimeUnit unit)
@@ -139,8 +187,10 @@
 
 
     /**
-     * Submits a value-returning task for execution and returns a Future
-     * representing the pending results of the task. 
+     * Submits a value-returning task for execution and returns a
+     * Future representing the pending results of the task. The
+     * Future's <tt>get</tt> method will return the task's result upon
+     * successful completion.
      *
      * <p>
      * If you would like to immediately block waiting
@@ -154,88 +204,92 @@
      *
      * @param task the task to submit
      * @return a Future representing pending completion of the task
-     * @throws RejectedExecutionException if task cannot be scheduled
-     * for execution
-     * @throws NullPointerException if task null
+     * @throws RejectedExecutionException if the task cannot be
+     *         scheduled for execution
+     * @throws NullPointerException if the task is null
      */
     <T> Future<T> submit(Callable<T> task);
 
     /**
-     * Submits a Runnable task for execution and returns a Future 
-     * representing that task that will upon completion return 
-     * the given result
+     * Submits a Runnable task for execution and returns a Future
+     * representing that task. The Future's <tt>get</tt> method will
+     * return the given result upon successful completion.
      *
      * @param task the task to submit
      * @param result the result to return
-     * @return a Future representing pending completion of the task,
-     * and whose <tt>get()</tt> method will return the given result
-     * upon completion.
-     * @throws RejectedExecutionException if task cannot be scheduled
-     * for execution
-     * @throws NullPointerException if task null     
+     * @return a Future representing pending completion of the task
+     * @throws RejectedExecutionException if the task cannot be
+     *         scheduled for execution
+     * @throws NullPointerException if the task is null
      */
     <T> Future<T> submit(Runnable task, T result);
 
     /**
-     * Submits a Runnable task for execution and returns a Future 
-     * representing that task.
+     * Submits a Runnable task for execution and returns a Future
+     * representing that task. The Future's <tt>get</tt> method will
+     * return <tt>null</tt> upon <em>successful</em> completion.
      *
      * @param task the task to submit
-     * @return a Future representing pending completion of the task,
-     * and whose <tt>get()</tt> method will return <tt>null</tt>
-     * upon completion.
-     * @throws RejectedExecutionException if task cannot be scheduled
-     * for execution
-     * @throws NullPointerException if task null
+     * @return a Future representing pending completion of the task
+     * @throws RejectedExecutionException if the task cannot be
+     *         scheduled for execution
+     * @throws NullPointerException if the task is null
      */
     Future<?> submit(Runnable task);
 
     /**
-     * Executes the given tasks, returning their results
-     * when all complete.
+     * Executes the given tasks, returning a list of Futures holding
+     * their status and results when all complete.
+     * {@link Future#isDone} is <tt>true</tt> for each
+     * element of the returned list.
      * Note that a <em>completed</em> task could have
      * terminated either normally or by throwing an exception.
      * The results of this method are undefined if the given
      * collection is modified while this operation is in progress.
+     *
      * @param tasks the collection of tasks
      * @return A list of Futures representing the tasks, in the same
-     * sequential order as produced by the iterator for the given task
-     * list, each of which has completed.
+     *         sequential order as produced by the iterator for the
+     *         given task list, each of which has completed.
      * @throws InterruptedException if interrupted while waiting, in
-     * which case unfinished tasks are cancelled.
+     *         which case unfinished tasks are cancelled.
      * @throws NullPointerException if tasks or any of its elements are <tt>null</tt>
-     * @throws RejectedExecutionException if any task cannot be scheduled
-     * for execution
+     * @throws RejectedExecutionException if any task cannot be
+     *         scheduled for execution
      */
 
     <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
         throws InterruptedException;
 
     /**
-     * Executes the given tasks, returning their results
+     * Executes the given tasks, returning a list of Futures holding
+     * their status and results
      * when all complete or the timeout expires, whichever happens first.
+     * {@link Future#isDone} is <tt>true</tt> for each
+     * element of the returned list.
      * Upon return, tasks that have not completed are cancelled.
      * Note that a <em>completed</em> task could have
      * terminated either normally or by throwing an exception.
      * The results of this method are undefined if the given
      * collection is modified while this operation is in progress.
+     *
      * @param tasks the collection of tasks
      * @param timeout the maximum time to wait
      * @param unit the time unit of the timeout argument
-     * @return A list of Futures representing the tasks, in the same
-     * sequential order as produced by the iterator for the given
-     * task list. If the operation did not time out, each task will
-     * have completed. If it did time out, some of thiese tasks will
-     * not have completed.
+     * @return a list of Futures representing the tasks, in the same
+     *         sequential order as produced by the iterator for the
+     *         given task list. If the operation did not time out,
+     *         each task will have completed. If it did time out, some
+     *         of these tasks will not have completed.
      * @throws InterruptedException if interrupted while waiting, in
-     * which case unfinished tasks are cancelled.
+     *         which case unfinished tasks are cancelled
      * @throws NullPointerException if tasks, any of its elements, or
-     * unit are <tt>null</tt>
+     *         unit are <tt>null</tt>
      * @throws RejectedExecutionException if any task cannot be scheduled
-     * for execution
+     *         for execution
      */
-    <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks, 
-                                  long timeout, TimeUnit unit) 
+    <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks,
+                                  long timeout, TimeUnit unit)
         throws InterruptedException;
 
     /**
@@ -245,15 +299,16 @@
      * tasks that have not completed are cancelled.
      * The results of this method are undefined if the given
      * collection is modified while this operation is in progress.
+     *
      * @param tasks the collection of tasks
-     * @return The result returned by one of the tasks.
+     * @return the result returned by one of the tasks
      * @throws InterruptedException if interrupted while waiting
      * @throws NullPointerException if tasks or any of its elements
-     * are <tt>null</tt>
-     * @throws IllegalArgumentException if tasks empty
+     *         are <tt>null</tt>
+     * @throws IllegalArgumentException if tasks is empty
      * @throws ExecutionException if no task successfully completes
      * @throws RejectedExecutionException if tasks cannot be scheduled
-     * for execution
+     *         for execution
      */
     <T> T invokeAny(Collection<Callable<T>> tasks)
         throws InterruptedException, ExecutionException;
@@ -266,21 +321,21 @@
      * completed are cancelled.
      * The results of this method are undefined if the given
      * collection is modified while this operation is in progress.
+     *
      * @param tasks the collection of tasks
      * @param timeout the maximum time to wait
      * @param unit the time unit of the timeout argument
-     * @return The result returned by one of the tasks.
+     * @return the result returned by one of the tasks.
      * @throws InterruptedException if interrupted while waiting
      * @throws NullPointerException if tasks, any of its elements, or
-     * unit are <tt>null</tt>
+     *         unit are <tt>null</tt>
      * @throws TimeoutException if the given timeout elapses before
-     * any task successfully completes
+     *         any task successfully completes
      * @throws ExecutionException if no task successfully completes
      * @throws RejectedExecutionException if tasks cannot be scheduled
-     * for execution
+     *         for execution
      */
-    <T> T invokeAny(Collection<Callable<T>> tasks, 
-                    long timeout, TimeUnit unit) 
+    <T> T invokeAny(Collection<Callable<T>> tasks,
+                    long timeout, TimeUnit unit)
         throws InterruptedException, ExecutionException, TimeoutException;
-
 }



Mime
View raw message