harmony-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hinde...@apache.org
Subject svn commit: r800934 [4/6] - in /harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent: ./ locks/
Date Tue, 04 Aug 2009 19:39:25 GMT
Modified: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java?rev=800934&r1=800933&r2=800934&view=diff
==============================================================================
--- harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java (original)
+++ harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java Tue Aug  4 19:39:24 2009
@@ -4,10 +4,6 @@
  * http://creativecommons.org/licenses/publicdomain
  */
 
-/*
- * Modified in Apache Harmony.
- */
-
 package java.util.concurrent;
 import java.util.concurrent.atomic.*;
 import java.util.concurrent.locks.*;
@@ -27,6 +23,15 @@
  * execution time are enabled in first-in-first-out (FIFO) order of
  * submission.
  *
+ * <p>When a submitted task is cancelled before it is run, execution
+ * is suppressed. By default, such a cancelled task is not
+ * automatically removed from the work queue until its delay
+ * elapses. While this enables further inspection and monitoring, it
+ * may also cause unbounded retention of cancelled tasks. To avoid
+ * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
+ * causes tasks to be immediately removed from the work queue at
+ * time of cancellation.
+ *
  * <p>Successive executions of a task scheduled via
  * <code>scheduleAtFixedRate</code> or
  * <code>scheduleWithFixedDelay</code> do not overlap. While different
@@ -37,9 +42,47 @@
  *
  * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
  * of the inherited tuning methods are not useful for it. In
