Return-Path: Delivered-To: apmail-harmony-commits-archive@www.apache.org Received: (qmail 55519 invoked from network); 28 Jul 2009 09:30:47 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 28 Jul 2009 09:30:47 -0000 Received: (qmail 63186 invoked by uid 500); 28 Jul 2009 09:32:04 -0000 Delivered-To: apmail-harmony-commits-archive@harmony.apache.org Received: (qmail 63144 invoked by uid 500); 28 Jul 2009 09:32:04 -0000 Mailing-List: contact commits-help@harmony.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@harmony.apache.org Delivered-To: mailing list commits@harmony.apache.org Received: (qmail 63135 invoked by uid 99); 28 Jul 2009 09:32:03 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Jul 2009 09:32:03 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Jul 2009 09:31:48 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 9F7CA23889D5; Tue, 28 Jul 2009 09:30:59 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r798469 [11/28] - in /harmony/enhanced/classlib/branches/java6: ./ depends/build/platform/ depends/files/ depends/jars/ depends/manifests/icu4j_4.0/ depends/manifests/icu4j_4.2.1/ depends/manifests/icu4j_4.2.1/META-INF/ make/ modules/access... Date: Tue, 28 Jul 2009 09:30:48 -0000 To: commits@harmony.apache.org From: hindessm@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090728093059.9F7CA23889D5@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledExecutorService.java URL: http://svn.apache.org/viewvc/harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledExecutorService.java?rev=798469&r1=798468&r2=798469&view=diff ============================================================================== --- harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledExecutorService.java (original) +++ harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledExecutorService.java Tue Jul 28 09:30:33 2009 @@ -10,13 +10,13 @@ /** * An {@link ExecutorService} that can schedule commands to run after a given - * delay, or to execute periodically. + * delay, or to execute periodically. * *

The schedule methods create tasks with various delays * and return a task object that can be used to cancel or check * execution. The scheduleAtFixedRate and * scheduleWithFixedDelay methods create and execute tasks - * that run periodically until cancelled. + * that run periodically until cancelled. * *

Commands submitted using the {@link Executor#execute} and * {@link ExecutorService} submit methods are scheduled with @@ -33,27 +33,27 @@ * TimeUnit.MILLISECONDS). Beware however that expiration of a * relative delay need not coincide with the current Date at * which the task is enabled due to network time synchronization - * protocols, clock drift, or other factors. + * protocols, clock drift, or other factors. * * The {@link Executors} class provides convenient factory methods for * the ScheduledExecutorService implementations provided in this package. * *

Usage Example

- * + * * Here is a class with a method that sets up a ScheduledExecutorService * to beep every ten seconds for an hour: * *
- * import static java.util.concurrent.TimeUnit;
+ * import static java.util.concurrent.TimeUnit.*;
  * class BeeperControl {
- *    private final ScheduledExecutorService scheduler = 
+ *    private final ScheduledExecutorService scheduler =
  *       Executors.newScheduledThreadPool(1);
  *
  *    public void beepForAnHour() {
  *        final Runnable beeper = new Runnable() {
  *                public void run() { System.out.println("beep"); }
  *            };
- *        final ScheduledFuture<?> beeperHandle = 
+ *        final ScheduledFuture<?> beeperHandle =
  *            scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);
  *        scheduler.schedule(new Runnable() {
  *                public void run() { beeperHandle.cancel(true); }
@@ -70,76 +70,90 @@
     /**
      * Creates and executes a one-shot action that becomes enabled
      * after the given delay.
-     * @param command the task to execute.
-     * @param delay the time from now to delay execution.
-     * @param unit the time unit of the delay parameter.
-     * @return a Future representing pending completion of the task,
-     * and whose get() method will return null
-     * upon completion.
-     * @throws RejectedExecutionException if task cannot be scheduled
-     * for execution.
+     *
+     * @param command the task to execute
+     * @param delay the time from now to delay execution
+     * @param unit the time unit of the delay parameter
+     * @return a ScheduledFuture representing pending completion of
+     *         the task and whose get() method will return
+     *         null upon completion
+     * @throws RejectedExecutionException if the task cannot be
+     *         scheduled for execution
      * @throws NullPointerException if command is null
      */
-    public ScheduledFuture schedule(Runnable command, long delay,  TimeUnit unit);
+    public ScheduledFuture schedule(Runnable command,
+                                       long delay, TimeUnit unit);
 
     /**
      * Creates and executes a ScheduledFuture that becomes enabled after the
      * given delay.
-     * @param callable the function to execute.
-     * @param delay the time from now to delay execution.
-     * @param unit the time unit of the delay parameter.
-     * @return a ScheduledFuture that can be used to extract result or cancel.
-     * @throws RejectedExecutionException if task cannot be scheduled
-     * for execution.
+     *
+     * @param callable the function to execute
+     * @param delay the time from now to delay execution
+     * @param unit the time unit of the delay parameter
+     * @return a ScheduledFuture that can be used to extract result or cancel
+     * @throws RejectedExecutionException if the task cannot be
+     *         scheduled for execution
      * @throws NullPointerException if callable is null
      */
-    public  ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit);
+    public  ScheduledFuture schedule(Callable callable,
+                                           long delay, TimeUnit unit);
 
     /**
      * Creates and executes a periodic action that becomes enabled first
      * after the given initial delay, and subsequently with the given
      * period; that is executions will commence after
      * initialDelay then initialDelay+period, then
-     * initialDelay + 2 * period, and so on.  
+     * initialDelay + 2 * period, and so on.
      * If any execution of the task
      * encounters an exception, subsequent executions are suppressed.
      * Otherwise, the task will only terminate via cancellation or
-     * termination of the executor.
-     * @param command the task to execute.
-     * @param initialDelay the time to delay first execution.
-     * @param period the period between successive executions.
+     * termination of the executor.  If any execution of this task
+     * takes longer than its period, then subsequent executions
+     * may start late, but will not concurrently execute.
+     *
+     * @param command the task to execute
+     * @param initialDelay the time to delay first execution
+     * @param period the period between successive executions
      * @param unit the time unit of the initialDelay and period parameters
-     * @return a Future representing pending completion of the task,
-     * and whose get() method will throw an exception upon
-     * cancellation.
-     * @throws RejectedExecutionException if task cannot be scheduled
-     * for execution.
+     * @return a ScheduledFuture representing pending completion of
+     *         the task, and whose get() method will throw an
+     *         exception upon cancellation
+     * @throws RejectedExecutionException if the task cannot be
+     *         scheduled for execution
      * @throws NullPointerException if command is null
-     * @throws IllegalArgumentException if period less than or equal to zero.
+     * @throws IllegalArgumentException if period less than or equal to zero
      */
-    public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay,  long period, TimeUnit unit);
-    
+    public ScheduledFuture scheduleAtFixedRate(Runnable command,
+                                                  long initialDelay,
+                                                  long period,
+                                                  TimeUnit unit);
+
     /**
      * Creates and executes a periodic action that becomes enabled first
      * after the given initial delay, and subsequently with the
      * given delay between the termination of one execution and the
-     * commencement of the next. If any execution of the task
+     * commencement of the next.  If any execution of the task
      * encounters an exception, subsequent executions are suppressed.
      * Otherwise, the task will only terminate via cancellation or
      * termination of the executor.
-     * @param command the task to execute.
-     * @param initialDelay the time to delay first execution.
+     *
+     * @param command the task to execute
+     * @param initialDelay the time to delay first execution
      * @param delay the delay between the termination of one
-     * execution and the commencement of the next.
+     * execution and the commencement of the next
      * @param unit the time unit of the initialDelay and delay parameters
-     * @return a Future representing pending completion of the task,
-     * and whose get() method will throw an exception upon
-     * cancellation.
-     * @throws RejectedExecutionException if task cannot be scheduled
-     * for execution.
+     * @return a ScheduledFuture representing pending completion of
+     *         the task, and whose get() method will throw an
+     *         exception upon cancellation
+     * @throws RejectedExecutionException if the task cannot be
+     *         scheduled for execution
      * @throws NullPointerException if command is null
-     * @throws IllegalArgumentException if delay less than or equal to zero.
+     * @throws IllegalArgumentException if delay less than or equal to zero
      */
-    public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay,  long delay, TimeUnit unit);
+    public ScheduledFuture scheduleWithFixedDelay(Runnable command,
+                                                     long initialDelay,
+                                                     long delay,
+                                                     TimeUnit unit);
 
 }

