Return-Path: Delivered-To: apmail-incubator-cassandra-commits-archive@minotaur.apache.org Received: (qmail 32767 invoked from network); 2 Mar 2009 14:24:08 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 2 Mar 2009 14:24:08 -0000 Received: (qmail 32752 invoked by uid 500); 2 Mar 2009 14:24:08 -0000 Delivered-To: apmail-incubator-cassandra-commits-archive@incubator.apache.org Received: (qmail 32737 invoked by uid 500); 2 Mar 2009 14:24:08 -0000 Mailing-List: contact cassandra-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: cassandra-dev@incubator.apache.org Delivered-To: mailing list cassandra-commits@incubator.apache.org Delivered-To: moderator for cassandra-commits@incubator.apache.org Received: (qmail 36087 invoked by uid 99); 2 Mar 2009 07:58:35 -0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r749218 [6/34] - in /incubator/cassandra: branches/ dist/ nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/ trunk/src/org/apache/ trunk/src/org/apache/cassandra/ trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache/... Date: Mon, 02 Mar 2009 07:57:31 -0000 To: cassandra-commits@incubator.apache.org From: pmalik@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090302075743.F2BEA2388D93@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/ContinuationsExecutor.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/ContinuationsExecutor.java?rev=749218&view=auto ============================================================================== --- incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/ContinuationsExecutor.java (added) +++ incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/ContinuationsExecutor.java Mon Mar 2 07:57:22 2009 @@ -0,0 +1,2244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.cassandra.concurrent; + +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.*; +import java.util.concurrent.atomic.*; +import java.util.*; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.utils.Cachetable; +import org.apache.cassandra.utils.GuidGenerator; +import org.apache.cassandra.utils.ICachetable; +import org.apache.cassandra.utils.LogUtil; +import org.apache.commons.javaflow.Continuation; +import org.apache.log4j.Logger; + + +/** + * An {@link ExecutorService} that executes each submitted task using one of + * possibly several pooled threads, normally configured using {@link Executors} + * factory methods. + * + *

+ * Thread pools address two different problems: they usually provide improved + * performance when executing large numbers of asynchronous tasks, due to + * reduced per-task invocation overhead, and they provide a means of bounding + * and managing the resources, including threads, consumed when executing a + * collection of tasks. Each {@code ContinuationsExecutor} also maintains some + * basic statistics, such as the number of completed tasks. + * + *

+ * To be useful across a wide range of contexts, this class provides many + * adjustable parameters and extensibility hooks. However, programmers are urged + * to use the more convenient {@link Executors} factory methods {@link + * Executors#newCachedThreadPool} (unbounded thread pool, with automatic thread + * reclamation), {@link Executors#newFixedThreadPool} (fixed size thread pool) + * and {@link Executors#newSingleThreadExecutor} (single background thread), + * that preconfigure settings for the most common usage scenarios. Otherwise, + * use the following guide when manually configuring and tuning this class: + * + *

+ * + *
Core and maximum pool sizes
+ * + *
A {@code ContinuationsExecutor} will automatically adjust the pool size + * (see {@link #getPoolSize}) according to the bounds set by corePoolSize (see + * {@link #getCorePoolSize}) and maximumPoolSize (see + * {@link #getMaximumPoolSize}). + * + * When a new task is submitted in method {@link #execute}, and fewer than + * corePoolSize threads are running, a new thread is created to handle the + * request, even if other worker threads are idle. If there are more than + * corePoolSize but less than maximumPoolSize threads running, a new thread will + * be created only if the queue is full. By setting corePoolSize and + * maximumPoolSize the same, you create a fixed-size thread pool. By setting + * maximumPoolSize to an essentially unbounded value such as + * {@code Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary + * number of concurrent tasks. Most typically, core and maximum pool sizes are + * set only upon construction, but they may also be changed dynamically using + * {@link #setCorePoolSize} and {@link #setMaximumPoolSize}.
+ * + *
On-demand construction
+ * + *
By default, even core threads are initially created and started only + * when new tasks arrive, but this can be overridden dynamically using method + * {@link #prestartCoreThread} or {@link #prestartAllCoreThreads}. You probably + * want to prestart threads if you construct the pool with a non-empty queue. + *
+ * + *
Creating new threads
+ * + *
New threads are created using a {@link ThreadFactory}. If not otherwise + * specified, a {@link Executors#defaultThreadFactory} is used, that creates + * threads to all be in the same {@link ThreadGroup} and with the same + * {@code NORM_PRIORITY} priority and non-daemon status. By supplying a + * different ThreadFactory, you can alter the thread's name, thread group, + * priority, daemon status, etc. If a {@code ThreadFactory} fails to create a + * thread when asked by returning null from {@code newThread}, the executor + * will continue, but might not be able to execute any tasks. Threads should + * possess the "modifyThread" {@code RuntimePermission}. If worker threads or + * other threads using the pool do not possess this permission, service may be + * degraded: configuration changes may not take effect in a timely manner, and a + * shutdown pool may remain in a state in which termination is possible but not + * completed.
+ * + *
Keep-alive times
+ * + *
If the pool currently has more than corePoolSize threads, excess threads + * will be terminated if they have been idle for more than the keepAliveTime + * (see {@link #getKeepAliveTime}). This provides a means of reducing resource + * consumption when the pool is not being actively used. If the pool becomes + * more active later, new threads will be constructed. This parameter can also + * be changed dynamically using method {@link #setKeepAliveTime}. Using a value + * of {@code Long.MAX_VALUE} {@link TimeUnit#NANOSECONDS} effectively disables + * idle threads from ever terminating prior to shut down. By default, the + * keep-alive policy applies only when there are more than corePoolSizeThreads. + * But method {@link #allowCoreThreadTimeOut(boolean)} can be used to apply this + * time-out policy to core threads as well, so long as the keepAliveTime value + * is non-zero.
+ * + *
Queuing
+ * + *
Any {@link BlockingQueue} may be used to transfer and hold submitted + * tasks. The use of this queue interacts with pool sizing: + * + *
    + * + *
  • If fewer than corePoolSize threads are running, the Executor always + * prefers adding a new thread rather than queuing.
  • + * + *
  • If corePoolSize or more threads are running, the Executor always prefers + * queuing a request rather than adding a new thread.
  • + * + *
  • If a request cannot be queued, a new thread is created unless this would + * exceed maximumPoolSize, in which case, the task will be rejected.
  • + * + *
+ * + * There are three general strategies for queuing: + *
    + * + *
  1. Direct handoffs. A good default choice for a work queue is a + * {@link SynchronousQueue} that hands off tasks to threads without otherwise + * holding them. Here, an attempt to queue a task will fail if no threads are + * immediately available to run it, so a new thread will be constructed. This + * policy avoids lockups when handling sets of requests that might have internal + * dependencies. Direct handoffs generally require unbounded maximumPoolSizes to + * avoid rejection of new submitted tasks. This in turn admits the possibility + * of unbounded thread growth when commands continue to arrive on average faster + * than they can be processed.
  2. + * + *
  3. Unbounded queues. Using an unbounded queue (for example a + * {@link LinkedBlockingQueue} without a predefined capacity) will cause new + * tasks to wait in the queue when all corePoolSize threads are busy. Thus, no + * more than corePoolSize threads will ever be created. (And the value of the + * maximumPoolSize therefore doesn't have any effect.) This may be appropriate + * when each task is completely independent of others, so tasks cannot affect + * each others execution; for example, in a web page server. While this style of + * queuing can be useful in smoothing out transient bursts of requests, it + * admits the possibility of unbounded work queue growth when commands continue + * to arrive on average faster than they can be processed.
  4. + * + *
  5. Bounded queues. A bounded queue (for example, an + * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when used with + * finite maximumPoolSizes, but can be more difficult to tune and control. Queue + * sizes and maximum pool sizes may be traded off for each other: Using large + * queues and small pools minimizes CPU usage, OS resources, and + * context-switching overhead, but can lead to artificially low throughput. If + * tasks frequently block (for example if they are I/O bound), a system may be + * able to schedule time for more threads than you otherwise allow. Use of small + * queues generally requires larger pool sizes, which keeps CPUs busier but may + * encounter unacceptable scheduling overhead, which also decreases throughput. + *
  6. + * + *
+ * + *
+ * + *
Rejected tasks
+ * + *
New tasks submitted in method {@link #execute} will be rejected + * when the Executor has been shut down, and also when the Executor uses finite + * bounds for both maximum threads and work queue capacity, and is saturated. In + * either case, the {@code execute} method invokes the {@link + * RejectedExecutionHandler#rejectedExecution} method of its {@link + * RejectedExecutionHandler}. Four predefined handler policies are provided: + * + *
    + * + *
  1. In the default {@link ContinuationsExecutor.AbortPolicy}, the handler + * throws a runtime {@link RejectedExecutionException} upon rejection.
  2. + * + *
  3. In {@link ContinuationsExecutor.CallerRunsPolicy}, the thread that invokes + * {@code execute} itself runs the task. This provides a simple feedback control + * mechanism that will slow down the rate that new tasks are submitted.
  4. + * + *
  5. In {@link ContinuationsExecutor.DiscardPolicy}, a task that cannot be + * executed is simply dropped.
  6. + * + *
  7. In {@link ContinuationsExecutor.DiscardOldestPolicy}, if the executor is + * not shut down, the task at the head of the work queue is dropped, and then + * execution is retried (which can fail again, causing this to be repeated.) + *
  8. + * + *
+ * + * It is possible to define and use other kinds of {@link + * RejectedExecutionHandler} classes. Doing so requires some care especially + * when policies are designed to work only under particular capacity or queuing + * policies.
+ * + *
Hook methods
+ * + *
This class provides {@code protected} overridable {@link #beforeExecute} + * and {@link #afterExecute} methods that are called before and after execution + * of each task. These can be used to manipulate the execution environment; for + * example, reinitializing ThreadLocals, gathering statistics, or adding log + * entries. Additionally, method {@link #terminated} can be overridden to + * perform any special processing that needs to be done once the Executor has + * fully terminated. + * + *

+ * If hook or callback methods throw exceptions, internal worker threads may in + * turn fail and abruptly terminate.

+ * + *
Queue maintenance
+ * + *
Method {@link #getQueue} allows access to the work queue for purposes of + * monitoring and debugging. Use of this method for any other purpose is + * strongly discouraged. Two supplied methods, {@link #remove} and + * {@link #purge} are available to assist in storage reclamation when large + * numbers of queued tasks become cancelled.
+ * + *
Finalization
+ * + *
A pool that is no longer referenced in a program AND has no + * remaining threads will be {@code shutdown} automatically. If you would like + * to ensure that unreferenced pools are reclaimed even if users forget to call + * {@link #shutdown}, then you must arrange that unused threads eventually die, + * by setting appropriate keep-alive times, using a lower bound of zero core + * threads and/or setting {@link #allowCoreThreadTimeOut(boolean)}.
+ * + *
+ * + *

+ * Extension example. Most extensions of this class override one or more + * of the protected hook methods. For example, here is a subclass that adds a + * simple pause/resume feature: + * + *

+ *  {@code
+ *  class PausableThreadPoolExecutor extends ContinuationsExecutor {
+ *    private boolean isPaused;
+ *    private ReentrantLock pauseLock = new ReentrantLock();
+ *    private Condition unpaused = pauseLock.newCondition();
+ * 
+ *    public PausableThreadPoolExecutor(...) { super(...); }
+ * 
+ *    protected void beforeExecute(Thread t, Runnable r) {
+ *      super.beforeExecute(t, r);
+ *      pauseLock.lock();
+ *      try {
+ *        while (isPaused) unpaused.await();
+ *      } catch (InterruptedException ie) {
+ *        t.interrupt();
+ *      } finally {
+ *        pauseLock.unlock();
+ *      }
+ *    }
+ * 
+ *    public void pause() {
+ *      pauseLock.lock();
+ *      try {
+ *        isPaused = true;
+ *      } finally {
+ *        pauseLock.unlock();
+ *      }
+ *    }
+ * 
+ *    public void resume() {
+ *      pauseLock.lock();
+ *      try {
+ *        isPaused = false;
+ *        unpaused.signalAll();
+ *      } finally {
+ *        pauseLock.unlock();
+ *      }
+ *    }
+ *  }}
+ * 
+ * + * @since 1.5 + * @author Doug Lea + */ +public class ContinuationsExecutor extends AbstractExecutorService +{ + /** + * The main pool control state, ctl, is an atomic integer packing two + * conceptual fields workerCount, indicating the effective number of threads + * runState, indicating whether running, shutting down etc + * + * In order to pack them into one int, we limit workerCount to (2^29)-1 + * (about 500 million) threads rather than (2^31)-1 (2 billion) otherwise + * representable. If this is ever an issue in the future, the variable can + * be changed to be an AtomicLong, and the shift/mask constants below + * adjusted. But until the need arises, this code is a bit faster and + * simpler using an int. + * + * The workerCount is the number of workers that have been permitted to + * start and not permitted to stop. The value may be transiently different + * from the actual number of live threads, for example when a ThreadFactory + * fails to create a thread when asked, and when exiting threads are still + * performing bookkeeping before terminating. The user-visible pool size is + * reported as the current size of the workers set. + * + * The runState provides the main lifecyle control, taking on values: + * + * RUNNING: Accept new tasks and process queued tasks SHUTDOWN: Don't accept + * new tasks, but process queued tasks STOP: Don't accept new tasks, don't + * process queued tasks, and interrupt in-progress tasks TIDYING: All tasks + * have terminated, workerCount is zero, the thread transitioning to state + * TIDYING will run the terminated() hook method TERMINATED: terminated() + * has completed + * + * The numerical order among these values matters, to allow ordered + * comparisons. The runState monotonically increases over time, but need not + * hit each state. The transitions are: + * + * RUNNING -> SHUTDOWN On invocation of shutdown(), perhaps implicitly in + * finalize() (RUNNING or SHUTDOWN) -> STOP On invocation of shutdownNow() + * SHUTDOWN -> TIDYING When both queue and pool are empty STOP -> TIDYING + * When pool is empty TIDYING -> TERMINATED When the terminated() hook + * method has completed + * + * Threads waiting in awaitTermination() will return when the state reaches + * TERMINATED. + * + * Detecting the transition from SHUTDOWN to TIDYING is less straightforward + * than you'd like because the queue may become empty after non-empty and + * vice versa during SHUTDOWN state, but we can only terminate if, after + * seeing that it is empty, we see that workerCount is 0 (which sometimes + * entails a recheck -- see below). + */ + private final static Logger logger_ = Logger.getLogger( ContinuationsExecutor.class ); + private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); + private final static ThreadLocal tls_ = new ThreadLocal(); + private static final int COUNT_BITS = Integer.SIZE - 3; + private static final int CAPACITY = (1 << COUNT_BITS) - 1; + + // runState is stored in the high-order bits + private static final int RUNNING = -1 << COUNT_BITS; + private static final int SHUTDOWN = 0 << COUNT_BITS; + private static final int STOP = 1 << COUNT_BITS; + private static final int TIDYING = 2 << COUNT_BITS; + private static final int TERMINATED = 3 << COUNT_BITS; + + public static void putInTls(IContinuable run) + { + tls_.set(run); + } + + public static void doPostProcessing(Continuation c) + { + /* post process the call if need be */ + IContinuable run = ContinuationsExecutor.tls_.get(); + if ( run != null ) + { + run.run(c); + } + ContinuationsExecutor.tls_.remove(); + } + + // Packing and unpacking ctl + private static int runStateOf(int c) + { + return c & ~CAPACITY; + } + + private static int workerCountOf(int c) + { + return c & CAPACITY; + } + + private static int ctlOf(int rs, int wc) + { + return rs | wc; + } + + /* + * Bit field accessors that don't require unpacking ctl. These depend on the + * bit layout and on workerCount being never negative. + */ + + private static boolean runStateLessThan(int c, int s) + { + return c < s; + } + + private static boolean runStateAtLeast(int c, int s) + { + return c >= s; + } + + private static boolean isRunning(int c) + { + return c < SHUTDOWN; + } + + /** + * Attempt to CAS-increment the workerCount field of ctl. + */ + private boolean compareAndIncrementWorkerCount(int expect) + { + return ctl.compareAndSet(expect, expect + 1); + } + + /** + * Attempt to CAS-decrement the workerCount field of ctl. + */ + private boolean compareAndDecrementWorkerCount(int expect) + { + return ctl.compareAndSet(expect, expect - 1); + } + + /** + * Decrements the workerCount field of ctl. This is called only on abrupt + * termination of a thread (see processWorkerExit). Other decrements are + * performed within getTask. + */ + private void decrementWorkerCount() + { + do + { + } + while (!compareAndDecrementWorkerCount(ctl.get())); + } + + /** + * The queue used for holding tasks and handing off to worker threads. We do + * not require that workQueue.poll() returning null necessarily means that + * workQueue.isEmpty(), so rely solely on isEmpty to see if the queue is + * empty (which we must do for example when deciding whether to transition + * from SHUTDOWN to TIDYING). This accommodates special-purpose queues such + * as DelayQueues for which poll() is allowed to return null even if it may + * later return non-null when delays expire. + */ + private final BlockingQueue workQueue; + + /** + * Lock held on access to workers set and related bookkeeping. While we + * could use a concurrent set of some sort, it turns out to be generally + * preferable to use a lock. Among the reasons is that this serializes + * interruptIdleWorkers, which avoids unnecessary interrupt storms, + * especially during shutdown. Otherwise exiting threads would concurrently + * interrupt those that have not yet interrupted. It also simplifies some of + * the associated statistics bookkeeping of largestPoolSize etc. We also + * hold mainLock on shutdown and shutdownNow, for the sake of ensuring + * workers set is stable while separately checking permission to interrupt + * and actually interrupting. + */ + private final ReentrantLock mainLock = new ReentrantLock(); + + /** + * Set containing all worker threads in pool. Accessed only when holding + * mainLock. + */ + private final HashSet workers = new HashSet(); + + /** + * Wait condition to support awaitTermination + */ + private final Condition termination = mainLock.newCondition(); + + /** + * Tracks largest attained pool size. Accessed only under mainLock. + */ + private int largestPoolSize; + + /** + * Counter for completed tasks. Updated only on termination of worker + * threads. Accessed only under mainLock. + */ + private long completedTaskCount; + + /* + * All user control parameters are declared as volatiles so that ongoing + * actions are based on freshest values, but without need for locking, since + * no internal invariants depend on them changing synchronously with respect + * to other actions. + */ + + /** + * Factory for new threads. All threads are created using this factory (via + * method addWorker). All callers must be prepared for addWorker to fail, + * which may reflect a system or user's policy limiting the number of + * threads. Even though it is not treated as an error, failure to create + * threads may result in new tasks being rejected or existing ones remaining + * stuck in the queue. On the other hand, no special precautions exist to + * handle OutOfMemoryErrors that might be thrown while trying to create + * threads, since there is generally no recourse from within this class. + */ + private volatile ThreadFactory threadFactory; + + /** + * Handler called when saturated or shutdown in execute. + */ + private volatile RejectedExecutionHandler handler; + + /** + * Timeout in nanoseconds for idle threads waiting for work. Threads use + * this timeout when there are more than corePoolSize present or if + * allowCoreThreadTimeOut. Otherwise they wait forever for new work. + */ + private volatile long keepAliveTime; + + /** + * If false (default), core threads stay alive even when idle. If true, core + * threads use keepAliveTime to time out waiting for work. + */ + private volatile boolean allowCoreThreadTimeOut; + + /** + * Core pool size is the minimum number of workers to keep alive (and not + * allow to time out etc) unless allowCoreThreadTimeOut is set, in which + * case the minimum is zero. + */ + private volatile int corePoolSize; + + /** + * Maximum pool size. Note that the actual maximum is internally bounded by + * CAPACITY. + */ + private volatile int maximumPoolSize; + + /** + * The default rejected execution handler + */ + private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); + + /** + * Permission required for callers of shutdown and shutdownNow. We + * additionally require (see checkShutdownAccess) that callers have + * permission to actually interrupt threads in the worker set (as governed + * by Thread.interrupt, which relies on ThreadGroup.checkAccess, which in + * turn relies on SecurityManager.checkAccess). Shutdowns are attempted only + * if these checks pass. + * + * All actual invocations of Thread.interrupt (see interruptIdleWorkers and + * interruptWorkers) ignore SecurityExceptions, meaning that the attempted + * interrupts silently fail. In the case of shutdown, they should not fail + * unless the SecurityManager has inconsistent policies, sometimes allowing + * access to a thread and sometimes not. In such cases, failure to actually + * interrupt threads may disable or delay full termination. Other uses of + * interruptIdleWorkers are advisory, and failure to actually interrupt will + * merely delay response to configuration changes so is not handled + * exceptionally. + */ + private static final RuntimePermission shutdownPerm = new RuntimePermission( + "modifyThread"); + + /** + * Class Worker mainly maintains interrupt control state for threads running + * tasks, along with other minor bookkeeping. This class opportunistically + * extends AbstractQueuedSynchronizer to simplify acquiring and releasing a + * lock surrounding each task execution. This protects against interrupts + * that are intended to wake up a worker thread waiting for a task from + * instead interrupting a task being run. We implement a simple + * non-reentrant mutual exclusion lock rather than use ReentrantLock because + * we do not want worker tasks to be able to reacquire the lock when they + * invoke pool control methods like setCorePoolSize. + */ + private final class Worker extends AbstractQueuedSynchronizer implements Runnable + { + /** + * This class will never be serialized, but we provide a + * serialVersionUID to suppress a javac warning. + */ + private static final long serialVersionUID = 6138294804551838833L; + + /** Thread this worker is running in. Null if factory fails. */ + final Thread thread; + /** Initial task to run. Possibly null. */ + Runnable firstTask; + /** Per-thread task counter */ + volatile long completedTasks; + + /** + * Creates with given first task and thread from ThreadFactory. + * + * @param firstTask + * the first task (null if none) + */ + Worker(Runnable firstTask) + { + this.firstTask = firstTask; + this.thread = getThreadFactory().newThread(this); + } + + /** Delegates main run loop to outer runWorker */ + public void run() + { + runWorker(this); + } + + // Lock methods + // + // The value 0 represents the unlocked state. + // The value 1 represents the locked state. + + protected boolean isHeldExclusively() + { + return getState() == 1; + } + + protected boolean tryAcquire(int unused) + { + if (compareAndSetState(0, 1)) + { + setExclusiveOwnerThread(Thread.currentThread()); + return true; + } + return false; + } + + protected boolean tryRelease(int unused) + { + setExclusiveOwnerThread(null); + setState(0); + return true; + } + + public void lock() + { + acquire(1); + } + + public boolean tryLock() + { + return tryAcquire(1); + } + + public void unlock() + { + release(1); + } + + public boolean isLocked() + { + return isHeldExclusively(); + } + } + + /* + * Methods for setting control state + */ + + /** + * Transitions runState to given target, or leaves it alone if already at + * least the given target. + * + * @param targetState + * the desired state, either SHUTDOWN or STOP (but not TIDYING or + * TERMINATED -- use tryTerminate for that) + */ + private void advanceRunState(int targetState) + { + for (;;) + { + int c = ctl.get(); + if (runStateAtLeast(c, targetState) + || ctl.compareAndSet(c, + ctlOf(targetState, workerCountOf(c)))) + break; + } + } + + /** + * Transitions to TERMINATED state if either (SHUTDOWN and pool and queue + * empty) or (STOP and pool empty). If otherwise eligible to terminate but + * workerCount is nonzero, interrupts an idle worker to ensure that shutdown + * signals propagate. This method must be called following any action that + * might make termination possible -- reducing worker count or removing + * tasks from the queue during shutdown. The method is non-private to allow + * access from ScheduledThreadPoolExecutor. + */ + final void tryTerminate() + { + for (;;) + { + int c = ctl.get(); + if (isRunning(c) || runStateAtLeast(c, TIDYING) + || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) + return; + if (workerCountOf(c) != 0) + { // Eligible to terminate + interruptIdleWorkers(ONLY_ONE); + return; + } + + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try + { + if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) + { + try + { + terminated(); + } + finally + { + ctl.set(ctlOf(TERMINATED, 0)); + termination.signalAll(); + } + return; + } + } + finally + { + mainLock.unlock(); + } + // else retry on failed CAS + } + } + + /* + * Methods for controlling interrupts to worker threads. + */ + + /** + * If there is a security manager, makes sure caller has permission to shut + * down threads in general (see shutdownPerm). If this passes, additionally + * makes sure the caller is allowed to interrupt each worker thread. This + * might not be true even if first check passed, if the SecurityManager + * treats some threads specially. + */ + private void checkShutdownAccess() + { + SecurityManager security = System.getSecurityManager(); + if (security != null) + { + security.checkPermission(shutdownPerm); + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try + { + for (Worker w : workers) + security.checkAccess(w.thread); + } + finally + { + mainLock.unlock(); + } + } + } + + /** + * Interrupts all threads, even if active. Ignores SecurityExceptions (in + * which case some threads may remain uninterrupted). + */ + private void interruptWorkers() + { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try + { + for (Worker w : workers) + { + try + { + w.thread.interrupt(); + } + catch (SecurityException ignore) + { + } + } + } + finally + { + mainLock.unlock(); + } + } + + /** + * Interrupts threads that might be waiting for tasks (as indicated by not + * being locked) so they can check for termination or configuration changes. + * Ignores SecurityExceptions (in which case some threads may remain + * uninterrupted). + * + * @param onlyOne + * If true, interrupt at most one worker. This is called only + * from tryTerminate when termination is otherwise enabled but + * there are still other workers. In this case, at most one + * waiting worker is interrupted to propagate shutdown signals in + * case all threads are currently waiting. Interrupting any + * arbitrary thread ensures that newly arriving workers since + * shutdown began will also eventually exit. To guarantee + * eventual termination, it suffices to always interrupt only one + * idle worker, but shutdown() interrupts all idle workers so + * that redundant workers exit promptly, not waiting for a + * straggler task to finish. + */ + private void interruptIdleWorkers(boolean onlyOne) + { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try + { + for (Worker w : workers) + { + Thread t = w.thread; + if (!t.isInterrupted() && w.tryLock()) + { + try + { + t.interrupt(); + } + catch (SecurityException ignore) + { + } + finally + { + w.unlock(); + } + } + if (onlyOne) + break; + } + } + finally + { + mainLock.unlock(); + } + } + + /** + * Common form of interruptIdleWorkers, to avoid having to remember what the + * boolean argument means. + */ + private void interruptIdleWorkers() + { + interruptIdleWorkers(false); + } + + private static final boolean ONLY_ONE = true; + + /** + * Ensures that unless the pool is stopping, the current thread does not + * have its interrupt set. This requires a double-check of state in case the + * interrupt was cleared concurrently with a shutdownNow -- if so, the + * interrupt is re-enabled. + */ + private void clearInterruptsForTaskRun() + { + if (runStateLessThan(ctl.get(), STOP) && Thread.interrupted() + && runStateAtLeast(ctl.get(), STOP)) + Thread.currentThread().interrupt(); + } + + /* + * Misc utilities, most of which are also exported to + * ScheduledThreadPoolExecutor + */ + + /** + * Invokes the rejected execution handler for the given command. + * Package-protected for use by ScheduledThreadPoolExecutor. + */ + final void reject(Runnable command) + { + handler.rejectedExecution(command, this); + } + + /** + * Performs any further cleanup following run state transition on invocation + * of shutdown. A no-op here, but used by ScheduledThreadPoolExecutor to + * cancel delayed tasks. + */ + void onShutdown() + { + } + + /** + * State check needed by ScheduledThreadPoolExecutor to enable running tasks + * during shutdown. + * + * @param shutdownOK + * true if should return true if SHUTDOWN + */ + final boolean isRunningOrShutdown(boolean shutdownOK) + { + int rs = runStateOf(ctl.get()); + return rs == RUNNING || (rs == SHUTDOWN && shutdownOK); + } + + /** + * Drains the task queue into a new list, normally using drainTo. But if the + * queue is a DelayQueue or any other kind of queue for which poll or + * drainTo may fail to remove some elements, it deletes them one by one. + */ + private List drainQueue() + { + BlockingQueue q = workQueue; + List taskList = new ArrayList(); + q.drainTo(taskList); + if (!q.isEmpty()) + { + for (Runnable r : q.toArray(new Runnable[0])) + { + if (q.remove(r)) + taskList.add(r); + } + } + return taskList; + } + + /* + * Methods for creating, running and cleaning up after workers + */ + + /** + * Checks if a new worker can be added with respect to current pool state + * and the given bound (either core or maximum). If so, the worker count is + * adjusted accordingly, and, if possible, a new worker is created and + * started running firstTask as its first task. This method returns false if + * the pool is stopped or eligible to shut down. It also returns false if + * the thread factory fails to create a thread when asked, which requires a + * backout of workerCount, and a recheck for termination, in case the + * existence of this worker was holding up termination. + * + * @param firstTask + * the task the new thread should run first (or null if none). + * Workers are created with an initial first task (in method + * execute()) to bypass queuing when there are fewer than + * corePoolSize threads (in which case we always start one), or + * when the queue is full (in which case we must bypass queue). + * Initially idle threads are usually created via + * prestartCoreThread or to replace other dying workers. + * + * @param core + * if true use corePoolSize as bound, else maximumPoolSize. (A + * boolean indicator is used here rather than a value to ensure + * reads of fresh values after checking other pool state). + * @return true if successful + */ + private boolean addWorker(Runnable firstTask, boolean core) + { + retry: for (;;) + { + int c = ctl.get(); + int rs = runStateOf(c); + + // Check if queue empty only if necessary. + if (rs >= SHUTDOWN + && !(rs == SHUTDOWN && firstTask == null && !workQueue + .isEmpty())) + return false; + + for (;;) + { + int wc = workerCountOf(c); + if (wc >= CAPACITY + || wc >= (core ? corePoolSize : maximumPoolSize)) + return false; + if (compareAndIncrementWorkerCount(c)) + break retry; + c = ctl.get(); // Re-read ctl + if (runStateOf(c) != rs) + continue retry; + // else CAS failed due to workerCount change; retry inner loop + } + } + + Worker w = new Worker(firstTask); + Thread t = w.thread; + + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try + { + // Recheck while holding lock. + // Back out on ThreadFactory failure or if + // shut down before lock acquired. + int c = ctl.get(); + int rs = runStateOf(c); + + if (t == null + || (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null))) + { + decrementWorkerCount(); + tryTerminate(); + return false; + } + + workers.add(w); + + int s = workers.size(); + if (s > largestPoolSize) + largestPoolSize = s; + } + finally + { + mainLock.unlock(); + } + + t.start(); + // It is possible (but unlikely) for a thread to have been + // added to workers, but not yet started, during transition to + // STOP, which could result in a rare missed interrupt, + // because Thread.interrupt is not guaranteed to have any effect + // on a non-yet-started Thread (see Thread#interrupt). + if (runStateOf(ctl.get()) == STOP && !t.isInterrupted()) + t.interrupt(); + + return true; + } + + /** + * Performs cleanup and bookkeeping for a dying worker. Called only from + * worker threads. Unless completedAbruptly is set, assumes that workerCount + * has already been adjusted to account for exit. This method removes thread + * from worker set, and possibly terminates the pool or replaces the worker + * if either it exited due to user task exception or if fewer than + * corePoolSize workers are running or queue is non-empty but there are no + * workers. + * + * @param w + * the worker + * @param completedAbruptly + * if the worker died due to user exception + */ + private void processWorkerExit(Worker w, boolean completedAbruptly) + { + if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted + decrementWorkerCount(); + + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try + { + completedTaskCount += w.completedTasks; + workers.remove(w); + } + finally + { + mainLock.unlock(); + } + + tryTerminate(); + + int c = ctl.get(); + if (runStateLessThan(c, STOP)) + { + if (!completedAbruptly) + { + int min = allowCoreThreadTimeOut ? 0 : corePoolSize; + if (min == 0 && !workQueue.isEmpty()) + min = 1; + if (workerCountOf(c) >= min) + return; // replacement not needed + } + addWorker(null, false); + } + } + + /** + * Performs blocking or timed wait for a task, depending on current + * configuration settings, or returns null if this worker must exit because + * of any of: 1. There are more than maximumPoolSize workers (due to a call + * to setMaximumPoolSize). 2. The pool is stopped. 3. The pool is shutdown + * and the queue is empty. 4. This worker timed out waiting for a task, and + * timed-out workers are subject to termination (that is, + * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) both + * before and after the timed wait. + * + * @return task, or null if the worker must exit, in which case workerCount + * is decremented + */ + private Runnable getTask() + { + boolean timedOut = false; // Did the last poll() time out? + + retry: for (;;) + { + int c = ctl.get(); + int rs = runStateOf(c); + + // Check if queue empty only if necessary. + if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) + { + decrementWorkerCount(); + return null; + } + + boolean timed; // Are workers subject to culling? + + for (;;) + { + int wc = workerCountOf(c); + timed = allowCoreThreadTimeOut || wc > corePoolSize; + + if (wc <= maximumPoolSize && !(timedOut && timed)) + break; + if (compareAndDecrementWorkerCount(c)) + return null; + c = ctl.get(); // Re-read ctl + if (runStateOf(c) != rs) + continue retry; + // else CAS failed due to workerCount change; retry inner loop + } + + try + { + Runnable r = timed ? workQueue.poll(keepAliveTime, + TimeUnit.NANOSECONDS) : workQueue.take(); + if (r != null) + return r; + timedOut = true; + } + catch (InterruptedException retry) + { + timedOut = false; + } + } + } + + /** + * Main worker run loop. Repeatedly gets tasks from queue and executes them, + * while coping with a number of issues: + * + * 1. We may start out with an initial task, in which case we don't need to + * get the first one. Otherwise, as long as pool is running, we get tasks + * from getTask. If it returns null then the worker exits due to changed + * pool state or configuration parameters. Other exits result from exception + * throws in external code, in which case completedAbruptly holds, which + * usually leads processWorkerExit to replace this thread. + * + * 2. Before running any task, the lock is acquired to prevent other pool + * interrupts while the task is executing, and clearInterruptsForTaskRun + * called to ensure that unless pool is stopping, this thread does not have + * its interrupt set. + * + * 3. Each task run is preceded by a call to beforeExecute, which might + * throw an exception, in which case we cause thread to die (breaking loop + * with completedAbruptly true) without processing the task. + * + * 4. Assuming beforeExecute completes normally, we run the task, gathering + * any of its thrown exceptions to send to afterExecute. We separately + * handle RuntimeException, Error (both of which the specs guarantee that we + * trap) and arbitrary Throwables. Because we cannot rethrow Throwables + * within Runnable.run, we wrap them within Errors on the way out (to the + * thread's UncaughtExceptionHandler). Any thrown exception also + * conservatively causes thread to die. + * + * 5. After task.run completes, we call afterExecute, which may also throw + * an exception, which will also cause thread to die. According to JLS Sec + * 14.20, this exception is the one that will be in effect even if task.run + * throws. + * + * The net effect of the exception mechanics is that afterExecute and the + * thread's UncaughtExceptionHandler have as accurate information as we can + * provide about any problems encountered by user code. + * + * @param w + * the worker + */ + final void runWorker(Worker w) + { + Runnable task = w.firstTask; + w.firstTask = null; + boolean completedAbruptly = true; + try + { + while (task != null || (task = getTask()) != null) + { + w.lock(); + clearInterruptsForTaskRun(); + try + { + beforeExecute(w.thread, task); + Throwable thrown = null; + try + { + /* start in suspended mode to get a handle to the continuation */ + Continuation c = Continuation.startSuspendedWith(task); + /* resume the damn continuation */ + c = Continuation.continueWith(c, new ContinuationContext(c)); + /* post process the call if need be */ + ContinuationsExecutor.doPostProcessing(c); + } + catch (RuntimeException x) + { + thrown = x; + throw x; + } + catch (Error x) + { + thrown = x; + throw x; + } + catch (Throwable x) + { + thrown = x; + throw new Error(x); + } + finally + { + afterExecute(task, thrown); + } + } + finally + { + task = null; + w.completedTasks++; + w.unlock(); + } + } + completedAbruptly = false; + } + finally + { + processWorkerExit(w, completedAbruptly); + } + } + + // Public constructors and methods + + /** + * Creates a new {@code ContinuationsExecutor} with the given initial + * parameters and default thread factory and rejected execution handler. It + * may be more convenient to use one of the {@link Executors} factory + * methods instead of this general purpose constructor. + * + * @param corePoolSize + * the number of threads to keep in the pool, even if they are + * idle, unless {@code allowCoreThreadTimeOut} is set + * @param maximumPoolSize + * the maximum number of threads to allow in the pool + * @param keepAliveTime + * when the number of threads is greater than the core, this is + * the maximum time that excess idle threads will wait for new + * tasks before terminating. + * @param unit + * the time unit for the {@code keepAliveTime} argument + * @param workQueue + * the queue to use for holding tasks before they are executed. + * This queue will hold only the {@code Runnable} tasks submitted + * by the {@code execute} method. + * @throws IllegalArgumentException + * if one of the following holds:
+ * {@code corePoolSize < 0}
+ * {@code keepAliveTime < 0}
+ * {@code maximumPoolSize <= 0}
+ * {@code maximumPoolSize < corePoolSize} + * @throws NullPointerException + * if {@code workQueue} is null + */ + public ContinuationsExecutor(int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) + { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + Executors.defaultThreadFactory(), defaultHandler); + } + + /** + * Creates a new {@code ContinuationsExecutor} with the given initial + * parameters and default rejected execution handler. + * + * @param corePoolSize + * the number of threads to keep in the pool, even if they are + * idle, unless {@code allowCoreThreadTimeOut} is set + * @param maximumPoolSize + * the maximum number of threads to allow in the pool + * @param keepAliveTime + * when the number of threads is greater than the core, this is + * the maximum time that excess idle threads will wait for new + * tasks before terminating. + * @param unit + * the time unit for the {@code keepAliveTime} argument + * @param workQueue + * the queue to use for holding tasks before they are executed. + * This queue will hold only the {@code Runnable} tasks submitted + * by the {@code execute} method. + * @param threadFactory + * the factory to use when the executor creates a new thread + * @throws IllegalArgumentException + * if one of the following holds:
+ * {@code corePoolSize < 0}
+ * {@code keepAliveTime < 0}
+ * {@code maximumPoolSize <= 0}
+ * {@code maximumPoolSize < corePoolSize} + * @throws NullPointerException + * if {@code workQueue} or {@code threadFactory} is null + */ + public ContinuationsExecutor(int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, + BlockingQueue workQueue, ThreadFactory threadFactory) + { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + threadFactory, defaultHandler); + } + + /** + * Creates a new {@code ContinuationsExecutor} with the given initial + * parameters and default thread factory. + * + * @param corePoolSize + * the number of threads to keep in the pool, even if they are + * idle, unless {@code allowCoreThreadTimeOut} is set + * @param maximumPoolSize + * the maximum number of threads to allow in the pool + * @param keepAliveTime + * when the number of threads is greater than the core, this is + * the maximum time that excess idle threads will wait for new + * tasks before terminating. + * @param unit + * the time unit for the {@code keepAliveTime} argument + * @param workQueue + * the queue to use for holding tasks before they are executed. + * This queue will hold only the {@code Runnable} tasks submitted + * by the {@code execute} method. + * @param handler + * the handler to use when execution is blocked because the + * thread bounds and queue capacities are reached + * @throws IllegalArgumentException + * if one of the following holds:
+ * {@code corePoolSize < 0}
+ * {@code keepAliveTime < 0}
+ * {@code maximumPoolSize <= 0}
+ * {@code maximumPoolSize < corePoolSize} + * @throws NullPointerException + * if {@code workQueue} or {@code handler} is null + */ + public ContinuationsExecutor(int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, + BlockingQueue workQueue, RejectedExecutionHandler handler) + { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + Executors.defaultThreadFactory(), handler); + } + + /** + * Creates a new {@code ContinuationsExecutor} with the given initial + * parameters. + * + * @param corePoolSize + * the number of threads to keep in the pool, even if they are + * idle, unless {@code allowCoreThreadTimeOut} is set + * @param maximumPoolSize + * the maximum number of threads to allow in the pool + * @param keepAliveTime + * when the number of threads is greater than the core, this is + * the maximum time that excess idle threads will wait for new + * tasks before terminating. + * @param unit + * the time unit for the {@code keepAliveTime} argument + * @param workQueue + * the queue to use for holding tasks before they are executed. + * This queue will hold only the {@code Runnable} tasks submitted + * by the {@code execute} method. + * @param threadFactory + * the factory to use when the executor creates a new thread + * @param handler + * the handler to use when execution is blocked because the + * thread bounds and queue capacities are reached + * @throws IllegalArgumentException + * if one of the following holds:
+ * {@code corePoolSize < 0}
+ * {@code keepAliveTime < 0}
+ * {@code maximumPoolSize <= 0}
+ * {@code maximumPoolSize < corePoolSize} + * @throws NullPointerException + * if {@code workQueue} or {@code threadFactory} or + * {@code handler} is null + */ + public ContinuationsExecutor(int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, + BlockingQueue workQueue, ThreadFactory threadFactory, + RejectedExecutionHandler handler) + { + if (corePoolSize < 0 || maximumPoolSize <= 0 + || maximumPoolSize < corePoolSize || keepAliveTime < 0) + throw new IllegalArgumentException(); + if (workQueue == null || threadFactory == null || handler == null) + throw new NullPointerException(); + this.corePoolSize = corePoolSize; + this.maximumPoolSize = maximumPoolSize; + this.workQueue = workQueue; + this.keepAliveTime = unit.toNanos(keepAliveTime); + this.threadFactory = threadFactory; + this.handler = handler; + } + + /** + * Executes the given task sometime in the future. The task may execute in a + * new thread or in an existing pooled thread. + * + * If the task cannot be submitted for execution, either because this + * executor has been shutdown or because its capacity has been reached, the + * task is handled by the current {@code RejectedExecutionHandler}. + * + * @param command + * the task to execute + * @throws RejectedExecutionException + * at discretion of {@code RejectedExecutionHandler}, if the + * task cannot be accepted for execution + * @throws NullPointerException + * if {@code command} is null + */ + public void execute(Runnable command) + { + if (command == null) + throw new NullPointerException(); + /* + * Proceed in 3 steps: + * + * 1. If fewer than corePoolSize threads are running, try to start a new + * thread with the given command as its first task. The call to + * addWorker atomically checks runState and workerCount, and so prevents + * false alarms that would add threads when it shouldn't, by returning + * false. + * + * 2. If a task can be successfully queued, then we still need to + * double-check whether we should have added a thread (because existing + * ones died since last checking) or that the pool shut down since entry + * into this method. So we recheck state and if necessary roll back the + * enqueuing if stopped, or start a new thread if there are none. + * + * 3. If we cannot queue task, then we try to add a new thread. If it + * fails, we know we are shut down or saturated and so reject the task. + */ + int c = ctl.get(); + if (workerCountOf(c) < corePoolSize) + { + if (addWorker(command, true)) + return; + c = ctl.get(); + } + if (isRunning(c) && workQueue.offer(command)) + { + int recheck = ctl.get(); + if (!isRunning(recheck) && remove(command)) + reject(command); + else if (workerCountOf(recheck) == 0) + addWorker(null, false); + } + else if (!addWorker(command, false)) + reject(command); + } + + /** + * Initiates an orderly shutdown in which previously submitted tasks are + * executed, but no new tasks will be accepted. Invocation has no additional + * effect if already shut down. + * + * @throws SecurityException + * {@inheritDoc} + */ + public void shutdown() + { + /* + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try + { + checkShutdownAccess(); + advanceRunState(SHUTDOWN); + interruptIdleWorkers(); + onShutdown(); // hook for ScheduledThreadPoolExecutor + } + finally + { + mainLock.unlock(); + } + tryTerminate(); + */ + } + + /** + * Attempts to stop all actively executing tasks, halts the processing of + * waiting tasks, and returns a list of the tasks that were awaiting + * execution. These tasks are drained (removed) from the task queue upon + * return from this method. + * + *

+ * There are no guarantees beyond best-effort attempts to stop processing + * actively executing tasks. This implementation cancels tasks via + * {@link Thread#interrupt}, so any task that fails to respond to + * interrupts may never terminate. + * + * @throws SecurityException + * {@inheritDoc} + */ + public List shutdownNow() + { + List tasks; + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try + { + checkShutdownAccess(); + advanceRunState(STOP); + interruptWorkers(); + tasks = drainQueue(); + } + finally + { + mainLock.unlock(); + } + tryTerminate(); + return tasks; + } + + public boolean isShutdown() + { + return !isRunning(ctl.get()); + } + + /** + * Returns true if this executor is in the process of terminating after + * {@link #shutdown} or {@link #shutdownNow} but has not completely + * terminated. This method may be useful for debugging. A return of + * {@code true} reported a sufficient period after shutdown may indicate + * that submitted tasks have ignored or suppressed interruption, causing + * this executor not to properly terminate. + * + * @return true if terminating but not yet terminated + */ + public boolean isTerminating() + { + int c = ctl.get(); + return !isRunning(c) && runStateLessThan(c, TERMINATED); + } + + public boolean isTerminated() + { + return runStateAtLeast(ctl.get(), TERMINATED); + } + + public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException + { + long nanos = unit.toNanos(timeout); + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try + { + for (;;) + { + if (runStateAtLeast(ctl.get(), TERMINATED)) + return true; + if (nanos <= 0) + return false; + nanos = termination.awaitNanos(nanos); + } + } + finally + { + mainLock.unlock(); + } + } + + /** + * Invokes {@code shutdown} when this executor is no longer referenced and + * it has no threads. + */ + protected void finalize() + { + shutdown(); + } + + /** + * Sets the thread factory used to create new threads. + * + * @param threadFactory + * the new thread factory + * @throws NullPointerException + * if threadFactory is null + * @see #getThreadFactory + */ + public void setThreadFactory(ThreadFactory threadFactory) + { + if (threadFactory == null) + throw new NullPointerException(); + this.threadFactory = threadFactory; + } + + /** + * Returns the thread factory used to create new threads. + * + * @return the current thread factory + * @see #setThreadFactory + */ + public ThreadFactory getThreadFactory() + { + return threadFactory; + } + + /** + * Sets a new handler for unexecutable tasks. + * + * @param handler + * the new handler + * @throws NullPointerException + * if handler is null + * @see #getRejectedExecutionHandler + */ + public void setRejectedExecutionHandler(RejectedExecutionHandler handler) + { + if (handler == null) + throw new NullPointerException(); + this.handler = handler; + } + + /** + * Returns the current handler for unexecutable tasks. + * + * @return the current handler + * @see #setRejectedExecutionHandler + */ + public RejectedExecutionHandler getRejectedExecutionHandler() + { + return handler; + } + + /** + * Sets the core number of threads. This overrides any value set in the + * constructor. If the new value is smaller than the current value, excess + * existing threads will be terminated when they next become idle. If + * larger, new threads will, if needed, be started to execute any queued + * tasks. + * + * @param corePoolSize + * the new core size + * @throws IllegalArgumentException + * if {@code corePoolSize < 0} + * @see #getCorePoolSize + */ + public void setCorePoolSize(int corePoolSize) + { + if (corePoolSize < 0) + throw new IllegalArgumentException(); + int delta = corePoolSize - this.corePoolSize; + this.corePoolSize = corePoolSize; + if (workerCountOf(ctl.get()) > corePoolSize) + interruptIdleWorkers(); + else if (delta > 0) + { + // We don't really know how many new threads are "needed". + // As a heuristic, prestart enough new workers (up to new + // core size) to handle the current number of tasks in + // queue, but stop if queue becomes empty while doing so. + int k = Math.min(delta, workQueue.size()); + while (k-- > 0 && addWorker(null, true)) + { + if (workQueue.isEmpty()) + break; + } + } + } + + /** + * Returns the core number of threads. + * + * @return the core number of threads + * @see #setCorePoolSize + */ + public int getCorePoolSize() + { + return corePoolSize; + } + + /** + * Starts a core thread, causing it to idly wait for work. This overrides + * the default policy of starting core threads only when new tasks are + * executed. This method will return {@code false} if all core threads have + * already been started. + * + * @return {@code true} if a thread was started + */ + public boolean prestartCoreThread() + { + return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true); + } + + /** + * Starts all core threads, causing them to idly wait for work. This + * overrides the default policy of starting core threads only when new tasks + * are executed. + * + * @return the number of threads started + */ + public int prestartAllCoreThreads() + { + int n = 0; + while (addWorker(null, true)) + ++n; + return n; + } + + /** + * Returns true if this pool allows core threads to time out and terminate + * if no tasks arrive within the keepAlive time, being replaced if needed + * when new tasks arrive. When true, the same keep-alive policy applying to + * non-core threads applies also to core threads. When false (the default), + * core threads are never terminated due to lack of incoming tasks. + * + * @return {@code true} if core threads are allowed to time out, else + * {@code false} + * + * @since 1.6 + */ + public boolean allowsCoreThreadTimeOut() + { + return allowCoreThreadTimeOut; + } + + /** + * Sets the policy governing whether core threads may time out and terminate + * if no tasks arrive within the keep-alive time, being replaced if needed + * when new tasks arrive. When false, core threads are never terminated due + * to lack of incoming tasks. When true, the same keep-alive policy applying + * to non-core threads applies also to core threads. To avoid continual + * thread replacement, the keep-alive time must be greater than zero when + * setting {@code true}. This method should in general be called before the + * pool is actively used. + * + * @param value + * {@code true} if should time out, else {@code false} + * @throws IllegalArgumentException + * if value is {@code true} and the current keep-alive time is + * not greater than zero + * + * @since 1.6 + */ + public void allowCoreThreadTimeOut(boolean value) + { + if (value && keepAliveTime <= 0) + throw new IllegalArgumentException( + "Core threads must have nonzero keep alive times"); + if (value != allowCoreThreadTimeOut) + { + allowCoreThreadTimeOut = value; + if (value) + interruptIdleWorkers(); + } + } + + /** + * Sets the maximum allowed number of threads. This overrides any value set + * in the constructor. If the new value is smaller than the current value, + * excess existing threads will be terminated when they next become idle. + * + * @param maximumPoolSize + * the new maximum + * @throws IllegalArgumentException + * if the new maximum is less than or equal to zero, or less + * than the {@linkplain #getCorePoolSize core pool size} + * @see #getMaximumPoolSize + */ + public void setMaximumPoolSize(int maximumPoolSize) + { + if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) + throw new IllegalArgumentException(); + this.maximumPoolSize = maximumPoolSize; + if (workerCountOf(ctl.get()) > maximumPoolSize) + interruptIdleWorkers(); + } + + /** + * Returns the maximum allowed number of threads. + * + * @return the maximum allowed number of threads + * @see #setMaximumPoolSize + */ + public int getMaximumPoolSize() + { + return maximumPoolSize; + } + + /** + * Sets the time limit for which threads may remain idle before being + * terminated. If there are more than the core number of threads currently + * in the pool, after waiting this amount of time without processing a task, + * excess threads will be terminated. This overrides any value set in the + * constructor. + * + * @param time + * the time to wait. A time value of zero will cause excess + * threads to terminate immediately after executing tasks. + * @param unit + * the time unit of the {@code time} argument + * @throws IllegalArgumentException + * if {@code time} less than zero or if {@code time} is zero and + * {@code allowsCoreThreadTimeOut} + * @see #getKeepAliveTime + */ + public void setKeepAliveTime(long time, TimeUnit unit) + { + if (time < 0) + throw new IllegalArgumentException(); + if (time == 0 && allowsCoreThreadTimeOut()) + throw new IllegalArgumentException( + "Core threads must have nonzero keep alive times"); + long keepAliveTime = unit.toNanos(time); + long delta = keepAliveTime - this.keepAliveTime; + this.keepAliveTime = keepAliveTime; + if (delta < 0) + interruptIdleWorkers(); + } + + /** + * Returns the thread keep-alive time, which is the amount of time that + * threads in excess of the core pool size may remain idle before being + * terminated. + * + * @param unit + * the desired time unit of the result + * @return the time limit + * @see #setKeepAliveTime + */ + public long getKeepAliveTime(TimeUnit unit) + { + return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS); + } + + /* User-level queue utilities */ + + /** + * Returns the task queue used by this executor. Access to the task queue is + * intended primarily for debugging and monitoring. This queue may be in + * active use. Retrieving the task queue does not prevent queued tasks from + * executing. + * + * @return the task queue + */ + public BlockingQueue getQueue() + { + return workQueue; + } + + /** + * Removes this task from the executor's internal queue if it is present, + * thus causing it not to be run if it has not already started. + * + *

+ * This method may be useful as one part of a cancellation scheme. It may + * fail to remove tasks that have been converted into other forms before + * being placed on the internal queue. For example, a task entered using + * {@code submit} might be converted into a form that maintains + * {@code Future} status. However, in such cases, method {@link #purge} may + * be used to remove those Futures that have been cancelled. + * + * @param task + * the task to remove + * @return true if the task was removed + */ + public boolean remove(Runnable task) + { + boolean removed = workQueue.remove(task); + tryTerminate(); // In case SHUTDOWN and now empty + return removed; + } + + /** + * Tries to remove from the work queue all {@link Future} tasks that have + * been cancelled. This method can be useful as a storage reclamation + * operation, that has no other impact on functionality. Cancelled tasks are + * never executed, but may accumulate in work queues until worker threads + * can actively remove them. Invoking this method instead tries to remove + * them now. However, this method may fail to remove tasks in the presence + * of interference by other threads. + */ + public void purge() + { + final BlockingQueue q = workQueue; + try + { + Iterator it = q.iterator(); + while (it.hasNext()) + { + Runnable r = it.next(); + if (r instanceof Future && ((Future) r).isCancelled()) + it.remove(); + } + } + catch (ConcurrentModificationException fallThrough) + { + // Take slow path if we encounter interference during traversal. + // Make copy for traversal and call remove for cancelled entries. + // The slow path is more likely to be O(N*N). + for (Object r : q.toArray()) + if (r instanceof Future && ((Future) r).isCancelled()) + q.remove(r); + } + + tryTerminate(); // In case SHUTDOWN and now empty + } + + /* Statistics */ + + /** + * Returns the current number of threads in the pool. + * + * @return the number of threads + */ + public int getPoolSize() + { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try + { + // Remove rare and surprising possibility of + // isTerminated() && getPoolSize() > 0 + return runStateAtLeast(ctl.get(), TIDYING) ? 0 : workers.size(); + } + finally + { + mainLock.unlock(); + } + } + + /** + * Returns the approximate number of threads that are actively executing + * tasks. + * + * @return the number of threads + */ + public int getActiveCount() + { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try + { + int n = 0; + for (Worker w : workers) + if (w.isLocked()) + ++n; + return n; + } + finally + { + mainLock.unlock(); + } + } + + /** + * Returns the largest number of threads that have ever simultaneously been + * in the pool. + * + * @return the number of threads + */ + public int getLargestPoolSize() + { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try + { + return largestPoolSize; + } + finally + { + mainLock.unlock(); + } + } + + /** + * Returns the approximate total number of tasks that have ever been + * scheduled for execution. Because the states of tasks and threads may + * change dynamically during computation, the returned value is only an + * approximation. + * + * @return the number of tasks + */ + public long getTaskCount() + { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try + { + long n = completedTaskCount; + for (Worker w : workers) + { + n += w.completedTasks; + if (w.isLocked()) + ++n; + } + return n + workQueue.size(); + } + finally + { + mainLock.unlock(); + } + } + + /** + * Returns the approximate total number of tasks that have completed + * execution. Because the states of tasks and threads may change dynamically + * during computation, the returned value is only an approximation, but one + * that does not ever decrease across successive calls. + * + * @return the number of tasks + */ + public long getCompletedTaskCount() + { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try + { + long n = completedTaskCount; + for (Worker w : workers) + n += w.completedTasks; + return n; + } + finally + { + mainLock.unlock(); + } + } + + /* Extension hooks */ + + /** + * Method invoked prior to executing the given Runnable in the given thread. + * This method is invoked by thread {@code t} that will execute task + * {@code r}, and may be used to re-initialize ThreadLocals, or to perform + * logging. + * + *

+ * This implementation does nothing, but may be customized in subclasses. + * Note: To properly nest multiple overridings, subclasses should generally + * invoke {@code super.beforeExecute} at the end of this method. + * + * @param t + * the thread that will run task {@code r} + * @param r + * the task that will be executed + */ + protected void beforeExecute(Thread t, Runnable r) + { + } + + /** + * Method invoked upon completion of execution of the given Runnable. This + * method is invoked by the thread that executed the task. If non-null, the + * Throwable is the uncaught {@code RuntimeException} or {@code Error} that + * caused execution to terminate abruptly. + * + *

+ * This implementation does nothing, but may be customized in subclasses. + * Note: To properly nest multiple overridings, subclasses should generally + * invoke {@code super.afterExecute} at the beginning of this method. + * + *

+ * Note: When actions are enclosed in tasks (such as + * {@link FutureTask}) either explicitly or via methods such as + * {@code submit}, these task objects catch and maintain computational + * exceptions, and so they do not cause abrupt termination, and the internal + * exceptions are not passed to this method. If you would like to + * trap both kinds of failures in this method, you can further probe for + * such cases, as in this sample subclass that prints either the direct + * cause or the underlying exception if a task has been aborted: + * + *

+     * {
+     *     @code
+     *     class ExtendedExecutor extends ContinuationsExecutor
+     *     {
+     *         // ...
+     *         protected void afterExecute(Runnable r, Throwable t)
+     *         {
+     *             super.afterExecute(r, t);
+     *             if (t == null && r instanceof Future<?>)
+     *             {
+     *                 try
+     *                 {
+     *                     Object result = ((Future<?>) r).get();
+     *                 }
+     *                 catch (CancellationException ce)
+     *                 {
+     *                     t = ce;
+     *                 }
+     *                 catch (ExecutionException ee)
+     *                 {
+     *                     t = ee.getCause();
+     *                 }
+     *                 catch (InterruptedException ie)
+     *                 {
+     *                     Thread.currentThread().interrupt(); // ignore/reset
+     *                 }
+     *             }
+     *             if (t != null)
+     *                 System.out.println(t);
+     *         }
+     *     }
+     * }
+     * 
+ * + * @param r + * the runnable that has completed + * @param t + * the exception that caused termination, or null if execution + * completed normally + */ + protected void afterExecute(Runnable r, Throwable t) + { + if ( t != null ) + logger_.info( LogUtil.throwableToString(t) ); + } + + /** + * Method invoked when the Executor has terminated. Default implementation + * does nothing. Note: To properly nest multiple overridings, subclasses + * should generally invoke {@code super.terminated} within this method. + */ + protected void terminated() + { + } + + /* Predefined RejectedExecutionHandlers */ + + /** + * A handler for rejected tasks that runs the rejected task directly in the + * calling thread of the {@code execute} method, unless the executor has + * been shut down, in which case the task is discarded. + */ + public static class CallerRunsPolicy implements RejectedExecutionHandler + { + /** + * Creates a {@code CallerRunsPolicy}. + */ + public CallerRunsPolicy() + { + } + + /** + * Executes task r in the caller's thread, unless the executor has been + * shut down, in which case the task is discarded. + * + * @param r + * the runnable task requested to be executed + * @param e + * the executor attempting to execute this task + */ + public void rejectedExecution(Runnable r, ContinuationsExecutor e) + { + if (!e.isShutdown()) + { + r.run(); + } + } + } + + /** + * A handler for rejected tasks that throws a + * {@code RejectedExecutionException}. + */ + public static class AbortPolicy implements RejectedExecutionHandler + { + /** + * Creates an {@code AbortPolicy}. + */ + public AbortPolicy() + { + } + + /** + * Always throws RejectedExecutionException. + * + * @param r + * the runnable task requested to be executed + * @param e + * the executor attempting to execute this task + * @throws RejectedExecutionException + * always. + */ + public void rejectedExecution(Runnable r, ContinuationsExecutor e) + { + throw new RejectedExecutionException(); + } + } + + /** + * A handler for rejected tasks that silently discards the rejected task. + */ + public static class DiscardPolicy implements RejectedExecutionHandler + { + /** + * Creates a {@code DiscardPolicy}. + */ + public DiscardPolicy() + { + } + + /** + * Does nothing, which has the effect of discarding task r. + * + * @param r + * the runnable task requested to be executed + * @param e + * the executor attempting to execute this task + */ + public void rejectedExecution(Runnable r, ContinuationsExecutor e) + { + } + } + + /** + * A handler for rejected tasks that discards the oldest unhandled request + * and then retries {@code execute}, unless the executor is shut down, in + * which case the task is discarded. + */ + public static class DiscardOldestPolicy implements RejectedExecutionHandler + { + /** + * Creates a {@code DiscardOldestPolicy} for the given executor. + */ + public DiscardOldestPolicy() + { + } + + /** + * Obtains and ignores the next task that the executor + * would otherwise execute, if one is immediately available, + * and then retries execution of task r, unless the executor + * is shut down, in which case task r is instead discarded. + * + * @param r the runnable task requested to be executed + * @param e the executor attempting to execute this task + */ + public void rejectedExecution(Runnable r, ContinuationsExecutor e) + { + if (!e.isShutdown()) + { + e.getQueue().poll(); + e.execute(r); + } + } + } +} Added: incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java?rev=749218&view=auto ============================================================================== --- incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java (added) +++ incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java Mon Mar 2 07:57:22 2009 @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.concurrent; + +import java.util.concurrent.*; + +import org.apache.cassandra.utils.LogUtil; +import org.apache.log4j.Logger; +import org.apache.cassandra.utils.*; + +/** + * This is a wrapper class for the ScheduledThreadPoolExecutor. It provides an implementation + * for the afterExecute() found in the ThreadPoolExecutor class to log any unexpected + * Runtime Exceptions. + * + * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) + */ +public final class DebuggableScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor +{ + private static Logger logger_ = Logger.getLogger(DebuggableScheduledThreadPoolExecutor.class); + + public DebuggableScheduledThreadPoolExecutor(int threads, + ThreadFactory threadFactory) + { + super(threads, threadFactory); + } + + /** + * (non-Javadoc) + * @see java.util.concurrent.ThreadPoolExecutor#afterExecute(java.lang.Runnable, java.lang.Throwable) + */ + public void afterExecute(Runnable r, Throwable t) + { + super.afterExecute(r,t); + if ( t != null ) + { + Context ctx = ThreadLocalContext.get(); + if ( ctx != null ) + { + Object object = ctx.get(r.getClass().getName()); + + if ( object != null ) + { + logger_.info("**** In afterExecute() " + t.getClass().getName() + " occured while working with " + object + " ****"); + } + else + { + logger_.info("**** In afterExecute() " + t.getClass().getName() + " occured ****"); + } + } + + Throwable cause = t.getCause(); + if ( cause != null ) + { + logger_.info( LogUtil.throwableToString(cause) ); + } + logger_.info( LogUtil.throwableToString(t) ); + } + } +}