- * particular, because it acts as a fixed-sized pool using {@code
- * corePoolSize} threads and an unbounded queue, adjustments to {@code
- * maximumPoolSize} have no useful effect.
+ * particular, because it acts as a fixed-sized pool using
+ * {@code corePoolSize} threads and an unbounded queue, adjustments
+ * to {@code maximumPoolSize} have no useful effect. Additionally, it
+ * is almost never a good idea to set {@code corePoolSize} to zero or
+ * use {@code allowCoreThreadTimeOut} because this may leave the pool
+ * without threads to handle tasks once they become eligible to run.
+ *
+ * <p><b>Extension notes:</b> This class overrides the
+ * {@link ThreadPoolExecutor#execute execute} and
+ * {@link AbstractExecutorService#submit(Runnable) submit}
+ * methods to generate internal {@link ScheduledFuture} objects to
+ * control per-task delays and scheduling.  To preserve
+ * functionality, any further overrides of these methods in
+ * subclasses must invoke superclass versions, which effectively
+ * disables additional task customization.  However, this class
+ * provides alternative protected extension method
+ * {@code decorateTask} (one version each for {@code Runnable} and
+ * {@code Callable}) that can be used to customize the concrete task
+ * types used to execute commands entered via {@code execute},
+ * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
+ * and {@code scheduleWithFixedDelay}.  By default, a
+ * {@code ScheduledThreadPoolExecutor} uses a task type extending
+ * {@link FutureTask}. However, this may be modified or replaced using
+ * subclasses of the form:
+ *
+ *  <pre> {@code
+ * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
+ *
+ *   static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
+ *
+ *   protected <V> RunnableScheduledFuture<V> decorateTask(
+ *                Runnable r, RunnableScheduledFuture<V> task) {
+ *       return new CustomTask<V>(r, task);
+ *   }
+ *
+ *   protected <V> RunnableScheduledFuture<V> decorateTask(
+ *                Callable<V> c, RunnableScheduledFuture<V> task) {
+ *       return new CustomTask<V>(c, task);
+ *   }
+ *   // ... add constructors, etc.
+ * }}</pre>
  *
  * @since 1.5
  * @author Doug Lea
@@ -96,23 +139,15 @@
      */
     private static final AtomicLong sequencer = new AtomicLong(0);
 
-
-    /**
-     * Value of System.nanoTime upon static initialization.  This is
-     * used as an offset by now() to avoid wraparound of time values
-     * that would make them appear negative.
-     */
-    static final long initialNanoTime = System.nanoTime();
-
     /**
      * Returns current nanosecond time.
      */
-    static long now() {
-        return System.nanoTime() - initialNanoTime;
+    final long now() {
+        return System.nanoTime();
     }
 
     private class ScheduledFutureTask<V>
-            extends FutureTask<V> implements ScheduledFuture<V> {
+            extends FutureTask<V> implements RunnableScheduledFuture<V> {
 
         /** Sequence number to break ties FIFO */
         private final long sequenceNumber;
@@ -129,7 +164,7 @@
         private final long period;
 
         /** The actual task to be re-enqueued by reExecutePeriodic */
-        ScheduledFutureTask<V> outerTask = this;
+        RunnableScheduledFuture<V> outerTask = this;
 
         /**
          * Index into delay queue, to support faster cancellation.
@@ -167,8 +202,7 @@
         }
 
         public long getDelay(TimeUnit unit) {
-            long d = time - now();
-            return d<=0? 0 : unit.convert(d, TimeUnit.NANOSECONDS);
+            return unit.convert(time - now(), TimeUnit.NANOSECONDS);
         }
 
         public int compareTo(Delayed other) {
@@ -208,7 +242,7 @@
             if (p > 0)
                 time += p;
             else
-                time = now() - p;
+                time = triggerTime(-p);
         }
 
         public boolean cancel(boolean mayInterruptIfRunning) {
@@ -257,7 +291,7 @@
      *
      * @param task the task
      */
-    private void delayedExecute(ScheduledFutureTask<?> task) {
+    private void delayedExecute(RunnableScheduledFuture<?> task) {
         if (isShutdown())
             reject(task);
         else {
@@ -277,7 +311,7 @@
      *
      * @param task the task
      */
-    void reExecutePeriodic(ScheduledFutureTask<?> task) {
+    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
         if (canRunInCurrentRunState(true)) {
             super.getQueue().add(task);
             if (!canRunInCurrentRunState(true) && remove(task))
@@ -302,9 +336,9 @@
         else {
             // Traverse snapshot to avoid iterator exceptions
             for (Object e : q.toArray()) {
-                if (e instanceof ScheduledFutureTask) {
-                    ScheduledFutureTask<?> t =
-                        (ScheduledFutureTask<?>)e;
+                if (e instanceof RunnableScheduledFuture) {
+                    RunnableScheduledFuture<?> t =
+                        (RunnableScheduledFuture<?>)e;
                     if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
                         t.isCancelled()) { // also remove if already cancelled
                         if (q.remove(t))
@@ -317,11 +351,43 @@
     }
 
     /**
+     * Modifies or replaces the task used to execute a runnable.
+     * This method can be used to override the concrete
+     * class used for managing internal tasks.
+     * The default implementation simply returns the given task.
+     *
+     * @param runnable the submitted Runnable
+     * @param task the task created to execute the runnable
+     * @return a task that can execute the runnable
+     * @since 1.6
+     */
+    protected <V> RunnableScheduledFuture<V> decorateTask(
+        Runnable runnable, RunnableScheduledFuture<V> task) {
+        return task;
+    }
+
+    /**
+     * Modifies or replaces the task used to execute a callable.
+     * This method can be used to override the concrete
+     * class used for managing internal tasks.
+     * The default implementation simply returns the given task.
+     *
+     * @param callable the submitted Callable
+     * @param task the task created to execute the callable
+     * @return a task that can execute the callable
+     * @since 1.6
+     */
+    protected <V> RunnableScheduledFuture<V> decorateTask(
+        Callable<V> callable, RunnableScheduledFuture<V> task) {
+        return task;
+    }
+
+    /**
      * Creates a new {@code ScheduledThreadPoolExecutor} with the
      * given core pool size.
      *
      * @param corePoolSize the number of threads to keep in the pool, even
-     *        if they are idle
+     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
      * @throws IllegalArgumentException if {@code corePoolSize < 0}
      */
     public ScheduledThreadPoolExecutor(int corePoolSize) {
@@ -334,7 +400,7 @@
      * given initial parameters.
      *
      * @param corePoolSize the number of threads to keep in the pool, even
-     *        if they are idle
+     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
      * @param threadFactory the factory to use when the executor
      *        creates a new thread
      * @throws IllegalArgumentException if {@code corePoolSize < 0}
@@ -351,7 +417,7 @@
      * initial parameters.
      *
      * @param corePoolSize the number of threads to keep in the pool, even
-     *        if they are idle
+     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
      * @param handler the handler to use when execution is blocked
      *        because the thread bounds and queue capacities are reached
      * @throws IllegalArgumentException if {@code corePoolSize < 0}
@@ -368,7 +434,7 @@
      * initial parameters.
      *
      * @param corePoolSize the number of threads to keep in the pool, even
-     *        if they are idle
+     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
      * @param threadFactory the factory to use when the executor
      *        creates a new thread
      * @param handler the handler to use when execution is blocked
@@ -385,17 +451,35 @@
     }
 
     /**
-     * Returns the trigger time of a delayed action
+     * Returns the trigger time of a delayed action.
      */
-    private static long nextTriggerTime(long delay, TimeUnit unit) {
-        long triggerTime;
-        long now = now();
-        if (delay <= 0) 
-            return now;            // avoid negative trigger times
-        else if ((triggerTime = now + unit.toNanos(delay)) < 0)
-            return Long.MAX_VALUE; // avoid numerical overflow
-        else
-            return triggerTime;
+    private long triggerTime(long delay, TimeUnit unit) {
+        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
+    }
+
+    /**
+     * Returns the trigger time of a delayed action.
+     */
+    long triggerTime(long delay) {
+        return now() +
+            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
+    }
+
+    /**
+     * Constrains the values of all delays in the queue to be within
+     * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
+     * This may occur if a task is eligible to be dequeued, but has
+     * not yet been, while some other task is added with a delay of
+     * Long.MAX_VALUE.
+     */
+    private long overflowFree(long delay) {
+        Delayed head = (Delayed) super.getQueue().peek();
+        if (head != null) {
+            long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
+            if (headDelay < 0 && (delay - headDelay < 0))
+                delay = Long.MAX_VALUE + headDelay;
+        }
+        return delay;
     }
 
     /**
@@ -407,9 +491,9 @@
                                        TimeUnit unit) {
         if (command == null || unit == null)
             throw new NullPointerException();
-        long triggerTime = nextTriggerTime(delay, unit);
-        ScheduledFutureTask<?> t
-                = new ScheduledFutureTask<Void>(command, null, triggerTime);
+        RunnableScheduledFuture<?> t = decorateTask(command,
+            new ScheduledFutureTask<Void>(command, null,
+                                          triggerTime(delay, unit)));
         delayedExecute(t);
         return t;
     }
@@ -423,9 +507,9 @@
                                            TimeUnit unit) {
         if (callable == null || unit == null)
             throw new NullPointerException();
-        long triggerTime = nextTriggerTime(delay, unit);
-        ScheduledFutureTask<V> t
-                = new ScheduledFutureTask<V>(callable, triggerTime);
+        RunnableScheduledFuture<V> t = decorateTask(callable,
+            new ScheduledFutureTask<V>(callable,
+                                       triggerTime(delay, unit)));
         delayedExecute(t);
         return t;
     }
@@ -443,16 +527,15 @@
             throw new NullPointerException();
         if (period <= 0)
             throw new IllegalArgumentException();
-        if (initialDelay < 0) initialDelay = 0;
-        long triggerTime = nextTriggerTime(initialDelay, unit);
         ScheduledFutureTask<Void> sft =
             new ScheduledFutureTask<Void>(command,
                                           null,
-                                          triggerTime,
+                                          triggerTime(initialDelay, unit),
                                           unit.toNanos(period));
-        sft.outerTask = sft;
-        delayedExecute(sft);
-        return sft;
+        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
+        sft.outerTask = t;
+        delayedExecute(t);
+        return t;
     }
 
     /**
@@ -468,15 +551,15 @@
             throw new NullPointerException();
         if (delay <= 0)
             throw new IllegalArgumentException();
-        long triggerTime = nextTriggerTime(initialDelay, unit);
         ScheduledFutureTask<Void> sft =
             new ScheduledFutureTask<Void>(command,
                                           null,
-                                          triggerTime,
+                                          triggerTime(initialDelay, unit),
                                           unit.toNanos(-delay));
-        sft.outerTask = sft;
-        delayedExecute(sft);
-        return sft;
+        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
+        sft.outerTask = t;
+        delayedExecute(t);
+        return t;
     }
 
     /**
@@ -595,6 +678,31 @@
     }
 
     /**
+     * Sets the policy on whether cancelled tasks should be immediately
+     * removed from the work queue at time of cancellation.  This value is
+     * by default {@code false}.
+     *
+     * @param value if {@code true}, remove on cancellation, else don't
+     * @see #getRemoveOnCancelPolicy
+     */
+    void setRemoveOnCancelPolicy(boolean value) {
+        removeOnCancel = value;
+    }
+
+    /**
+     * Gets the policy on whether cancelled tasks should be immediately
+     * removed from the work queue at time of cancellation.  This value is
+     * by default {@code false}.
+     *
+     * @return {@code true} if cancelled tasks are immediately removed
+     *         from the queue
+     * @see #setRemoveOnCancelPolicy
+     */
+    boolean getRemoveOnCancelPolicy() {
+        return removeOnCancel;
+    }
+
+    /**
      * Initiates an orderly shutdown in which previously submitted
      * tasks are executed, but no new tasks will be accepted.
      * Invocation has no additional effect if already shut down.
@@ -688,8 +796,8 @@
          */
 
         private static final int INITIAL_CAPACITY = 16;
-        private ScheduledFutureTask[] queue =
-            new ScheduledFutureTask[INITIAL_CAPACITY];
+        private RunnableScheduledFuture[] queue =
+            new RunnableScheduledFuture[INITIAL_CAPACITY];
         private final ReentrantLock lock = new ReentrantLock();
         private int size = 0;
 
@@ -720,7 +828,7 @@
         /**
          * Set f's heapIndex if it is a ScheduledFutureTask.
          */
-        private void setIndex(ScheduledFutureTask f, int idx) {
+        private void setIndex(RunnableScheduledFuture f, int idx) {
             if (f instanceof ScheduledFutureTask)
                 ((ScheduledFutureTask)f).heapIndex = idx;
         }
@@ -729,10 +837,10 @@
          * Sift element added at bottom up to its heap-ordered spot.
          * Call only when holding lock.
          */
-        private void siftUp(int k, ScheduledFutureTask key) {
+        private void siftUp(int k, RunnableScheduledFuture key) {
             while (k > 0) {
                 int parent = (k - 1) >>> 1;
-                ScheduledFutureTask e = queue[parent];
+                RunnableScheduledFuture e = queue[parent];
                 if (key.compareTo(e) >= 0)
                     break;
                 queue[k] = e;
@@ -747,11 +855,11 @@
          * Sift element added at top down to its heap-ordered spot.
          * Call only when holding lock.
          */
-        private void siftDown(int k, ScheduledFutureTask key) {
+        private void siftDown(int k, RunnableScheduledFuture key) {
             int half = size >>> 1;
             while (k < half) {
                 int child = (k << 1) + 1;
-                ScheduledFutureTask c = queue[child];
+                RunnableScheduledFuture c = queue[child];
                 int right = child + 1;
                 if (right < size && c.compareTo(queue[right]) > 0)
                     c = queue[child = right];
@@ -816,7 +924,7 @@
 
                 setIndex(queue[i], -1);
                 int s = --size;
-                ScheduledFutureTask replacement = queue[s];
+                RunnableScheduledFuture replacement = queue[s];
                 queue[s] = null;
                 if (s != i) {
                     siftDown(i, replacement);
@@ -847,7 +955,7 @@
             return Integer.MAX_VALUE;
         }
 
-        public ScheduledFutureTask peek() {
+        public RunnableScheduledFuture peek() {
             final ReentrantLock lock = this.lock;
             lock.lock();
             try {
@@ -860,7 +968,7 @@
         public boolean offer(Runnable x) {
             if (x == null)
                 throw new NullPointerException();
-            ScheduledFutureTask e = (ScheduledFutureTask)x;
+            RunnableScheduledFuture e = (RunnableScheduledFuture)x;
             final ReentrantLock lock = this.lock;
             lock.lock();
             try {
@@ -902,9 +1010,9 @@
          * holding lock.
          * @param f the task to remove and return
          */
-        private ScheduledFutureTask finishPoll(ScheduledFutureTask f) {
+        private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
             int s = --size;
-            ScheduledFutureTask x = queue[s];
+            RunnableScheduledFuture x = queue[s];
             queue[s] = null;
             if (s != 0)
                 siftDown(0, x);
@@ -912,11 +1020,11 @@
             return f;
         }
 
-        public ScheduledFutureTask poll() {
+        public RunnableScheduledFuture poll() {
             final ReentrantLock lock = this.lock;
             lock.lock();
             try {
-                ScheduledFutureTask first = queue[0];
+                RunnableScheduledFuture first = queue[0];
                 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                     return null;
                 else
@@ -926,12 +1034,12 @@
             }
         }
 
-        public ScheduledFutureTask take() throws InterruptedException {
+        public RunnableScheduledFuture take() throws InterruptedException {
             final ReentrantLock lock = this.lock;
             lock.lockInterruptibly();
             try {
                 for (;;) {
-                    ScheduledFutureTask first = queue[0];
+                    RunnableScheduledFuture first = queue[0];
                     if (first == null)
                         available.await();
                     else {
@@ -959,14 +1067,14 @@
             }
         }
 
-        public ScheduledFutureTask poll(long timeout, TimeUnit unit)
+        public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
             throws InterruptedException {
             long nanos = unit.toNanos(timeout);
             final ReentrantLock lock = this.lock;
             lock.lockInterruptibly();
             try {
                 for (;;) {
-                    ScheduledFutureTask first = queue[0];
+                    RunnableScheduledFuture first = queue[0];
                     if (first == null) {
                         if (nanos <= 0)
                             return null;
@@ -1005,7 +1113,7 @@
             lock.lock();
             try {
                 for (int i = 0; i < size; i++) {
-                    ScheduledFutureTask t = queue[i];
+                    RunnableScheduledFuture t = queue[i];
                     if (t != null) {
                         queue[i] = null;
                         setIndex(t, -1);
@@ -1021,8 +1129,8 @@
          * Return and remove first element only if it is expired.
          * Used only by drainTo.  Call only when holding lock.
          */
-        private ScheduledFutureTask pollExpired() {
-            ScheduledFutureTask first = queue[0];
+        private RunnableScheduledFuture pollExpired() {
+            RunnableScheduledFuture first = queue[0];
             if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                 return null;
             return finishPoll(first);
@@ -1036,7 +1144,7 @@
             final ReentrantLock lock = this.lock;
             lock.lock();
             try {
-                ScheduledFutureTask first;
+                RunnableScheduledFuture first;
                 int n = 0;
                 while ((first = pollExpired()) != null) {
                     c.add(first);
@@ -1058,7 +1166,7 @@
             final ReentrantLock lock = this.lock;
             lock.lock();
             try {
-                ScheduledFutureTask first;
+                RunnableScheduledFuture first;
                 int n = 0;
                 while (n < maxElements && (first = pollExpired()) != null) {
                     c.add(first);
@@ -1104,11 +1212,11 @@
          * Snapshot iterator that works off copy of underlying q array.
          */
         private class Itr implements Iterator<Runnable> {
-            final ScheduledFutureTask[] array;
+            final RunnableScheduledFuture[] array;
             int cursor = 0;     // index of next element to return
             int lastRet = -1;   // index of last element, or -1 if no such
 
-            Itr(ScheduledFutureTask[] array) {
+            Itr(RunnableScheduledFuture[] array) {
                 this.array = array;
             }
 

Modified: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ThreadPoolExecutor.java?rev=800934&r1=800933&r2=800934&view=diff
==============================================================================
--- harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ThreadPoolExecutor.java (original)
+++ harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/ThreadPoolExecutor.java Tue Aug  4 19:39:24 2009
@@ -92,9 +92,12 @@
  * threads will be constructed. This parameter can also be changed
  * dynamically using method {@link #setKeepAliveTime}. Using a value
  * of {@code Long.MAX_VALUE} {@link TimeUnit#NANOSECONDS} effectively
- * disables idle threads from ever terminating prior to shut down. The
- * keep-alive policy applies only when there are more than
- * corePoolSizeThreads.</dd>
+ * disables idle threads from ever terminating prior to shut down. By
+ * default, the keep-alive policy applies only when there are more
+ * than corePoolSizeThreads. But method {@link
+ * #allowCoreThreadTimeOut(boolean)} can be used to apply this
+ * time-out policy to core threads as well, so long as the
+ * keepAliveTime value is non-zero. </dd>
  *
  * <dt>Queuing</dt>
  *
@@ -228,7 +231,8 @@
  * you would like to ensure that unreferenced pools are reclaimed even
  * if users forget to call {@link #shutdown}, then you must arrange
  * that unused threads eventually die, by setting appropriate
- * keep-alive times using a lower bound of zero core threads.  </dd>
+ * keep-alive times, using a lower bound of zero core threads and/or
+ * setting {@link #allowCoreThreadTimeOut(boolean)}.  </dd>
  *
  * </dl>
  *
@@ -473,13 +477,22 @@
     /**
      * Timeout in nanoseconds for idle threads waiting for work.
      * Threads use this timeout when there are more than corePoolSize
-     * present. Otherwise they wait forever for new work.
+     * present or if allowCoreThreadTimeOut. Otherwise they wait
+     * forever for new work.
      */
     private volatile long keepAliveTime;
 
     /**
+     * If false (default), core threads stay alive even when idle.
+     * If true, core threads use keepAliveTime to time out waiting
+     * for work.
+     */
+    private volatile boolean allowCoreThreadTimeOut;
+
+    /**
      * Core pool size is the minimum number of workers to keep alive
-     * (and not allow to time out etc).
+     * (and not allow to time out etc) unless allowCoreThreadTimeOut
+     * is set, in which case the minimum is zero.
      */
     private volatile int corePoolSize;
 
@@ -941,7 +954,7 @@
         int c = ctl.get();
         if (runStateLessThan(c, STOP)) {
             if (!completedAbruptly) {
-                int min = corePoolSize;
+                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                 if (min == 0 && ! workQueue.isEmpty())
                     min = 1;
                 if (workerCountOf(c) >= min)
@@ -961,7 +974,7 @@
      * 3. The pool is shutdown and the queue is empty.
      * 4. This worker timed out waiting for a task, and timed-out
      *    workers are subject to termination (that is,
-     *    {@code workerCount > corePoolSize})
+     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
      *    both before and after the timed wait.
      *
      * @return task, or null if the worker must exit, in which case
@@ -985,7 +998,7 @@
 
             for (;;) {
                 int wc = workerCountOf(c);
-                timed = wc > corePoolSize;
+                timed = allowCoreThreadTimeOut || wc > corePoolSize;
 
                 if (wc <= maximumPoolSize && ! (timedOut && timed))
                     break;
@@ -1096,7 +1109,7 @@
      * methods instead of this general purpose constructor.
      *
      * @param corePoolSize the number of threads to keep in the pool, even
-     *        if they are idle
+     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
      * @param maximumPoolSize the maximum number of threads to allow in the
      *        pool
      * @param keepAliveTime when the number of threads is greater than
@@ -1127,7 +1140,7 @@
      * parameters and default rejected execution handler.
      *
      * @param corePoolSize the number of threads to keep in the pool, even
-     *        if they are idle
+     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
      * @param maximumPoolSize the maximum number of threads to allow in the
      *        pool
      * @param keepAliveTime when the number of threads is greater than
@@ -1162,7 +1175,7 @@
      * parameters and default thread factory.
      *
      * @param corePoolSize the number of threads to keep in the pool, even
-     *        if they are idle
+     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
      * @param maximumPoolSize the maximum number of threads to allow in the
      *        pool
      * @param keepAliveTime when the number of threads is greater than
@@ -1197,7 +1210,7 @@
      * parameters.
      *
      * @param corePoolSize the number of threads to keep in the pool, even
-     *        if they are idle
+     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
      * @param maximumPoolSize the maximum number of threads to allow in the
      *        pool
      * @param keepAliveTime when the number of threads is greater than
@@ -1518,6 +1531,50 @@
     }
 
     /**
+     * Returns true if this pool allows core threads to time out and
+     * terminate if no tasks arrive within the keepAlive time, being
+     * replaced if needed when new tasks arrive. When true, the same
+     * keep-alive policy applying to non-core threads applies also to
+     * core threads. When false (the default), core threads are never
+     * terminated due to lack of incoming tasks.
+     *
+     * @return {@code true} if core threads are allowed to time out,
+     *         else {@code false}
+     *
+     * @since 1.6
+     */
+    public boolean allowsCoreThreadTimeOut() {
+        return allowCoreThreadTimeOut;
+    }
+
+    /**
+     * Sets the policy governing whether core threads may time out and
+     * terminate if no tasks arrive within the keep-alive time, being
+     * replaced if needed when new tasks arrive. When false, core
+     * threads are never terminated due to lack of incoming
+     * tasks. When true, the same keep-alive policy applying to
+     * non-core threads applies also to core threads. To avoid
+     * continual thread replacement, the keep-alive time must be
+     * greater than zero when setting {@code true}. This method
+     * should in general be called before the pool is actively used.
+     *
+     * @param value {@code true} if should time out, else {@code false}
+     * @throws IllegalArgumentException if value is {@code true}
+     *         and the current keep-alive time is not greater than zero
+     *
+     * @since 1.6
+     */
+    public void allowCoreThreadTimeOut(boolean value) {
+        if (value && keepAliveTime <= 0)
+            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
+        if (value != allowCoreThreadTimeOut) {
+            allowCoreThreadTimeOut = value;
+            if (value)
+                interruptIdleWorkers();
+        }
+    }
+
+    /**
      * Sets the maximum allowed number of threads. This overrides any
      * value set in the constructor. If the new value is smaller than
      * the current value, excess existing threads will be
@@ -1564,6 +1621,8 @@
     public void setKeepAliveTime(long time, TimeUnit unit) {
         if (time < 0)
             throw new IllegalArgumentException();
+        if (time == 0 && allowsCoreThreadTimeOut())
+            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
         long keepAliveTime = unit.toNanos(time);
         long delta = keepAliveTime - this.keepAliveTime;
         this.keepAliveTime = keepAliveTime;



Mime
View raw message