Modified: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java?rev=798469&r1=798468&r2=798469&view=diff
==============================================================================
--- harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java (original)
+++ harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java Tue Jul 28 09:30:33 2009
@@ -10,6 +10,7 @@
 
 package java.util.concurrent;
 import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
 import java.util.*;
 
 /**
@@ -20,25 +21,60 @@
  * flexibility or capabilities of {@link ThreadPoolExecutor} (which
  * this class extends) are required.
  *
- * 

Delayed tasks execute no sooner than they are enabled, but + *

Delayed tasks execute no sooner than they are enabled, but * without any real-time guarantees about when, after they are * enabled, they will commence. Tasks scheduled for exactly the same * execution time are enabled in first-in-first-out (FIFO) order of * submission. * + *

Successive executions of a task scheduled via + * scheduleAtFixedRate or + * scheduleWithFixedDelay do not overlap. While different + * executions may be performed by different threads, the effects of + * prior executions happen-before + * those of subsequent ones. + * *

While this class inherits from {@link ThreadPoolExecutor}, a few * of the inherited tuning methods are not useful for it. In - * particular, because it acts as a fixed-sized pool using - * corePoolSize threads and an unbounded queue, adjustments - * to maximumPoolSize have no useful effect. + * particular, because it acts as a fixed-sized pool using {@code + * corePoolSize} threads and an unbounded queue, adjustments to {@code + * maximumPoolSize} have no useful effect. * * @since 1.5 * @author Doug Lea */ -public class ScheduledThreadPoolExecutor - extends ThreadPoolExecutor +public class ScheduledThreadPoolExecutor + extends ThreadPoolExecutor implements ScheduledExecutorService { + /* + * This class specializes ThreadPoolExecutor implementation by + * + * 1. Using a custom task type, ScheduledFutureTask for + * tasks, even those that don't require scheduling (i.e., + * those submitted using ExecutorService execute, not + * ScheduledExecutorService methods) which are treated as + * delayed tasks with a delay of zero. + * + * 2. Using a custom queue (DelayedWorkQueue), a variant of + * unbounded DelayQueue. The lack of capacity constraint and + * the fact that corePoolSize and maximumPoolSize are + * effectively identical simplifies some execution mechanics + * (see delayedExecute) compared to ThreadPoolExecutor. + * + * 3. Supporting optional run-after-shutdown parameters, which + * leads to overrides of shutdown methods to remove and cancel + * tasks that should NOT be run after shutdown, as well as + * different recheck logic when task (re)submission overlaps + * with a shutdown. + * + * 4. Task decoration methods to allow interception and + * instrumentation, which are needed because subclasses cannot + * otherwise override submit methods to get this effect. These + * don't have any impact on pool control logic though. + */ + /** * False if should cancel/suppress periodic tasks on shutdown. */ @@ -50,28 +86,40 @@ private volatile boolean executeExistingDelayedTasksAfterShutdown = true; /** + * True if ScheduledFutureTask.cancel should remove from queue + */ + private volatile boolean removeOnCancel = false; + + /** * Sequence number to break scheduling ties, and in turn to * guarantee FIFO order among tied entries. */ private static final AtomicLong sequencer = new AtomicLong(0); - /** Base of nanosecond timings, to avoid wrapping */ - private static final long NANO_ORIGIN = System.nanoTime(); /** - * Returns nanosecond time offset by origin + * Value of System.nanoTime upon static initialization. This is + * used as an offset by now() to avoid wraparound of time values + * that would make them appear negative. */ - final long now() { - return System.nanoTime() - NANO_ORIGIN; + static final long initialNanoTime = System.nanoTime(); + + /** + * Returns current nanosecond time. + */ + static long now() { + return System.nanoTime() - initialNanoTime; } - private class ScheduledFutureTask + private class ScheduledFutureTask extends FutureTask implements ScheduledFuture { - + /** Sequence number to break ties FIFO */ private final long sequenceNumber; + /** The time the task is enabled to execute in nanoTime units */ private long time; + /** * Period in nanoseconds for repeating tasks. A positive * value indicates fixed-rate execution. A negative value @@ -80,8 +128,16 @@ */ private final long period; + /** The actual task to be re-enqueued by reExecutePeriodic */ + ScheduledFutureTask outerTask = this; + /** - * Creates a one-shot action with given nanoTime-based trigger time + * Index into delay queue, to support faster cancellation. + */ + int heapIndex; + + /** + * Creates a one-shot action with given nanoTime-based trigger time. */ ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); @@ -91,9 +147,9 @@ } /** - * Creates a periodic action with given nano time and period + * Creates a periodic action with given nano time and period. */ - ScheduledFutureTask(Runnable r, V result, long ns, long period) { + ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; @@ -101,7 +157,7 @@ } /** - * Creates a one-shot action with given nanoTime-based trigger + * Creates a one-shot action with given nanoTime-based trigger. */ ScheduledFutureTask(Callable callable, long ns) { super(callable); @@ -111,122 +167,162 @@ } public long getDelay(TimeUnit unit) { - long d = unit.convert(time - now(), TimeUnit.NANOSECONDS); - return d; + long d = time - now(); + return d<=0? 0 : unit.convert(d, TimeUnit.NANOSECONDS); } public int compareTo(Delayed other) { if (other == this) // compare zero ONLY if same object return 0; - ScheduledFutureTask x = (ScheduledFutureTask)other; - long diff = time - x.time; - if (diff < 0) - return -1; - else if (diff > 0) - return 1; - else if (sequenceNumber < x.sequenceNumber) - return -1; - else - return 1; + if (other instanceof ScheduledFutureTask) { + ScheduledFutureTask x = (ScheduledFutureTask)other; + long diff = time - x.time; + if (diff < 0) + return -1; + else if (diff > 0) + return 1; + else if (sequenceNumber < x.sequenceNumber) + return -1; + else + return 1; + } + long d = (getDelay(TimeUnit.NANOSECONDS) - + other.getDelay(TimeUnit.NANOSECONDS)); + return (d == 0) ? 0 : ((d < 0) ? -1 : 1); } /** * Returns true if this is a periodic (not a one-shot) action. + * * @return true if periodic */ - boolean isPeriodic() { + public boolean isPeriodic() { return period != 0; } /** - * Run a periodic task + * Sets the next time to run for a periodic task. */ - private void runPeriodic() { - boolean ok = ScheduledFutureTask.super.runAndReset(); - boolean down = isShutdown(); - // Reschedule if not cancelled and not shutdown or policy allows - if (ok && (!down || - (getContinueExistingPeriodicTasksAfterShutdownPolicy() && - !isTerminating()))) { - long p = period; - if (p > 0) - time += p; - else - time = now() - p; - ScheduledThreadPoolExecutor.super.getQueue().add(this); - } - // This might have been the final executed delayed - // task. Wake up threads to check. - else if (down) - interruptIdleWorkers(); + private void setNextRunTime() { + long p = period; + if (p > 0) + time += p; + else + time = now() - p; + } + + public boolean cancel(boolean mayInterruptIfRunning) { + boolean cancelled = super.cancel(mayInterruptIfRunning); + if (cancelled && removeOnCancel && heapIndex >= 0) + remove(this); + return cancelled; } /** * Overrides FutureTask version so as to reset/requeue if periodic. - */ + */ public void run() { - if (isPeriodic()) - runPeriodic(); - else + boolean periodic = isPeriodic(); + if (!canRunInCurrentRunState(periodic)) + cancel(false); + else if (!periodic) ScheduledFutureTask.super.run(); + else if (ScheduledFutureTask.super.runAndReset()) { + setNextRunTime(); + reExecutePeriodic(outerTask); + } + } + } + + /** + * Returns true if can run a task given current run state + * and run-after-shutdown parameters. + * + * @param periodic true if this task periodic, false if delayed + */ + boolean canRunInCurrentRunState(boolean periodic) { + return isRunningOrShutdown(periodic ? + continueExistingPeriodicTasksAfterShutdown : + executeExistingDelayedTasksAfterShutdown); + } + + /** + * Main execution method for delayed or periodic tasks. If pool + * is shut down, rejects the task. Otherwise adds task to queue + * and starts a thread, if necessary, to run it. (We cannot + * prestart the thread to run the task because the task (probably) + * shouldn't be run yet,) If the pool is shut down while the task + * is being added, cancel and remove it if required by state and + * run-after-shutdown parameters. + * + * @param task the task + */ + private void delayedExecute(ScheduledFutureTask task) { + if (isShutdown()) + reject(task); + else { + super.getQueue().add(task); + if (isShutdown() && + !canRunInCurrentRunState(task.isPeriodic()) && + remove(task)) + task.cancel(false); + else + prestartCoreThread(); } } /** - * Specialized variant of ThreadPoolExecutor.execute for delayed tasks. + * Requeues a periodic task unless current run state precludes it. + * Same idea as delayedExecute except drops task rather than rejecting. + * + * @param task the task */ - private void delayedExecute(Runnable command) { - if (isShutdown()) { - reject(command); - return; - } - // Prestart a thread if necessary. We cannot prestart it - // running the task because the task (probably) shouldn't be - // run yet, so thread will just idle until delay elapses. - if (getPoolSize() < getCorePoolSize()) - prestartCoreThread(); - - super.getQueue().add(command); + void reExecutePeriodic(ScheduledFutureTask task) { + if (canRunInCurrentRunState(true)) { + super.getQueue().add(task); + if (!canRunInCurrentRunState(true) && remove(task)) + task.cancel(false); + else + prestartCoreThread(); + } } /** - * Cancel and clear the queue of all tasks that should not be run - * due to shutdown policy. - */ - private void cancelUnwantedTasks() { - boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); - boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); - if (!keepDelayed && !keepPeriodic) - super.getQueue().clear(); - else if (keepDelayed || keepPeriodic) { - Object[] entries = super.getQueue().toArray(); - for (int i = 0; i < entries.length; ++i) { - Object e = entries[i]; + * Cancels and clears the queue of all tasks that should not be run + * due to shutdown policy. Invoked within super.shutdown. + */ + @Override void onShutdown() { + BlockingQueue q = super.getQueue(); + boolean keepDelayed = + getExecuteExistingDelayedTasksAfterShutdownPolicy(); + boolean keepPeriodic = + getContinueExistingPeriodicTasksAfterShutdownPolicy(); + if (!keepDelayed && !keepPeriodic) + q.clear(); + else { + // Traverse snapshot to avoid iterator exceptions + for (Object e : q.toArray()) { if (e instanceof ScheduledFutureTask) { - ScheduledFutureTask t = (ScheduledFutureTask)e; - if (t.isPeriodic()? !keepPeriodic : !keepDelayed) - t.cancel(false); + ScheduledFutureTask t = + (ScheduledFutureTask)e; + if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || + t.isCancelled()) { // also remove if already cancelled + if (q.remove(t)) + t.cancel(false); + } } } - entries = null; - purge(); } - } - - public boolean remove(Runnable task) { - if (!(task instanceof ScheduledFutureTask)) - return false; - return getQueue().remove(task); + tryTerminate(); } /** - * Creates a new ScheduledThreadPoolExecutor with the given core - * pool size. - * - * @param corePoolSize the number of threads to keep in the pool, - * even if they are idle. - * @throws IllegalArgumentException if corePoolSize less than or - * equal to zero + * Creates a new {@code ScheduledThreadPoolExecutor} with the + * given core pool size. + * + * @param corePoolSize the number of threads to keep in the pool, even + * if they are idle + * @throws IllegalArgumentException if {@code corePoolSize < 0} */ public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, @@ -234,14 +330,15 @@ } /** - * Creates a new ScheduledThreadPoolExecutor with the given - * initial parameters. - * - * @param corePoolSize the number of threads to keep in the pool, - * even if they are idle. + * Creates a new {@code ScheduledThreadPoolExecutor} with the + * given initial parameters. + * + * @param corePoolSize the number of threads to keep in the pool, even + * if they are idle * @param threadFactory the factory to use when the executor - * creates a new thread. - * @throws NullPointerException if threadFactory is null + * creates a new thread + * @throws IllegalArgumentException if {@code corePoolSize < 0} + * @throws NullPointerException if {@code threadFactory} is null */ public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { @@ -252,12 +349,13 @@ /** * Creates a new ScheduledThreadPoolExecutor with the given * initial parameters. - * - * @param corePoolSize the number of threads to keep in the pool, - * even if they are idle. + * + * @param corePoolSize the number of threads to keep in the pool, even + * if they are idle * @param handler the handler to use when execution is blocked - * because the thread bounds and queue capacities are reached. - * @throws NullPointerException if handler is null + * because the thread bounds and queue capacities are reached + * @throws IllegalArgumentException if {@code corePoolSize < 0} + * @throws NullPointerException if {@code handler} is null */ public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { @@ -268,14 +366,16 @@ /** * Creates a new ScheduledThreadPoolExecutor with the given * initial parameters. - * - * @param corePoolSize the number of threads to keep in the pool, - * even if they are idle. + * + * @param corePoolSize the number of threads to keep in the pool, even + * if they are idle * @param threadFactory the factory to use when the executor - * creates a new thread. + * creates a new thread * @param handler the handler to use when execution is blocked - * because the thread bounds and queue capacities are reached. - * @throws NullPointerException if threadFactory or handler is null + * because the thread bounds and queue capacities are reached + * @throws IllegalArgumentException if {@code corePoolSize < 0} + * @throws NullPointerException if {@code threadFactory} or + * {@code handler} is null */ public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, @@ -284,128 +384,178 @@ new DelayedWorkQueue(), threadFactory, handler); } - public ScheduledFuture schedule(Runnable command, - long delay, + /** + * Returns the trigger time of a delayed action + */ + private static long nextTriggerTime(long delay, TimeUnit unit) { + long triggerTime; + long now = now(); + if (delay <= 0) + return now; // avoid negative trigger times + else if ((triggerTime = now + unit.toNanos(delay)) < 0) + return Long.MAX_VALUE; // avoid numerical overflow + else + return triggerTime; + } + + /** + * @throws RejectedExecutionException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ + public ScheduledFuture schedule(Runnable command, + long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); - long triggerTime = now() + unit.toNanos(delay); - ScheduledFutureTask t = - new ScheduledFutureTask(command, null, triggerTime); + long triggerTime = nextTriggerTime(delay, unit); + ScheduledFutureTask t + = new ScheduledFutureTask(command, null, triggerTime); delayedExecute(t); return t; } - - public ScheduledFuture schedule(Callable callable, - long delay, + + /** + * @throws RejectedExecutionException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ + public ScheduledFuture schedule(Callable callable, + long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); - if (delay < 0) delay = 0; - long triggerTime = now() + unit.toNanos(delay); - ScheduledFutureTask t = - new ScheduledFutureTask(callable, triggerTime); + long triggerTime = nextTriggerTime(delay, unit); + ScheduledFutureTask t + = new ScheduledFutureTask(callable, triggerTime); delayedExecute(t); return t; } - public ScheduledFuture scheduleAtFixedRate(Runnable command, - long initialDelay, - long period, + /** + * @throws RejectedExecutionException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + * @throws IllegalArgumentException {@inheritDoc} + */ + public ScheduledFuture scheduleAtFixedRate(Runnable command, + long initialDelay, + long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); if (initialDelay < 0) initialDelay = 0; - long triggerTime = now() + unit.toNanos(initialDelay); - ScheduledFutureTask t = - new ScheduledFutureTask(command, - null, - triggerTime, - unit.toNanos(period)); - delayedExecute(t); - return t; + long triggerTime = nextTriggerTime(initialDelay, unit); + ScheduledFutureTask sft = + new ScheduledFutureTask(command, + null, + triggerTime, + unit.toNanos(period)); + sft.outerTask = sft; + delayedExecute(sft); + return sft; } - - public ScheduledFuture scheduleWithFixedDelay(Runnable command, - long initialDelay, - long delay, + + /** + * @throws RejectedExecutionException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + * @throws IllegalArgumentException {@inheritDoc} + */ + public ScheduledFuture scheduleWithFixedDelay(Runnable command, + long initialDelay, + long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); - if (initialDelay < 0) initialDelay = 0; - long triggerTime = now() + unit.toNanos(initialDelay); - ScheduledFutureTask t = - new ScheduledFutureTask(command, - null, - triggerTime, - unit.toNanos(-delay)); - delayedExecute(t); - return t; + long triggerTime = nextTriggerTime(initialDelay, unit); + ScheduledFutureTask sft = + new ScheduledFutureTask(command, + null, + triggerTime, + unit.toNanos(-delay)); + sft.outerTask = sft; + delayedExecute(sft); + return sft; } - /** - * Execute command with zero required delay. This has effect - * equivalent to schedule(command, 0, anyUnit). Note - * that inspections of the queue and of the list returned by - * shutdownNow will access the zero-delayed - * {@link ScheduledFuture}, not the command itself. + * Executes {@code command} with zero required delay. + * This has effect equivalent to + * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}. + * Note that inspections of the queue and of the list returned by + * {@code shutdownNow} will access the zero-delayed + * {@link ScheduledFuture}, not the {@code command} itself. + * + *

A consequence of the use of {@code ScheduledFuture} objects is + * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always + * called with a null second {@code Throwable} argument, even if the + * {@code command} terminated abruptly. Instead, the {@code Throwable} + * thrown by such a task can be obtained via {@link Future#get}. * - * @param command the task to execute * @throws RejectedExecutionException at discretion of - * RejectedExecutionHandler, if task cannot be accepted - * for execution because the executor has been shut down. - * @throws NullPointerException if command is null + * {@code RejectedExecutionHandler}, if the task + * cannot be accepted for execution because the + * executor has been shut down + * @throws NullPointerException {@inheritDoc} */ public void execute(Runnable command) { - if (command == null) - throw new NullPointerException(); schedule(command, 0, TimeUnit.NANOSECONDS); } // Override AbstractExecutorService methods + /** + * @throws RejectedExecutionException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ public Future submit(Runnable task) { return schedule(task, 0, TimeUnit.NANOSECONDS); } + /** + * @throws RejectedExecutionException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ public Future submit(Runnable task, T result) { - return schedule(Executors.callable(task, result), + return schedule(Executors.callable(task, result), 0, TimeUnit.NANOSECONDS); } + /** + * @throws RejectedExecutionException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ public Future submit(Callable task) { return schedule(task, 0, TimeUnit.NANOSECONDS); } /** - * Set policy on whether to continue executing existing periodic - * tasks even when this executor has been shutdown. In - * this case, these tasks will only terminate upon - * shutdownNow, or after setting the policy to - * false when already shutdown. This value is by default - * false. - * @param value if true, continue after shutdown, else don't. - * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy + * Sets the policy on whether to continue executing existing + * periodic tasks even when this executor has been {@code shutdown}. + * In this case, these tasks will only terminate upon + * {@code shutdownNow} or after setting the policy to + * {@code false} when already shutdown. + * This value is by default {@code false}. + * + * @param value if {@code true}, continue after shutdown, else don't. + * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy */ public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) { continueExistingPeriodicTasksAfterShutdown = value; if (!value && isShutdown()) - cancelUnwantedTasks(); + onShutdown(); } /** - * Get the policy on whether to continue executing existing - * periodic tasks even when this executor has been - * shutdown. In this case, these tasks will only - * terminate upon shutdownNow or after setting the policy - * to false when already shutdown. This value is by - * default false. - * @return true if will continue after shutdown. + * Gets the policy on whether to continue executing existing + * periodic tasks even when this executor has been {@code shutdown}. + * In this case, these tasks will only terminate upon + * {@code shutdownNow} or after setting the policy to + * {@code false} when already shutdown. + * This value is by default {@code false}. + * + * @return {@code true} if will continue after shutdown * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy */ public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() { @@ -413,66 +563,79 @@ } /** - * Set policy on whether to execute existing delayed - * tasks even when this executor has been shutdown. In - * this case, these tasks will only terminate upon - * shutdownNow, or after setting the policy to - * false when already shutdown. This value is by default - * true. - * @param value if true, execute after shutdown, else don't. + * Sets the policy on whether to execute existing delayed + * tasks even when this executor has been {@code shutdown}. + * In this case, these tasks will only terminate upon + * {@code shutdownNow}, or after setting the policy to + * {@code false} when already shutdown. + * This value is by default {@code true}. + * + * @param value if {@code true}, execute after shutdown, else don't. * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy */ public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) { executeExistingDelayedTasksAfterShutdown = value; if (!value && isShutdown()) - cancelUnwantedTasks(); + onShutdown(); } /** - * Get policy on whether to execute existing delayed - * tasks even when this executor has been shutdown. In - * this case, these tasks will only terminate upon - * shutdownNow, or after setting the policy to - * false when already shutdown. This value is by default - * true. - * @return true if will execute after shutdown. + * Gets the policy on whether to execute existing delayed + * tasks even when this executor has been {@code shutdown}. + * In this case, these tasks will only terminate upon + * {@code shutdownNow}, or after setting the policy to + * {@code false} when already shutdown. + * This value is by default {@code true}. + * + * @return {@code true} if will execute after shutdown * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy */ public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() { return executeExistingDelayedTasksAfterShutdown; } - /** * Initiates an orderly shutdown in which previously submitted - * tasks are executed, but no new tasks will be accepted. If the - * ExecuteExistingDelayedTasksAfterShutdownPolicy has - * been set false, existing delayed tasks whose delays - * have not yet elapsed are cancelled. And unless the - * ContinueExistingPeriodicTasksAfterShutdownPolicy has - * been set true, future executions of existing periodic - * tasks will be cancelled. + * tasks are executed, but no new tasks will be accepted. + * Invocation has no additional effect if already shut down. + * + *

This method does not wait for previously submitted tasks to + * complete execution. Use {@link #awaitTermination awaitTermination} + * to do that. + * + *

If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy} + * has been set {@code false}, existing delayed tasks whose delays + * have not yet elapsed are cancelled. And unless the {@code + * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set + * {@code true}, future executions of existing periodic tasks will + * be cancelled. + * + * @throws SecurityException {@inheritDoc} */ public void shutdown() { - cancelUnwantedTasks(); super.shutdown(); } /** * Attempts to stop all actively executing tasks, halts the - * processing of waiting tasks, and returns a list of the tasks that were - * awaiting execution. - * + * processing of waiting tasks, and returns a list of the tasks + * that were awaiting execution. + * + *

This method does not wait for actively executing tasks to + * terminate. Use {@link #awaitTermination awaitTermination} to + * do that. + * *

There are no guarantees beyond best-effort attempts to stop * processing actively executing tasks. This implementation - * cancels tasks via {@link Thread#interrupt}, so if any tasks mask or - * fail to respond to interrupts, they may never terminate. + * cancels tasks via {@link Thread#interrupt}, so any task that + * fails to respond to interrupts may never terminate. * - * @return list of tasks that never commenced execution. Each - * element of this list is a {@link ScheduledFuture}, - * including those tasks submitted using execute, which - * are for scheduling purposes used as the basis of a zero-delay - * ScheduledFuture. + * @return list of tasks that never commenced execution. + * Each element of this list is a {@link ScheduledFuture}, + * including those tasks submitted using {@code execute}, + * which are for scheduling purposes used as the basis of a + * zero-delay {@code ScheduledFuture}. + * @throws SecurityException {@inheritDoc} */ public List shutdownNow() { return super.shutdownNow(); @@ -481,9 +644,9 @@ /** * Returns the task queue used by this executor. Each element of * this queue is a {@link ScheduledFuture}, including those - * tasks submitted using execute which are for scheduling + * tasks submitted using {@code execute} which are for scheduling * purposes used as the basis of a zero-delay - * ScheduledFuture. Iteration over this queue is + * {@code ScheduledFuture}. Iteration over this queue is * not guaranteed to traverse tasks in the order in * which they will execute. * @@ -494,52 +657,478 @@ } /** - * An annoying wrapper class to convince generics compiler to - * use a DelayQueue as a BlockingQueue - */ - private static class DelayedWorkQueue - extends AbstractCollection + * Specialized delay queue. To mesh with TPE declarations, this + * class must be declared as a BlockingQueue even though + * it can only hold RunnableScheduledFutures. + */ + static class DelayedWorkQueue extends AbstractQueue implements BlockingQueue { - - private final DelayQueue dq = new DelayQueue(); - public Runnable poll() { return dq.poll(); } - public Runnable peek() { return dq.peek(); } - public Runnable take() throws InterruptedException { return dq.take(); } - public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { - return dq.poll(timeout, unit); - } - - public boolean add(Runnable x) { return dq.add((ScheduledFutureTask)x); } - public boolean offer(Runnable x) { return dq.offer((ScheduledFutureTask)x); } - public void put(Runnable x) { - dq.put((ScheduledFutureTask)x); - } - public boolean offer(Runnable x, long timeout, TimeUnit unit) { - return dq.offer((ScheduledFutureTask)x, timeout, unit); - } - - public Runnable remove() { return dq.remove(); } - public Runnable element() { return dq.element(); } - public void clear() { dq.clear(); } - public int drainTo(Collection c) { return dq.drainTo(c); } - public int drainTo(Collection c, int maxElements) { - return dq.drainTo(c, maxElements); - } - - public int remainingCapacity() { return dq.remainingCapacity(); } - public boolean remove(Object x) { return dq.remove(x); } - public boolean contains(Object x) { return dq.contains(x); } - public int size() { return dq.size(); } - public boolean isEmpty() { return dq.isEmpty(); } - public Object[] toArray() { return dq.toArray(); } - public T[] toArray(T[] array) { return dq.toArray(array); } - public Iterator iterator() { - return new Iterator() { - private Iterator it = dq.iterator(); - public boolean hasNext() { return it.hasNext(); } - public Runnable next() { return it.next(); } - public void remove() { it.remove(); } - }; + + /* + * A DelayedWorkQueue is based on a heap-based data structure + * like those in DelayQueue and PriorityQueue, except that + * every ScheduledFutureTask also records its index into the + * heap array. This eliminates the need to find a task upon + * cancellation, greatly speeding up removal (down from O(n) + * to O(log n)), and reducing garbage retention that would + * otherwise occur by waiting for the element to rise to top + * before clearing. But because the queue may also hold + * RunnableScheduledFutures that are not ScheduledFutureTasks, + * we are not guaranteed to have such indices available, in + * which case we fall back to linear search. (We expect that + * most tasks will not be decorated, and that the faster cases + * will be much more common.) + * + * All heap operations must record index changes -- mainly + * within siftUp and siftDown. Upon removal, a task's + * heapIndex is set to -1. Note that ScheduledFutureTasks can + * appear at most once in the queue (this need not be true for + * other kinds of tasks or work queues), so are uniquely + * identified by heapIndex. + */ + + private static final int INITIAL_CAPACITY = 16; + private ScheduledFutureTask[] queue = + new ScheduledFutureTask[INITIAL_CAPACITY]; + private final ReentrantLock lock = new ReentrantLock(); + private int size = 0; + + /** + * Thread designated to wait for the task at the head of the + * queue. This variant of the Leader-Follower pattern + * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to + * minimize unnecessary timed waiting. When a thread becomes + * the leader, it waits only for the next delay to elapse, but + * other threads await indefinitely. The leader thread must + * signal some other thread before returning from take() or + * poll(...), unless some other thread becomes leader in the + * interim. Whenever the head of the queue is replaced with a + * task with an earlier expiration time, the leader field is + * invalidated by being reset to null, and some waiting + * thread, but not necessarily the current leader, is + * signalled. So waiting threads must be prepared to acquire + * and lose leadership while waiting. + */ + private Thread leader = null; + + /** + * Condition signalled when a newer task becomes available at the + * head of the queue or a new thread may need to become leader. + */ + private final Condition available = lock.newCondition(); + + /** + * Set f's heapIndex if it is a ScheduledFutureTask. + */ + private void setIndex(ScheduledFutureTask f, int idx) { + if (f instanceof ScheduledFutureTask) + ((ScheduledFutureTask)f).heapIndex = idx; + } + + /** + * Sift element added at bottom up to its heap-ordered spot. + * Call only when holding lock. + */ + private void siftUp(int k, ScheduledFutureTask key) { + while (k > 0) { + int parent = (k - 1) >>> 1; + ScheduledFutureTask e = queue[parent]; + if (key.compareTo(e) >= 0) + break; + queue[k] = e; + setIndex(e, k); + k = parent; + } + queue[k] = key; + setIndex(key, k); + } + + /** + * Sift element added at top down to its heap-ordered spot. + * Call only when holding lock. + */ + private void siftDown(int k, ScheduledFutureTask key) { + int half = size >>> 1; + while (k < half) { + int child = (k << 1) + 1; + ScheduledFutureTask c = queue[child]; + int right = child + 1; + if (right < size && c.compareTo(queue[right]) > 0) + c = queue[child = right]; + if (key.compareTo(c) <= 0) + break; + queue[k] = c; + setIndex(c, k); + k = child; + } + queue[k] = key; + setIndex(key, k); + } + + /** + * Resize the heap array. Call only when holding lock. + */ + private void grow() { + int oldCapacity = queue.length; + int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% + if (newCapacity < 0) // overflow + newCapacity = Integer.MAX_VALUE; + queue = Java6Arrays.copyOf(queue, newCapacity); + } + + /** + * Find index of given object, or -1 if absent + */ + private int indexOf(Object x) { + if (x != null) { + if (x instanceof ScheduledFutureTask) { + int i = ((ScheduledFutureTask) x).heapIndex; + // Sanity check; x could conceivably be a + // ScheduledFutureTask from some other pool. + if (i >= 0 && i < size && queue[i] == x) + return i; + } else { + for (int i = 0; i < size; i++) + if (x.equals(queue[i])) + return i; + } + } + return -1; + } + + public boolean contains(Object x) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return indexOf(x) != -1; + } finally { + lock.unlock(); + } + } + + public boolean remove(Object x) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + int i = indexOf(x); + if (i < 0) + return false; + + setIndex(queue[i], -1); + int s = --size; + ScheduledFutureTask replacement = queue[s]; + queue[s] = null; + if (s != i) { + siftDown(i, replacement); + if (queue[i] == replacement) + siftUp(i, replacement); + } + return true; + } finally { + lock.unlock(); + } + } + + public int size() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return size; + } finally { + lock.unlock(); + } + } + + public boolean isEmpty() { + return size() == 0; + } + + public int remainingCapacity() { + return Integer.MAX_VALUE; + } + + public ScheduledFutureTask peek() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return queue[0]; + } finally { + lock.unlock(); + } + } + + public boolean offer(Runnable x) { + if (x == null) + throw new NullPointerException(); + ScheduledFutureTask e = (ScheduledFutureTask)x; + final ReentrantLock lock = this.lock; + lock.lock(); + try { + int i = size; + if (i >= queue.length) + grow(); + size = i + 1; + if (i == 0) { + queue[0] = e; + setIndex(e, 0); + } else { + siftUp(i, e); + } + if (queue[0] == e) { + leader = null; + available.signal(); + } + } finally { + lock.unlock(); + } + return true; + } + + public void put(Runnable e) { + offer(e); + } + + public boolean add(Runnable e) { + return offer(e); + } + + public boolean offer(Runnable e, long timeout, TimeUnit unit) { + return offer(e); + } + + /** + * Performs common bookkeeping for poll and take: Replaces + * first element with last and sifts it down. Call only when + * holding lock. + * @param f the task to remove and return + */ + private ScheduledFutureTask finishPoll(ScheduledFutureTask f) { + int s = --size; + ScheduledFutureTask x = queue[s]; + queue[s] = null; + if (s != 0) + siftDown(0, x); + setIndex(f, -1); + return f; + } + + public ScheduledFutureTask poll() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + ScheduledFutureTask first = queue[0]; + if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) + return null; + else + return finishPoll(first); + } finally { + lock.unlock(); + } + } + + public ScheduledFutureTask take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + for (;;) { + ScheduledFutureTask first = queue[0]; + if (first == null) + available.await(); + else { + long delay = first.getDelay(TimeUnit.NANOSECONDS); + if (delay <= 0) + return finishPoll(first); + else if (leader != null) + available.await(); + else { + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + available.awaitNanos(delay); + } finally { + if (leader == thisThread) + leader = null; + } + } + } + } + } finally { + if (leader == null && queue[0] != null) + available.signal(); + lock.unlock(); + } + } + + public ScheduledFutureTask poll(long timeout, TimeUnit unit) + throws InterruptedException { + long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + for (;;) { + ScheduledFutureTask first = queue[0]; + if (first == null) { + if (nanos <= 0) + return null; + else + nanos = available.awaitNanos(nanos); + } else { + long delay = first.getDelay(TimeUnit.NANOSECONDS); + if (delay <= 0) + return finishPoll(first); + if (nanos <= 0) + return null; + if (nanos < delay || leader != null) + nanos = available.awaitNanos(nanos); + else { + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + long timeLeft = available.awaitNanos(delay); + nanos -= delay - timeLeft; + } finally { + if (leader == thisThread) + leader = null; + } + } + } + } + } finally { + if (leader == null && queue[0] != null) + available.signal(); + lock.unlock(); + } + } + + public void clear() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + for (int i = 0; i < size; i++) { + ScheduledFutureTask t = queue[i]; + if (t != null) { + queue[i] = null; + setIndex(t, -1); + } + } + size = 0; + } finally { + lock.unlock(); + } + } + + /** + * Return and remove first element only if it is expired. + * Used only by drainTo. Call only when holding lock. + */ + private ScheduledFutureTask pollExpired() { + ScheduledFutureTask first = queue[0]; + if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) + return null; + return finishPoll(first); + } + + public int drainTo(Collection c) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + final ReentrantLock lock = this.lock; + lock.lock(); + try { + ScheduledFutureTask first; + int n = 0; + while ((first = pollExpired()) != null) { + c.add(first); + ++n; + } + return n; + } finally { + lock.unlock(); + } + } + + public int drainTo(Collection c, int maxElements) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + if (maxElements <= 0) + return 0; + final ReentrantLock lock = this.lock; + lock.lock(); + try { + ScheduledFutureTask first; + int n = 0; + while (n < maxElements && (first = pollExpired()) != null) { + c.add(first); + ++n; + } + return n; + } finally { + lock.unlock(); + } + } + + public Object[] toArray() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return Java6Arrays.copyOf(queue, size, Object[].class); + } finally { + lock.unlock(); + } + } + + @SuppressWarnings("unchecked") + public T[] toArray(T[] a) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + if (a.length < size) + return (T[]) Java6Arrays.copyOf(queue, size, a.getClass()); + System.arraycopy(queue, 0, a, 0, size); + if (a.length > size) + a[size] = null; + return a; + } finally { + lock.unlock(); + } + } + + public Iterator iterator() { + return new Itr(Java6Arrays.copyOf(queue, size)); + } + + /** + * Snapshot iterator that works off copy of underlying q array. + */ + private class Itr implements Iterator { + final ScheduledFutureTask[] array; + int cursor = 0; // index of next element to return + int lastRet = -1; // index of last element, or -1 if no such + + Itr(ScheduledFutureTask[] array) { + this.array = array; + } + + public boolean hasNext() { + return cursor < array.length; + } + + public Runnable next() { + if (cursor >= array.length) + throw new NoSuchElementException(); + lastRet = cursor; + return array[cursor++]; + } + + public void remove() { + if (lastRet < 0) + throw new IllegalStateException(); + DelayedWorkQueue.this.remove(array[lastRet]); + lastRet = -1; + } } } }