Subject: svn commit: r798469 [9/28] - in /harmony/enhanced/classlib/branches/java6: ./ depends/build/platform/ depends/files/ depends/jars/ depends/manifests/icu4j_4.0/ depends/manifests/icu4j_4.2.1/ depends/manifests/icu4j_4.2.1/META-INF/ make/ modules/accessi...
Date: Tue, 28 Jul 2009 09:30:48 -0000 To: commits@harmony.apache.org From: hindessm@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090728093059.875EC23889CF@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/Exchanger.java URL: http://svn.apache.org/viewvc/harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/Exchanger.java?rev=798469&r1=798468&r2=798469&view=diff ============================================================================== --- harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/Exchanger.java (original) +++ harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/Exchanger.java Tue Jul 28 09:30:33 2009 @@ -1,27 +1,31 @@ /* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at + * Written by Doug Lea, Bill Scherer, and Michael Scott with + * assistance from members of JCP JSR-166 Expert Group and released to + * the public domain, as explained at * http://creativecommons.org/licenses/publicdomain */ package java.util.concurrent; -import java.util.concurrent.locks.*; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.LockSupport; /** - * A synchronization point at which two threads can exchange objects. - * Each thread presents some object on entry to the {@link #exchange - * exchange} method, and receives the object presented by the other - * thread on return. + * A synchronization point at which threads can pair and swap elements + * within pairs. Each thread presents some object on entry to the + * {@link #exchange exchange} method, matches with a partner thread, + * and receives its partner's object on return. An Exchanger may be + * viewed as a bidirectional form of a {@link SynchronousQueue}. 
+ * Exchangers may be useful in applications such as genetic algorithms
+ * and pipeline designs.
  *
  * Sample Usage:
- * Here are the highlights of a class that uses an Exchanger to
- * swap buffers between threads so that the thread filling the
- * buffer gets a freshly
- * emptied one when it needs it, handing off the filled one to
- * the thread emptying the buffer.
- *
+ * Here are the highlights of a class that uses an {@code Exchanger}
+ * to swap buffers between threads so that the thread filling the
+ * buffer gets a freshly emptied one when it needs it, handing off the
+ * filled one to the thread emptying the buffer.
+ * 
{@code
  * class FillAndEmpty {
- *   Exchanger<DataBuffer> exchanger = new Exchanger();
+ *   Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
  *   DataBuffer initialEmptyBuffer = ... a made-up type
  *   DataBuffer initialFullBuffer = ...
  *
@@ -31,7 +35,7 @@
  *       try {
  *         while (currentBuffer != null) {
  *           addToBuffer(currentBuffer);
- *           if (currentBuffer.full())
+ *           if (currentBuffer.isFull())
  *             currentBuffer = exchanger.exchange(currentBuffer);
  *         }
  *       } catch (InterruptedException ex) { ... handle ... }
@@ -44,7 +48,7 @@
  *       try {
  *         while (currentBuffer != null) {
  *           takeFromBuffer(currentBuffer);
- *           if (currentBuffer.empty())
+ *           if (currentBuffer.isEmpty())
  *             currentBuffer = exchanger.exchange(currentBuffer);
  *         }
  *       } catch (InterruptedException ex) { ... handle ...}
@@ -56,192 +60,597 @@
  *     new Thread(new EmptyingLoop()).start();
  *   }
  * }
- * 
+ * }
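The FillAndEmpty sample in the Javadoc above elides its buffer type and error handling. As a minimal runnable sketch of the same hand-off, assuming a plain `List<Integer>` stands in for the made-up `DataBuffer` (the class and method names below are hypothetical and not part of the commit), one thread fills a buffer and swaps it for the empty one offered by a second thread:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

// Hypothetical demo class; List<Integer> is an assumed stand-in for DataBuffer.
final class ExchangerDemo {

    /** Fills a buffer with 1..4, swaps it with an emptying thread, returns the sum it saw. */
    static int fillAndDrain() {
        final Exchanger<List<Integer>> exchanger = new Exchanger<List<Integer>>();
        final int[] sum = new int[1];

        Thread emptier = new Thread(() -> {
            try {
                // Offer an empty buffer, receive the filled one.
                List<Integer> full = exchanger.exchange(new ArrayList<Integer>());
                for (int x : full) {
                    sum[0] += x;
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        emptier.start();

        List<Integer> buffer = new ArrayList<Integer>();
        for (int i = 1; i <= 4; i++) {
            buffer.add(i);
        }
        try {
            // Hand off the filled buffer, receive the partner's empty one.
            List<Integer> fresh = exchanger.exchange(buffer);
            emptier.join(); // join makes the emptier's write to sum[0] visible here
            return fresh.isEmpty() ? sum[0] : -1;
        } catch (InterruptedException ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(fillAndDrain());
    }
}
```

The emptying thread can read the list contents safely because writes made before `exchange()` in one thread happen-before actions after the matching `exchange()` returns in the other, which is the memory consistency guarantee this commit adds to the class documentation.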
+ * + *

Memory consistency effects: For each pair of threads that + * successfully exchange objects via an {@code Exchanger}, actions + * prior to the {@code exchange()} in each thread + * happen-before + * those subsequent to a return from the corresponding {@code exchange()} + * in the other thread. * * @since 1.5 - * @author Doug Lea + * @author Doug Lea and Bill Scherer and Michael Scott * @param The type of objects that may be exchanged */ public class Exchanger { - private final ReentrantLock lock = new ReentrantLock(); - private final Condition taken = lock.newCondition(); + /* + * Algorithm Description: + * + * The basic idea is to maintain a "slot", which is a reference to + * a Node containing both an Item to offer and a "hole" waiting to + * get filled in. If an incoming "occupying" thread sees that the + * slot is null, it CAS'es (compareAndSets) a Node there and waits + * for another to invoke exchange. That second "fulfilling" thread + * sees that the slot is non-null, and so CASes it back to null, + * also exchanging items by CASing the hole, plus waking up the + * occupying thread if it is blocked. In each case CAS'es may + * fail because a slot at first appears non-null but is null upon + * CAS, or vice-versa. So threads may need to retry these + * actions. + * + * This simple approach works great when there are only a few + * threads using an Exchanger, but performance rapidly + * deteriorates due to CAS contention on the single slot when + * there are lots of threads using an exchanger. So instead we use + * an "arena"; basically a kind of hash table with a dynamically + * varying number of slots, any one of which can be used by + * threads performing an exchange. Incoming threads pick slots + * based on a hash of their Thread ids. If an incoming thread + * fails to CAS in its chosen slot, it picks an alternative slot + * instead. And similarly from there. 
If a thread successfully + * CASes into a slot but no other thread arrives, it tries + * another, heading toward the zero slot, which always exists even + * if the table shrinks. The particular mechanics controlling this + * are as follows: + * + * Waiting: Slot zero is special in that it is the only slot that + * exists when there is no contention. A thread occupying slot + * zero will block if no thread fulfills it after a short spin. + * In other cases, occupying threads eventually give up and try + * another slot. Waiting threads spin for a while (a period that + * should be a little less than a typical context-switch time) + * before either blocking (if slot zero) or giving up (if other + * slots) and restarting. There is no reason for threads to block + * unless there are unlikely to be any other threads present. + * Occupants are mainly avoiding memory contention so sit there + * quietly polling for a shorter period than it would take to + * block and then unblock them. Non-slot-zero waits that elapse + * because of lack of other threads waste around one extra + * context-switch time per try, which is still on average much + * faster than alternative approaches. + * + * Sizing: Usually, using only a few slots suffices to reduce + * contention. Especially with small numbers of threads, using + * too many slots can lead to just as poor performance as using + * too few of them, and there's not much room for error. The + * variable "max" maintains the number of slots actually in + * use. It is increased when a thread sees too many CAS + * failures. (This is analogous to resizing a regular hash table + * based on a target load factor, except here, growth steps are + * just one-by-one rather than proportional.) Growth requires + * contention failures in each of three tried slots. 
Requiring + * multiple failures for expansion copes with the fact that some + * failed CASes are not due to contention but instead to simple + * races between two threads or thread pre-emptions occurring + * between reading and CASing. Also, very transient peak + * contention can be much higher than the average sustainable + * levels. The max limit is decreased on average 50% of the times + * that a non-slot-zero wait elapses without being fulfilled. + * Threads experiencing elapsed waits move closer to zero, so + * eventually find existing (or future) threads even if the table + * has been shrunk due to inactivity. The chosen mechanics and + * thresholds for growing and shrinking are intrinsically + * entangled with indexing and hashing inside the exchange code, + * and can't be nicely abstracted out. + * + * Hashing: Each thread picks its initial slot to use in accord + * with a simple hashcode. The sequence is the same on each + * encounter by any given thread, but effectively random across + * threads. Using arenas encounters the classic cost vs quality + * tradeoffs of all hash tables. Here, we use a one-step FNV-1a + * hash code based on the current thread's Thread.getId(), along + * with a cheap approximation to a mod operation to select an + * index. The downside of optimizing index selection in this way + * is that the code is hardwired to use a maximum table size of + * 32. But this value more than suffices for known platforms and + * applications. + * + * Probing: On sensed contention of a selected slot, we probe + * sequentially through the table, analogously to linear probing + * after collision in a hash table. (We move circularly, in + * reverse order, to mesh best with table growth and shrinkage + * rules.) Except that to minimize the effects of false-alarms + * and cache thrashing, we try the first selected slot twice + * before moving. 
+ * + * Padding: Even with contention management, slots are heavily + * contended, so use cache-padding to avoid poor memory + * performance. Because of this, slots are lazily constructed + * only when used, to avoid wasting this space unnecessarily. + * While isolation of locations is not much of an issue at first + * in an application, as time goes on and garbage-collectors + * perform compaction, slots are very likely to be moved adjacent + * to each other, which can cause much thrashing of cache lines on + * MPs unless padding is employed. + * + * This is an improvement of the algorithm described in the paper + * "A Scalable Elimination-based Exchange Channel" by William + * Scherer, Doug Lea, and Michael Scott in Proceedings of SCOOL05 + * workshop. Available at: http://hdl.handle.net/1802/2104 + */ + + /** The number of CPUs, for sizing and spin control */ + private static final int NCPU = Runtime.getRuntime().availableProcessors(); + + /** + * The capacity of the arena. Set to a value that provides more + * than enough space to handle contention. On small machines + * most slots won't be used, but it is still not wasted because + * the extra space provides some machine-level address padding + * to minimize interference with heavily CAS'ed Slot locations. + * And on very large machines, performance eventually becomes + * bounded by memory bandwidth, not numbers of threads/CPUs. + * This constant cannot be changed without also modifying + * indexing and hashing algorithms. + */ + private static final int CAPACITY = 32; + + /** + * The value of "max" that will hold all threads without + * contention. When this value is less than CAPACITY, some + * otherwise wasted expansion can be avoided. + */ + private static final int FULL = + Math.max(0, Math.min(CAPACITY, NCPU / 2) - 1); + + /** + * The number of times to spin (doing nothing except polling a + * memory location) before blocking or giving up while waiting to + * be fulfilled. 
Should be zero on uniprocessors. On + * multiprocessors, this value should be large enough so that two + * threads exchanging items as fast as possible block only when + * one of them is stalled (due to GC or preemption), but not much + * longer, to avoid wasting CPU resources. Seen differently, this + * value is a little over half the number of cycles of an average + * context switch time on most systems. The value here is + * approximately the average of those across a range of tested + * systems. + */ + private static final int SPINS = (NCPU == 1) ? 0 : 2000; + + /** + * The number of times to spin before blocking in timed waits. + * Timed waits spin more slowly because checking the time takes + * time. The best value relies mainly on the relative rate of + * System.nanoTime vs memory accesses. The value is empirically + * derived to work well across a variety of systems. + */ + private static final int TIMED_SPINS = SPINS / 20; + + /** + * Sentinel item representing cancellation of a wait due to + * interruption, timeout, or elapsed spin-waits. This value is + * placed in holes on cancellation, and used as a return value + * from waiting methods to indicate failure to set or get hole. + */ + private static final Object CANCEL = new Object(); + + /** + * Value representing null arguments/returns from public + * methods. This disambiguates from internal requirement that + * holes start out as null to mean they are not yet set. + */ + private static final Object NULL_ITEM = new Object(); + + /** + * Nodes hold partially exchanged data. This class + * opportunistically subclasses AtomicReference to represent the + * hole. So get() returns hole, and compareAndSet CAS'es value + * into hole. This class cannot be parameterized as "V" because + * of the use of non-V CANCEL sentinels. + */ + private static final class Node extends AtomicReference { + /** The element offered by the Thread creating this node. 
*/ + public final Object item; + + /** The Thread waiting to be signalled; null until waiting. */ + public volatile Thread waiter; + + /** + * Creates node with given item and empty hole. + * @param item the item + */ + public Node(Object item) { + this.item = item; + } + } + + /** + * A Slot is an AtomicReference with heuristic padding to lessen + * cache effects of this heavily CAS'ed location. While the + * padding adds noticeable space, all slots are created only on + * demand, and there will be more than one of them only when it + * would improve throughput more than enough to outweigh using + * extra space. + */ + private static final class Slot extends AtomicReference { + // Improve likelihood of isolation on <= 64 byte cache lines + long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe; + } - /** Holder for the item being exchanged */ - private V item; - /** - * Arrival count transitions from 0 to 1 to 2 then back to 0 - * during an exchange. + * Slot array. Elements are lazily initialized when needed. + * Declared volatile to enable double-checked lazy construction. */ - private int arrivalCount; + private volatile Slot[] arena = new Slot[CAPACITY]; + + /** + * The maximum slot index being used. The value sometimes + * increases when a thread experiences too many CAS contentions, + * and sometimes decreases when a spin-wait elapses. Changes + * are performed only via compareAndSet, to avoid stale values + * when a thread happens to stall right before setting. + */ + private final AtomicInteger max = new AtomicInteger(); /** * Main exchange function, handling the different policy variants. + * Uses Object, not "V" as argument and return value to simplify + * handling of sentinel values. Callers from public methods decode + * and cast accordingly. 
+ * + * @param item the (non-null) item to exchange + * @param timed true if the wait is timed + * @param nanos if timed, the maximum wait time + * @return the other thread's item, or CANCEL if interrupted or timed out */ - private V doExchange(V x, boolean timed, long nanos) throws InterruptedException, TimeoutException { - lock.lock(); - try { - V other; - - // If arrival count already at two, we must wait for - // a previous pair to finish and reset the count; - while (arrivalCount == 2) { - if (!timed) - taken.await(); - else if (nanos > 0) - nanos = taken.awaitNanos(nanos); - else - throw new TimeoutException(); + private Object doExchange(Object item, boolean timed, long nanos) { + Node me = new Node(item); // Create in case occupying + int index = hashIndex(); // Index of current slot + int fails = 0; // Number of CAS failures + + for (;;) { + Object y; // Contents of current slot + Slot slot = arena[index]; + if (slot == null) // Lazily initialize slots + createSlot(index); // Continue loop to reread + else if ((y = slot.get()) != null && // Try to fulfill + slot.compareAndSet(y, null)) { + Node you = (Node)y; // Transfer item + if (you.compareAndSet(null, item)) { + LockSupport.unpark(you.waiter); + return you.item; + } // Else cancelled; continue + } + else if (y == null && // Try to occupy + slot.compareAndSet(null, me)) { + if (index == 0) // Blocking wait for slot 0 + return timed? 
awaitNanos(me, slot, nanos): await(me, slot); + Object v = spinWait(me, slot); // Spin wait for non-0 + if (v != CANCEL) + return v; + me = new Node(item); // Throw away cancelled node + int m = max.get(); + if (m > (index >>>= 1)) // Decrease index + max.compareAndSet(m, m - 1); // Maybe shrink table + } + else if (++fails > 1) { // Allow 2 fails on 1st slot + int m = max.get(); + if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1)) + index = m + 1; // Grow on 3rd failed slot + else if (--index < 0) + index = m; // Circularly traverse } + } + } + + /** + * Returns a hash index for the current thread. Uses a one-step + * FNV-1a hash code (http://www.isthe.com/chongo/tech/comp/fnv/) + * based on the current thread's Thread.getId(). These hash codes + * have more uniform distribution properties with respect to small + * moduli (here 1-31) than do other simple hashing functions. + * + *

To return an index between 0 and max, we use a cheap + * approximation to a mod operation, that also corrects for bias + * due to non-power-of-2 remaindering (see {@link + * java.util.Random#nextInt}). Bits of the hashcode are masked + * with "nbits", the ceiling power of two of table size (looked up + * in a table packed into three ints). If too large, this is + * retried after rotating the hash by nbits bits, while forcing new + * top bit to 0, which guarantees eventual termination (although + * with a non-random-bias). This requires an average of less than + * 2 tries for all table sizes, and has a maximum 2% difference + * from perfectly uniform slot probabilities when applied to all + * possible hash codes for sizes less than 32. + * + * @return a per-thread-random index, 0 <= index < max + */ + private final int hashIndex() { + long id = Thread.currentThread().getId(); + int hash = (((int)(id ^ (id >>> 32))) ^ 0x811c9dc5) * 0x01000193; + + int m = max.get(); + int nbits = (((0xfffffc00 >> m) & 4) | // Compute ceil(log2(m+1)) + ((0x000001f8 >>> m) & 2) | // The constants hold + ((0xffff00f2 >>> m) & 1)); // a lookup table + int index; + while ((index = hash & ((1 << nbits) - 1)) > m) // May retry on + hash = (hash >>> nbits) | (hash << (33 - nbits)); // non-power-2 m + return index; + } - int count = ++arrivalCount; + /** + * Creates a new slot at given index. Called only when the slot + * appears to be null. Relies on double-check using builtin + * locks, since they rarely contend. This in turn relies on the + * arena array being declared volatile. 
+ * + * @param index the index to add slot at + */ + private void createSlot(int index) { + // Create slot outside of lock to narrow sync region + Slot newSlot = new Slot(); + Slot[] a = arena; + synchronized (a) { + if (a[index] == null) + a[index] = newSlot; + } + } - // If item is already waiting, replace it and signal other thread - if (count == 2) { - other = item; - item = x; - taken.signal(); - return other; - } + /** + * Tries to cancel a wait for the given node waiting in the given + * slot, if so, helping clear the node from its slot to avoid + * garbage retention. + * + * @param node the waiting node + * @param slot the slot it is waiting in + * @return true if successfully cancelled + */ + private static boolean tryCancel(Node node, Slot slot) { + if (!node.compareAndSet(null, CANCEL)) + return false; + if (slot.get() == node) // pre-check to minimize contention + slot.compareAndSet(node, null); + return true; + } - // Otherwise, set item and wait for another thread to - // replace it and signal us. + // Three forms of waiting. Each just different enough not to merge + // code with others. - item = x; - InterruptedException interrupted = null; - try { - while (arrivalCount != 2) { - if (!timed) - taken.await(); - else if (nanos > 0) - nanos = taken.awaitNanos(nanos); - else - break; // timed out - } - } catch (InterruptedException ie) { - interrupted = ie; - } + /** + * Spin-waits for hole for a non-0 slot. Fails if spin elapses + * before hole filled. Does not check interrupt, relying on check + * in public exchange method to abort if interrupted on entry. 
+ * + * @param node the waiting node + * @return on success, the hole; on failure, CANCEL + */ + private static Object spinWait(Node node, Slot slot) { + int spins = SPINS; + for (;;) { + Object v = node.get(); + if (v != null) + return v; + else if (spins > 0) + --spins; + else + tryCancel(node, slot); + } + } + + /** + * Waits for (by spinning and/or blocking) and gets the hole + * filled in by another thread. Fails if interrupted before + * hole filled. + * + * When a node/thread is about to block, it sets its waiter field + * and then rechecks state at least one more time before actually + * parking, thus covering race vs fulfiller noticing that waiter + * is non-null so should be woken. + * + * Thread interruption status is checked only surrounding calls to + * park. The caller is assumed to have checked interrupt status + * on entry. + * + * @param node the waiting node + * @return on success, the hole; on failure, CANCEL + */ + private static Object await(Node node, Slot slot) { + Thread w = Thread.currentThread(); + int spins = SPINS; + for (;;) { + Object v = node.get(); + if (v != null) + return v; + else if (spins > 0) // Spin-wait phase + --spins; + else if (node.waiter == null) // Set up to block next + node.waiter = w; + else if (w.isInterrupted()) // Abort on interrupt + tryCancel(node, slot); + else // Block + LockSupport.park(); + } + } - // Get and reset item and count after the wait. - // (We need to do this even if wait was aborted.) - other = item; - item = null; - count = arrivalCount; - arrivalCount = 0; - taken.signal(); - - // If the other thread replaced item, then we must - // continue even if cancelled. - if (count == 2) { - if (interrupted != null) - Thread.currentThread().interrupt(); - return other; + /** + * Waits for (at index 0) and gets the hole filled in by another + * thread. Fails if timed out or interrupted before hole filled. + * Same basic logic as untimed version, but a bit messier. 
+ * + * @param node the waiting node + * @param nanos the wait time + * @return on success, the hole; on failure, CANCEL + */ + private Object awaitNanos(Node node, Slot slot, long nanos) { + int spins = TIMED_SPINS; + long lastTime = 0; + Thread w = null; + for (;;) { + Object v = node.get(); + if (v != null) + return v; + long now = System.nanoTime(); + if (w == null) + w = Thread.currentThread(); + else + nanos -= now - lastTime; + lastTime = now; + if (nanos > 0) { + if (spins > 0) + --spins; + else if (node.waiter == null) + node.waiter = w; + else if (w.isInterrupted()) + tryCancel(node, slot); + else + LockSupport.parkNanos(nanos); } + else if (tryCancel(node, slot) && !w.isInterrupted()) + return scanOnTimeout(node); + } + } - // If no one is waiting for us, we can back out - if (interrupted != null) - throw interrupted; - else // must be timeout - throw new TimeoutException(); - } finally { - lock.unlock(); + /** + * Sweeps through arena checking for any waiting threads. Called + * only upon return from timeout while waiting in slot 0. When a + * thread gives up on a timed wait, it is possible that a + * previously-entered thread is still waiting in some other + * slot. So we scan to check for any. This is almost always + * overkill, but decreases the likelihood of timeouts when there + * are other threads present to far less than that in lock-based + * exchangers in which earlier-arriving threads may still be + * waiting on entry locks. + * + * @param node the waiting node + * @return another thread's item, or CANCEL + */ + private Object scanOnTimeout(Node node) { + Object y; + for (int j = arena.length - 1; j >= 0; --j) { + Slot slot = arena[j]; + if (slot != null) { + while ((y = slot.get()) != null) { + if (slot.compareAndSet(y, null)) { + Node you = (Node)y; + if (you.compareAndSet(null, node.item)) { + LockSupport.unpark(you.waiter); + return you.item; + } + } + } + } } + return CANCEL; } /** - * Create a new Exchanger. 
- **/ + * Creates a new Exchanger. + */ public Exchanger() { } /** * Waits for another thread to arrive at this exchange point (unless - * it is {@link Thread#interrupt interrupted}), + * the current thread is {@linkplain Thread#interrupt interrupted}), * and then transfers the given object to it, receiving its object * in return. + * *

If another thread is already waiting at the exchange point then * it is resumed for thread scheduling purposes and receives the object - * passed in by the current thread. The current thread returns immediately, + * passed in by the current thread. The current thread returns immediately, * receiving the object passed to the exchange by that other thread. - *

If no other thread is already waiting at the exchange then the + * + *

If no other thread is already waiting at the exchange then the * current thread is disabled for thread scheduling purposes and lies * dormant until one of two things happens: *

    *
  • Some other thread enters the exchange; or - *
  • Some other thread {@link Thread#interrupt interrupts} the current + *
  • Some other thread {@linkplain Thread#interrupt interrupts} the current * thread. *
*

If the current thread: *

    - *
  • has its interrupted status set on entry to this method; or - *
  • is {@link Thread#interrupt interrupted} while waiting - * for the exchange, + *
  • has its interrupted status set on entry to this method; or + *
  • is {@linkplain Thread#interrupt interrupted} while waiting + * for the exchange, *
- * then {@link InterruptedException} is thrown and the current thread's - * interrupted status is cleared. + * then {@link InterruptedException} is thrown and the current thread's + * interrupted status is cleared. * * @param x the object to exchange - * @return the object provided by the other thread. - * @throws InterruptedException if current thread was interrupted - * while waiting - **/ + * @return the object provided by the other thread + * @throws InterruptedException if the current thread was + * interrupted while waiting + */ public V exchange(V x) throws InterruptedException { - try { - return doExchange(x, false, 0); - } catch (TimeoutException cannotHappen) { - throw new Error(cannotHappen); + if (!Thread.interrupted()) { + Object v = doExchange(x == null? NULL_ITEM : x, false, 0); + if (v == NULL_ITEM) + return null; + if (v != CANCEL) + return (V)v; + Thread.interrupted(); // Clear interrupt status on IE throw } + throw new InterruptedException(); } /** * Waits for another thread to arrive at this exchange point (unless - * it is {@link Thread#interrupt interrupted}, or the specified waiting - * time elapses), - * and then transfers the given object to it, receiving its object - * in return. + * the current thread is {@linkplain Thread#interrupt interrupted} or + * the specified waiting time elapses), and then transfers the given + * object to it, receiving its object in return. * *

If another thread is already waiting at the exchange point then * it is resumed for thread scheduling purposes and receives the object - * passed in by the current thread. The current thread returns immediately, + * passed in by the current thread. The current thread returns immediately, * receiving the object passed to the exchange by that other thread. * - *

If no other thread is already waiting at the exchange then the + *

If no other thread is already waiting at the exchange then the * current thread is disabled for thread scheduling purposes and lies * dormant until one of three things happens: *

    *
  • Some other thread enters the exchange; or - *
  • Some other thread {@link Thread#interrupt interrupts} the current - * thread; or + *
  • Some other thread {@linkplain Thread#interrupt interrupts} + * the current thread; or *
  • The specified waiting time elapses. *
*

If the current thread: *

    - *
  • has its interrupted status set on entry to this method; or - *
  • is {@link Thread#interrupt interrupted} while waiting - * for the exchange, + *
  • has its interrupted status set on entry to this method; or + *
  • is {@linkplain Thread#interrupt interrupted} while waiting + * for the exchange, *
- * then {@link InterruptedException} is thrown and the current thread's - * interrupted status is cleared. + * then {@link InterruptedException} is thrown and the current thread's + * interrupted status is cleared. * - *

If the specified waiting time elapses then {@link TimeoutException} - * is thrown. - * If the time is - * less than or equal to zero, the method will not wait at all. + *

If the specified waiting time elapses then {@link + * TimeoutException} is thrown. If the time is less than or equal + * to zero, the method will not wait at all. * * @param x the object to exchange * @param timeout the maximum time to wait - * @param unit the time unit of the timeout argument. - * @return the object provided by the other thread. - * @throws InterruptedException if current thread was interrupted - * while waiting - * @throws TimeoutException if the specified waiting time elapses before - * another thread enters the exchange. - **/ - public V exchange(V x, long timeout, TimeUnit unit) + * @param unit the time unit of the timeout argument + * @return the object provided by the other thread + * @throws InterruptedException if the current thread was + * interrupted while waiting + * @throws TimeoutException if the specified waiting time elapses + * before another thread enters the exchange + */ + public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { - return doExchange(x, true, unit.toNanos(timeout)); + if (!Thread.interrupted()) { + Object v = doExchange(x == null? 
NULL_ITEM : x, + true, unit.toNanos(timeout)); + if (v == NULL_ITEM) + return null; + if (v != CANCEL) + return (V)v; + if (!Thread.interrupted()) + throw new TimeoutException(); + } + throw new InterruptedException(); } - } - - Modified: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutionException.java URL: http://svn.apache.org/viewvc/harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutionException.java?rev=798469&r1=798468&r2=798469&view=diff ============================================================================== --- harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutionException.java (original) +++ harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutionException.java Tue Jul 28 09:30:33 2009 @@ -19,14 +19,14 @@ private static final long serialVersionUID = 7830266012832686185L; /** - * Constructs a ExecutionException with no detail message. + * Constructs an ExecutionException with no detail message. * The cause is not initialized, and may subsequently be * initialized by a call to {@link #initCause(Throwable) initCause}. */ protected ExecutionException() { } /** - * Constructs a ExecutionException with the specified detail + * Constructs an ExecutionException with the specified detail * message. The cause is not initialized, and may subsequently be * initialized by a call to {@link #initCause(Throwable) initCause}. * @@ -37,7 +37,7 @@ } /** - * Constructs a ExecutionException with the specified detail + * Constructs an ExecutionException with the specified detail * message and cause. * * @param message the detail message @@ -49,7 +49,7 @@ } /** - * Constructs a ExecutionException with the specified cause. + * Constructs an ExecutionException with the specified cause. * The detail message is set to: *

      *  (cause == null ? null : cause.toString())
Modified: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/Executor.java URL: http://svn.apache.org/viewvc/harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/Executor.java?rev=798469&r1=798468&r2=798469&view=diff ============================================================================== --- harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/Executor.java (original) +++ harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/Executor.java Tue Jul 28 09:30:33 2009 @@ -21,7 +21,7 @@ * executor.execute(new RunnableTask2()); * ... * - * + * * However, the Executor interface does not strictly * require that execution be asynchronous. In the simplest case, an * executor can run the submitted task immediately in the caller's @@ -52,7 +52,7 @@ * *
  * class SerialExecutor implements Executor {
- *     final Queue<Runnable> tasks = new LinkedBlockingQueue<Runnable>();
+ *     final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
  *     final Executor executor;
  *     Runnable active;
  *
@@ -88,6 +88,11 @@
  * extensible thread pool implementation. The {@link Executors} class
  * provides convenient factory methods for these Executors.
  *
+ * 

Memory consistency effects: Actions in a thread prior to + * submitting a {@code Runnable} object to an {@code Executor} + * happen-before + * its execution begins, perhaps in another thread. + * * @since 1.5 * @author Doug Lea */ Modified: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutorCompletionService.java URL: http://svn.apache.org/viewvc/harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutorCompletionService.java?rev=798469&r1=798468&r2=798469&view=diff ============================================================================== --- harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutorCompletionService.java (original) +++ harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutorCompletionService.java Tue Jul 28 09:30:33 2009 @@ -6,11 +6,10 @@ package java.util.concurrent; - /** * A {@link CompletionService} that uses a supplied {@link Executor} * to execute tasks. This class arranges that submitted tasks are, - * upon completion, placed on a queue accessible using take. + * upon completion, placed on a queue accessible using {@code take}. * The class is lightweight enough to be suitable for transient use * when processing groups of tasks. * @@ -19,84 +18,102 @@ * Usage Examples. * * Suppose you have a set of solvers for a certain problem, each - * returning a value of some type Result, and would like to + * returning a value of some type {@code Result}, and would like to * run them concurrently, processing the results of each of them that - * return a non-null value, in some method use(Result r). You + * return a non-null value, in some method {@code use(Result r)}. You * could write this as: * - *

- *    void solve(Executor e, Collection<Callable<Result>> solvers)
- *      throws InterruptedException, ExecutionException {
- *        CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
- *        for (Callable<Result> s : solvers)
- *            ecs.submit(s);
- *        int n = solvers.size();
- *        for (int i = 0; i < n; ++i) {
- *            Result r = ecs.take().get();
- *            if (r != null) 
- *                use(r);
- *        }
- *    }
- * 
+ * <pre> {@code
+ * void solve(Executor e,
+ *            Collection<Callable<Result>> solvers)
+ *     throws InterruptedException, ExecutionException {
+ *     CompletionService<Result> ecs
+ *         = new ExecutorCompletionService<Result>(e);
+ *     for (Callable<Result> s : solvers)
+ *         ecs.submit(s);
+ *     int n = solvers.size();
+ *     for (int i = 0; i < n; ++i) {
+ *         Result r = ecs.take().get();
+ *         if (r != null)
+ *             use(r);
+ *     }
+ * }}</pre>
* * Suppose instead that you would like to use the first non-null result * of the set of tasks, ignoring any that encounter exceptions, * and cancelling all other tasks when the first one is ready: * - *
- *    void solve(Executor e, Collection<Callable<Result>> solvers) 
- *      throws InterruptedException {
- *        CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
- *        int n = solvers.size();
- *        List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
- *        Result result = null;
- *        try {
- *            for (Callable<Result> s : solvers)
- *                futures.add(ecs.submit(s));
- *            for (int i = 0; i < n; ++i) {
- *                try {
- *                    Result r = ecs.take().get();
- *                    if (r != null) {
- *                        result = r;
- *                        break;
- *                    }
- *                } catch(ExecutionException ignore) {}
- *            }
- *        }
- *        finally {
- *            for (Future<Result> f : futures)
- *                f.cancel(true);
- *        }
- *
- *        if (result != null)
- *            use(result);
- *    }
- * 
+ * <pre> {@code
+ * void solve(Executor e,
+ *            Collection<Callable<Result>> solvers)
+ *     throws InterruptedException {
+ *     CompletionService<Result> ecs
+ *         = new ExecutorCompletionService<Result>(e);
+ *     int n = solvers.size();
+ *     List<Future<Result>> futures
+ *         = new ArrayList<Future<Result>>(n);
+ *     Result result = null;
+ *     try {
+ *         for (Callable<Result> s : solvers)
+ *             futures.add(ecs.submit(s));
+ *         for (int i = 0; i < n; ++i) {
+ *             try {
+ *                 Result r = ecs.take().get();
+ *                 if (r != null) {
+ *                     result = r;
+ *                     break;
+ *                 }
+ *             } catch (ExecutionException ignore) {}
+ *         }
+ *     }
+ *     finally {
+ *         for (Future<Result> f : futures)
+ *             f.cancel(true);
+ *     }
+ *
+ *     if (result != null)
+ *         use(result);
+ * }}</pre>
*/ public class ExecutorCompletionService implements CompletionService { private final Executor executor; + private final AbstractExecutorService aes; private final BlockingQueue> completionQueue; /** * FutureTask extension to enqueue upon completion */ - private class QueueingFuture extends FutureTask { - QueueingFuture(Callable c) { super(c); } - QueueingFuture(Runnable t, V r) { super(t, r); } - protected void done() { completionQueue.add(this); } + private class QueueingFuture extends FutureTask { + QueueingFuture(FutureTask task) { + super(task, null); + this.task = task; + } + protected void done() { completionQueue.add(task); } + private final Future task; + } + + private FutureTask newTaskFor(Callable task) { + return new FutureTask(task); + } + + private FutureTask newTaskFor(Runnable task, V result) { + return new FutureTask(task, result); } /** * Creates an ExecutorCompletionService using the supplied * executor for base task execution and a * {@link LinkedBlockingQueue} as a completion queue. + * * @param executor the executor to use - * @throws NullPointerException if executor is null + * @throws NullPointerException if executor is {@code null} */ public ExecutorCompletionService(Executor executor) { - if (executor == null) + if (executor == null) throw new NullPointerException(); this.executor = executor; + this.aes = (executor instanceof AbstractExecutorService) ? + (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue>(); } @@ -104,30 +121,36 @@ * Creates an ExecutorCompletionService using the supplied * executor for base task execution and the supplied queue as its * completion queue. + * * @param executor the executor to use * @param completionQueue the queue to use as the completion queue - * normally one dedicated for use by this service - * @throws NullPointerException if executor or completionQueue are null + * normally one dedicated for use by this service. 
This + * queue is treated as unbounded -- failed attempted + * {@code Queue.add} operations for completed taskes cause + * them not to be retrievable. + * @throws NullPointerException if executor or completionQueue are {@code null} */ public ExecutorCompletionService(Executor executor, BlockingQueue> completionQueue) { - if (executor == null || completionQueue == null) + if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; + this.aes = (executor instanceof AbstractExecutorService) ? + (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; } public Future submit(Callable task) { if (task == null) throw new NullPointerException(); - QueueingFuture f = new QueueingFuture(task); - executor.execute(f); + FutureTask f = newTaskFor(task); + executor.execute(new QueueingFuture(f)); return f; } public Future submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); - QueueingFuture f = new QueueingFuture(task, result); - executor.execute(f); + FutureTask f = newTaskFor(task, result); + executor.execute(new QueueingFuture(f)); return f; } @@ -144,5 +167,3 @@ } } - - Modified: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutorService.java URL: http://svn.apache.org/viewvc/harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutorService.java?rev=798469&r1=798468&r2=798469&view=diff ============================================================================== --- harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutorService.java (original) +++ harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ExecutorService.java Tue Jul 28 09:30:33 2009 @@ -5,7 +5,6 @@ */ package java.util.concurrent; - import java.util.List; import java.util.Collection; import java.security.PrivilegedAction; 
@@ -14,14 +13,18 @@ /** * An {@link Executor} that provides methods to manage termination and * methods that can produce a {@link Future} for tracking progress of - * one or more asynchronous tasks. + * one or more asynchronous tasks. * - *

- * An ExecutorService can be shut down, which will cause it - * to stop accepting new tasks. After being shut down, the executor - * will eventually terminate, at which point no tasks are actively - * executing, no tasks are awaiting execution, and no new tasks can be - * submitted. + *

An ExecutorService can be shut down, which will cause + * it to reject new tasks. Two different methods are provided for + * shutting down an ExecutorService. The {@link #shutdown} + * method will allow previously submitted tasks to execute before + * terminating, while the {@link #shutdownNow} method prevents waiting + * tasks from starting and attempts to stop currently executing tasks. + * Upon termination, an executor has no tasks actively executing, no + * tasks awaiting execution, and no new tasks can be submitted. An + * unused ExecutorService should be shut down to allow + * reclamation of its resources. * *

Method submit extends base method {@link * Executor#execute} by creating and returning a {@link Future} that @@ -35,41 +38,74 @@ *

The {@link Executors} class provides factory methods for the
  * executor services provided in this package.
  *
- * <h3>Usage Example</h3>
+ * <h3>Usage Examples</h3>
* * Here is a sketch of a network service in which threads in a thread * pool service incoming requests. It uses the preconfigured {@link * Executors#newFixedThreadPool} factory method: * *
- * class NetworkService {
- *    private final ServerSocket serverSocket;
- *    private final ExecutorService pool;
- *
- *    public NetworkService(int port, int poolSize) throws IOException {
- *      serverSocket = new ServerSocket(port);
- *      pool = Executors.newFixedThreadPool(poolSize);
- *    }
- * 
- *    public void serve() {
- *      try {
- *        for (;;) {
- *          pool.execute(new Handler(serverSocket.accept()));
- *        }
- *      } catch (IOException ex) {
- *        pool.shutdown();
- *      }
- *    }
- *  }
- *
- *  class Handler implements Runnable {
- *    private final Socket socket;
- *    Handler(Socket socket) { this.socket = socket; }
- *    public void run() {
- *      // read and service request
- *    }
+ * class NetworkService implements Runnable {
+ *   private final ServerSocket serverSocket;
+ *   private final ExecutorService pool;
+ *
+ *   public NetworkService(int port, int poolSize)
+ *       throws IOException {
+ *     serverSocket = new ServerSocket(port);
+ *     pool = Executors.newFixedThreadPool(poolSize);
+ *   }
+ *
+ *   public void run() { // run the service
+ *     try {
+ *       for (;;) {
+ *         pool.execute(new Handler(serverSocket.accept()));
+ *       }
+ *     } catch (IOException ex) {
+ *       pool.shutdown();
+ *     }
+ *   }
+ * }
+ *
+ * class Handler implements Runnable {
+ *   private final Socket socket;
+ *   Handler(Socket socket) { this.socket = socket; }
+ *   public void run() {
+ *     // read and service request on socket
+ *   }
  * }
  * 
+ * + * The following method shuts down an ExecutorService in two phases, + * first by calling shutdown to reject incoming tasks, and then + * calling shutdownNow, if necessary, to cancel any lingering tasks: + * + *
+ * void shutdownAndAwaitTermination(ExecutorService pool) {
+ *   pool.shutdown(); // Disable new tasks from being submitted
+ *   try {
+ *     // Wait a while for existing tasks to terminate
+ *     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
+ *       pool.shutdownNow(); // Cancel currently executing tasks
+ *       // Wait a while for tasks to respond to being cancelled
+ *       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
+ *           System.err.println("Pool did not terminate");
+ *     }
+ *   } catch (InterruptedException ie) {
+ *     // (Re-)Cancel if current thread also interrupted
+ *     pool.shutdownNow();
+ *     // Preserve interrupt status
+ *     Thread.currentThread().interrupt();
+ *   }
+ * }
+ * 
+ * + *

Memory consistency effects: Actions in a thread prior to the + * submission of a {@code Runnable} or {@code Callable} task to an + * {@code ExecutorService} + * happen-before + * any actions taken by that task, which in turn happen-before the + * result is retrieved via {@code Future.get()}. + * * @since 1.5 * @author Doug Lea */ @@ -77,33 +113,45 @@ /** * Initiates an orderly shutdown in which previously submitted - * tasks are executed, but no new tasks will be - * accepted. Invocation has no additional effect if already shut - * down. + * tasks are executed, but no new tasks will be accepted. + * Invocation has no additional effect if already shut down. + * + *

This method does not wait for previously submitted tasks to + * complete execution. Use {@link #awaitTermination awaitTermination} + * to do that. + * * @throws SecurityException if a security manager exists and - * shutting down this ExecutorService may manipulate threads that - * the caller is not permitted to modify because it does not hold - * {@link java.lang.RuntimePermission}("modifyThread"), - * or the security manager's checkAccess method denies access. + * shutting down this ExecutorService may manipulate + * threads that the caller is not permitted to modify + * because it does not hold {@link + * java.lang.RuntimePermission}("modifyThread"), + * or the security manager's checkAccess method + * denies access. */ void shutdown(); /** * Attempts to stop all actively executing tasks, halts the - * processing of waiting tasks, and returns a list of the tasks that were - * awaiting execution. - * + * processing of waiting tasks, and returns a list of the tasks + * that were awaiting execution. + * + *

This method does not wait for actively executing tasks to + * terminate. Use {@link #awaitTermination awaitTermination} to + * do that. + * *

There are no guarantees beyond best-effort attempts to stop * processing actively executing tasks. For example, typical - * implementations will cancel via {@link Thread#interrupt}, so if any - * tasks mask or fail to respond to interrupts, they may never terminate. + * implementations will cancel via {@link Thread#interrupt}, so any + * task that fails to respond to interrupts may never terminate. * * @return list of tasks that never commenced execution * @throws SecurityException if a security manager exists and - * shutting down this ExecutorService may manipulate threads that - * the caller is not permitted to modify because it does not hold - * {@link java.lang.RuntimePermission}("modifyThread"), - * or the security manager's checkAccess method denies access. + * shutting down this ExecutorService may manipulate + * threads that the caller is not permitted to modify + * because it does not hold {@link + * java.lang.RuntimePermission}("modifyThread"), + * or the security manager's checkAccess method + * denies access. */ List shutdownNow(); @@ -130,8 +178,8 @@ * * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument - * @return true if this executor terminated and false - * if the timeout elapsed before termination + * @return true if this executor terminated and + * false if the timeout elapsed before termination * @throws InterruptedException if interrupted while waiting */ boolean awaitTermination(long timeout, TimeUnit unit) @@ -139,8 +187,10 @@ /** - * Submits a value-returning task for execution and returns a Future - * representing the pending results of the task. + * Submits a value-returning task for execution and returns a + * Future representing the pending results of the task. The + * Future's get method will return the task's result upon + * successful completion. * *

* If you would like to immediately block waiting @@ -154,88 +204,92 @@ * * @param task the task to submit * @return a Future representing pending completion of the task - * @throws RejectedExecutionException if task cannot be scheduled - * for execution - * @throws NullPointerException if task null + * @throws RejectedExecutionException if the task cannot be + * scheduled for execution + * @throws NullPointerException if the task is null */ Future submit(Callable task); /** - * Submits a Runnable task for execution and returns a Future - * representing that task that will upon completion return - * the given result + * Submits a Runnable task for execution and returns a Future + * representing that task. The Future's get method will + * return the given result upon successful completion. * * @param task the task to submit * @param result the result to return - * @return a Future representing pending completion of the task, - * and whose get() method will return the given result - * upon completion. - * @throws RejectedExecutionException if task cannot be scheduled - * for execution - * @throws NullPointerException if task null + * @return a Future representing pending completion of the task + * @throws RejectedExecutionException if the task cannot be + * scheduled for execution + * @throws NullPointerException if the task is null */ Future submit(Runnable task, T result); /** - * Submits a Runnable task for execution and returns a Future - * representing that task. + * Submits a Runnable task for execution and returns a Future + * representing that task. The Future's get method will + * return null upon successful completion. * * @param task the task to submit - * @return a Future representing pending completion of the task, - * and whose get() method will return null - * upon completion. 
- * @throws RejectedExecutionException if task cannot be scheduled - * for execution - * @throws NullPointerException if task null + * @return a Future representing pending completion of the task + * @throws RejectedExecutionException if the task cannot be + * scheduled for execution + * @throws NullPointerException if the task is null */ Future submit(Runnable task); /** - * Executes the given tasks, returning their results - * when all complete. + * Executes the given tasks, returning a list of Futures holding + * their status and results when all complete. + * {@link Future#isDone} is true for each + * element of the returned list. * Note that a completed task could have * terminated either normally or by throwing an exception. * The results of this method are undefined if the given * collection is modified while this operation is in progress. + * * @param tasks the collection of tasks * @return A list of Futures representing the tasks, in the same - * sequential order as produced by the iterator for the given task - * list, each of which has completed. + * sequential order as produced by the iterator for the + * given task list, each of which has completed. * @throws InterruptedException if interrupted while waiting, in - * which case unfinished tasks are cancelled. + * which case unfinished tasks are cancelled. * @throws NullPointerException if tasks or any of its elements are null - * @throws RejectedExecutionException if any task cannot be scheduled - * for execution + * @throws RejectedExecutionException if any task cannot be + * scheduled for execution */ List> invokeAll(Collection> tasks) throws InterruptedException; /** - * Executes the given tasks, returning their results + * Executes the given tasks, returning a list of Futures holding + * their status and results * when all complete or the timeout expires, whichever happens first. + * {@link Future#isDone} is true for each + * element of the returned list. 
* Upon return, tasks that have not completed are cancelled. * Note that a completed task could have * terminated either normally or by throwing an exception. * The results of this method are undefined if the given * collection is modified while this operation is in progress. + * * @param tasks the collection of tasks * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument - * @return A list of Futures representing the tasks, in the same - * sequential order as produced by the iterator for the given - * task list. If the operation did not time out, each task will - * have completed. If it did time out, some of thiese tasks will - * not have completed. + * @return a list of Futures representing the tasks, in the same + * sequential order as produced by the iterator for the + * given task list. If the operation did not time out, + * each task will have completed. If it did time out, some + * of these tasks will not have completed. * @throws InterruptedException if interrupted while waiting, in - * which case unfinished tasks are cancelled. + * which case unfinished tasks are cancelled * @throws NullPointerException if tasks, any of its elements, or - * unit are null + * unit are null * @throws RejectedExecutionException if any task cannot be scheduled - * for execution + * for execution */ - List> invokeAll(Collection> tasks, - long timeout, TimeUnit unit) + List> invokeAll(Collection> tasks, + long timeout, TimeUnit unit) throws InterruptedException; /** @@ -245,15 +299,16 @@ * tasks that have not completed are cancelled. * The results of this method are undefined if the given * collection is modified while this operation is in progress. + * * @param tasks the collection of tasks - * @return The result returned by one of the tasks. 
+ * @return the result returned by one of the tasks * @throws InterruptedException if interrupted while waiting * @throws NullPointerException if tasks or any of its elements - * are null - * @throws IllegalArgumentException if tasks empty + * are null + * @throws IllegalArgumentException if tasks is empty * @throws ExecutionException if no task successfully completes * @throws RejectedExecutionException if tasks cannot be scheduled - * for execution + * for execution */ T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException; @@ -266,21 +321,21 @@ * completed are cancelled. * The results of this method are undefined if the given * collection is modified while this operation is in progress. + * * @param tasks the collection of tasks * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument - * @return The result returned by one of the tasks. + * @return the result returned by one of the tasks. * @throws InterruptedException if interrupted while waiting * @throws NullPointerException if tasks, any of its elements, or - * unit are null + * unit are null * @throws TimeoutException if the given timeout elapses before - * any task successfully completes + * any task successfully completes * @throws ExecutionException if no task successfully completes * @throws RejectedExecutionException if tasks cannot be scheduled - * for execution + * for execution */ - T invokeAny(Collection> tasks, - long timeout, TimeUnit unit) + T invokeAny(Collection> tasks, + long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; - }
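The invokeAll and invokeAny contracts described in the revised ExecutorService Javadoc above can be illustrated with a minimal sketch. It is not part of the commit. The class name InvokeSketch and the helpers constant, failing, sumOfSuccessful and firstSuccessful are hypothetical names chosen for illustration, and the pool size of 2 is an arbitrary assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeSketch {

    /** Hypothetical helper: a task that returns the given value. */
    static Callable<Integer> constant(final int value) {
        return new Callable<Integer>() {
            public Integer call() { return value; }
        };
    }

    /** Hypothetical helper: a task that always terminates by throwing. */
    static Callable<Integer> failing() {
        return new Callable<Integer>() {
            public Integer call() throws Exception {
                throw new IllegalStateException("task failed");
            }
        };
    }

    /** Runs every task via invokeAll and sums the results of those that completed normally. */
    static int sumOfSuccessful(List<Callable<Integer>> tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(2); // assumed pool size
        try {
            int sum = 0;
            for (Future<Integer> f : pool.invokeAll(tasks)) {
                // invokeAll returns only once each Future is done
                if (!f.isDone())
                    throw new AssertionError("expected a completed Future");
                try {
                    sum += f.get();
                } catch (ExecutionException ignore) {
                    // the task completed by throwing; skip it
                }
            }
            return sum;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            throw new RuntimeException(ie);
        } finally {
            pool.shutdown(); // an unused ExecutorService should be shut down
        }
    }

    /** Returns the result of one task that completed without throwing, via invokeAny. */
    static int firstSuccessful(List<Callable<Integer>> tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(2); // assumed pool size
        try {
            return pool.invokeAny(tasks);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ie);
        } catch (ExecutionException ee) {
            // thrown only if no task completes successfully
            throw new RuntimeException(ee);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
        tasks.add(constant(1));
        tasks.add(failing());
        tasks.add(constant(2));
        System.out.println("sum of successful results: " + sumOfSuccessful(tasks));

        List<Callable<Integer>> mixed = new ArrayList<Callable<Integer>>();
        mixed.add(failing());
        mixed.add(constant(7));
        System.out.println("first successful result: " + firstSuccessful(mixed));
    }
}
```

Both helpers wrap the checked exceptions so that callers see only unchecked ones. That is a convenience of this sketch, not something the interface requires.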