Mailing-List: contact harmony-commits-help@incubator.apache.org; run by ezmlm Reply-To: harmony-dev@incubator.apache.org Subject: svn commit: r442038 [5/19] - in /incubator/harmony/standard/classlib/trunk/modules/concurrent: ./ src/ src/main/ src/main/java/ src/main/java/java/ src/main/java/java/util/ src/main/java/java/util/concurrent/ src/main/java/java/util/concurrent/atomic/ ...
Date: Sun, 10 Sep 2006 23:59:35 -0000 To: harmony-commits@incubator.apache.org From: ndbeyer@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20060910235943.A9E111A9825@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Added: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/RejectedExecutionHandler.java URL: http://svn.apache.org/viewvc/incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/RejectedExecutionHandler.java?view=auto&rev=442038 ============================================================================== --- incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/RejectedExecutionHandler.java (added) +++ incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/RejectedExecutionHandler.java Sun Sep 10 16:59:30 2006 @@ -0,0 +1,33 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; + +/** + * A handler for tasks that cannot be executed by a {@link + * ThreadPoolExecutor}. + * + * @since 1.5 + * @author Doug Lea + */ +public interface RejectedExecutionHandler { + + /** + * Method that may be invoked by a {@link ThreadPoolExecutor} when + * execute cannot accept a task. This may occur when no + * more threads or queue slots are available because their bounds + * would be exceeded, or upon shutdown of the Executor. + * + * In the absence of other alternatives, the method may throw an + * unchecked {@link RejectedExecutionException}, which will be + * propagated to the caller of execute. 
+ * + * @param r the runnable task requested to be executed + * @param executor the executor attempting to execute this task + * @throws RejectedExecutionException if there is no remedy + */ + void rejectedExecution(Runnable r, ThreadPoolExecutor executor); +} Propchange: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/RejectedExecutionHandler.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ScheduledExecutorService.java URL: http://svn.apache.org/viewvc/incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ScheduledExecutorService.java?view=auto&rev=442038 ============================================================================== --- incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ScheduledExecutorService.java (added) +++ incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ScheduledExecutorService.java Sun Sep 10 16:59:30 2006 @@ -0,0 +1,145 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; +import java.util.concurrent.atomic.*; +import java.util.*; + +/** + * An {@link ExecutorService} that can schedule commands to run after a given + * delay, or to execute periodically. + * + *

The schedule methods create tasks with various delays + * and return a task object that can be used to cancel or check + * execution. The scheduleAtFixedRate and + * scheduleWithFixedDelay methods create and execute tasks + * that run periodically until cancelled. + * + *

Commands submitted using the {@link Executor#execute} and + * {@link ExecutorService} submit methods are scheduled with + * a requested delay of zero. Zero and negative delays (but not + * periods) are also allowed in schedule methods, and are + * treated as requests for immediate execution. + * + *

All schedule methods accept relative delays and + * periods as arguments, not absolute times or dates. It is a simple + * matter to transform an absolute time represented as a {@link + * java.util.Date} to the required form. For example, to schedule at + * a certain future date, you can use: schedule(task, + * date.getTime() - System.currentTimeMillis(), + * TimeUnit.MILLISECONDS). Beware however that expiration of a + * relative delay need not coincide with the current Date at + * which the task is enabled due to network time synchronization + * protocols, clock drift, or other factors. + * + * The {@link Executors} class provides convenient factory methods for + * the ScheduledExecutorService implementations provided in this package. + * + *

Usage Example

+ * + * Here is a class with a method that sets up a ScheduledExecutorService + * to beep every ten seconds for an hour: + * + *
+ * import static java.util.concurrent.TimeUnit.*;
+ * class BeeperControl {
+ *    private final ScheduledExecutorService scheduler = 
+ *       Executors.newScheduledThreadPool(1);
+ *
+ *    public void beepForAnHour() {
+ *        final Runnable beeper = new Runnable() {
+ *                public void run() { System.out.println("beep"); }
+ *            };
+ *        final ScheduledFuture<?> beeperHandle = 
+ *            scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);
+ *        scheduler.schedule(new Runnable() {
+ *                public void run() { beeperHandle.cancel(true); }
+ *            }, 60 * 60, SECONDS);
+ *    }
+ * }
+ * 
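Editorial sketch, not part of the committed file: the class comment above notes that the schedule methods take relative delays, and that an absolute java.util.Date can be converted with date.getTime() - System.currentTimeMillis(). The following is a minimal illustration of that conversion. The class name AbsoluteTimeScheduling and its helper methods are hypothetical, introduced only for this example. A date in the past yields a negative delay, which the documentation above says is treated as a request for immediate execution.

```java
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class for illustration; it does not exist in the commit.
class AbsoluteTimeScheduling {

    // Converts an absolute Date into the relative delay (in milliseconds)
    // that the schedule methods expect. The current time is passed in so
    // the arithmetic can be checked deterministically.
    static long delayMillisUntil(Date when, long nowMillis) {
        return when.getTime() - nowMillis;
    }

    // Schedules the task to run at (approximately) the given absolute date.
    static ScheduledFuture<?> scheduleAt(ScheduledExecutorService scheduler,
                                         Runnable task, Date when) {
        long delay = delayMillisUntil(when, System.currentTimeMillis());
        return scheduler.schedule(task, delay, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        Date when = new Date(System.currentTimeMillis() + 200);
        ScheduledFuture<?> handle = scheduleAt(scheduler, new Runnable() {
            public void run() { System.out.println("ran at the requested date"); }
        }, when);
        handle.get();          // waits until the one-shot task has completed
        scheduler.shutdown();
    }
}
```

As the comment above warns, the delay is computed once at submission time, so later wall-clock adjustments are not reflected in when the task actually runs.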
+ * + * @since 1.5 + * @author Doug Lea + */ +public interface ScheduledExecutorService extends ExecutorService { + + /** + * Creates and executes a one-shot action that becomes enabled + * after the given delay. + * @param command the task to execute. + * @param delay the time from now to delay execution. + * @param unit the time unit of the delay parameter. + * @return a Future representing pending completion of the task, + * and whose get() method will return null + * upon completion. + * @throws RejectedExecutionException if task cannot be scheduled + * for execution. + * @throws NullPointerException if command is null + */ + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit); + + /** + * Creates and executes a ScheduledFuture that becomes enabled after the + * given delay. + * @param callable the function to execute. + * @param delay the time from now to delay execution. + * @param unit the time unit of the delay parameter. + * @return a ScheduledFuture that can be used to extract result or cancel. + * @throws RejectedExecutionException if task cannot be scheduled + * for execution. + * @throws NullPointerException if callable is null + */ + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit); + + /** + * Creates and executes a periodic action that becomes enabled first + * after the given initial delay, and subsequently with the given + * period; that is executions will commence after + * initialDelay then initialDelay+period, then + * initialDelay + 2 * period, and so on. + * If any execution of the task + * encounters an exception, subsequent executions are suppressed. + * Otherwise, the task will only terminate via cancellation or + * termination of the executor. + * @param command the task to execute. + * @param initialDelay the time to delay first execution. + * @param period the period between successive executions. 
+ * @param unit the time unit of the initialDelay and period parameters + * @return a Future representing pending completion of the task, + * and whose get() method will throw an exception upon + * cancellation. + * @throws RejectedExecutionException if task cannot be scheduled + * for execution. + * @throws NullPointerException if command is null + * @throws IllegalArgumentException if period less than or equal to zero. + */ + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); + + /** + * Creates and executes a periodic action that becomes enabled first + * after the given initial delay, and subsequently with the + * given delay between the termination of one execution and the + * commencement of the next. If any execution of the task + * encounters an exception, subsequent executions are suppressed. + * Otherwise, the task will only terminate via cancellation or + * termination of the executor. + * @param command the task to execute. + * @param initialDelay the time to delay first execution. + * @param delay the delay between the termination of one + * execution and the commencement of the next. + * @param unit the time unit of the initialDelay and delay parameters + * @return a Future representing pending completion of the task, + * and whose get() method will throw an exception upon + * cancellation. + * @throws RejectedExecutionException if task cannot be scheduled + * for execution. + * @throws NullPointerException if command is null + * @throws IllegalArgumentException if delay less than or equal to zero. 
+ */ + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); + +} Propchange: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ScheduledExecutorService.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ScheduledFuture.java URL: http://svn.apache.org/viewvc/incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ScheduledFuture.java?view=auto&rev=442038 ============================================================================== --- incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ScheduledFuture.java (added) +++ incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ScheduledFuture.java Sun Sep 10 16:59:30 2006 @@ -0,0 +1,19 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; + +/** + * A delayed result-bearing action that can be cancelled. + * Usually a scheduled future is the result of scheduling + * a task with a {@link ScheduledExecutorService}. 
+ * + * @since 1.5 + * @author Doug Lea + * @param <V> The result type returned by this Future + */ +public interface ScheduledFuture<V> extends Delayed, Future<V> { +} Propchange: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ScheduledFuture.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java URL: http://svn.apache.org/viewvc/incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java?view=auto&rev=442038 ============================================================================== --- incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java (added) +++ incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java Sun Sep 10 16:59:30 2006 @@ -0,0 +1,541 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; +import java.util.concurrent.atomic.*; +import java.util.*; + +/** + * A {@link ThreadPoolExecutor} that can additionally schedule + * commands to run after a given delay, or to execute + * periodically. This class is preferable to {@link java.util.Timer} + * when multiple worker threads are needed, or when the additional + * flexibility or capabilities of {@link ThreadPoolExecutor} (which + * this class extends) are required. + * + *

Delayed tasks execute no sooner than they are enabled, but + * without any real-time guarantees about when, after they are + * enabled, they will commence. Tasks scheduled for exactly the same + * execution time are enabled in first-in-first-out (FIFO) order of + * submission. + * + *

While this class inherits from {@link ThreadPoolExecutor}, a few + * of the inherited tuning methods are not useful for it. In + * particular, because it acts as a fixed-sized pool using + * corePoolSize threads and an unbounded queue, adjustments + * to maximumPoolSize have no useful effect. + * + * @since 1.5 + * @author Doug Lea + */ +public class ScheduledThreadPoolExecutor + extends ThreadPoolExecutor + implements ScheduledExecutorService { + + /** + * False if should cancel/suppress periodic tasks on shutdown. + */ + private volatile boolean continueExistingPeriodicTasksAfterShutdown; + + /** + * False if should cancel non-periodic tasks on shutdown. + */ + private volatile boolean executeExistingDelayedTasksAfterShutdown = true; + + /** + * Sequence number to break scheduling ties, and in turn to + * guarantee FIFO order among tied entries. + */ + private static final AtomicLong sequencer = new AtomicLong(0); + + /** Base of nanosecond timings, to avoid wrapping */ + private static final long NANO_ORIGIN = System.nanoTime(); + + /** + * Returns nanosecond time offset by origin + */ + final long now() { + return System.nanoTime() - NANO_ORIGIN; + } + + private class ScheduledFutureTask + extends FutureTask implements ScheduledFuture { + + /** Sequence number to break ties FIFO */ + private final long sequenceNumber; + /** The time the task is enabled to execute in nanoTime units */ + private long time; + /** + * Period in nanoseconds for repeating tasks. A positive + * value indicates fixed-rate execution. A negative value + * indicates fixed-delay execution. A value of 0 indicates a + * non-repeating task. 
+ */ + private final long period; + + /** + * Creates a one-shot action with given nanoTime-based trigger time + */ + ScheduledFutureTask(Runnable r, V result, long ns) { + super(r, result); + this.time = ns; + this.period = 0; + this.sequenceNumber = sequencer.getAndIncrement(); + } + + /** + * Creates a periodic action with given nano time and period + */ + ScheduledFutureTask(Runnable r, V result, long ns, long period) { + super(r, result); + this.time = ns; + this.period = period; + this.sequenceNumber = sequencer.getAndIncrement(); + } + + /** + * Creates a one-shot action with given nanoTime-based trigger + */ + ScheduledFutureTask(Callable callable, long ns) { + super(callable); + this.time = ns; + this.period = 0; + this.sequenceNumber = sequencer.getAndIncrement(); + } + + public long getDelay(TimeUnit unit) { + long d = unit.convert(time - now(), TimeUnit.NANOSECONDS); + return d; + } + + public int compareTo(Object other) { + if (other == this) // compare zero ONLY if same object + return 0; + ScheduledFutureTask x = (ScheduledFutureTask)other; + long diff = time - x.time; + if (diff < 0) + return -1; + else if (diff > 0) + return 1; + else if (sequenceNumber < x.sequenceNumber) + return -1; + else + return 1; + } + + /** + * Returns true if this is a periodic (not a one-shot) action. + * @return true if periodic + */ + boolean isPeriodic() { + return period != 0; + } + + /** + * Run a periodic task + */ + private void runPeriodic() { + boolean ok = ScheduledFutureTask.super.runAndReset(); + boolean down = isShutdown(); + // Reschedule if not cancelled and not shutdown or policy allows + if (ok && (!down || + (getContinueExistingPeriodicTasksAfterShutdownPolicy() && + !isTerminating()))) { + long p = period; + if (p > 0) + time += p; + else + time = now() - p; + ScheduledThreadPoolExecutor.super.getQueue().add(this); + } + // This might have been the final executed delayed + // task. Wake up threads to check. 
+ else if (down) + interruptIdleWorkers(); + } + + /** + * Overrides FutureTask version so as to reset/requeue if periodic. + */ + public void run() { + if (isPeriodic()) + runPeriodic(); + else + ScheduledFutureTask.super.run(); + } + } + + /** + * Specialized variant of ThreadPoolExecutor.execute for delayed tasks. + */ + private void delayedExecute(Runnable command) { + if (isShutdown()) { + reject(command); + return; + } + // Prestart a thread if necessary. We cannot prestart it + // running the task because the task (probably) shouldn't be + // run yet, so thread will just idle until delay elapses. + if (getPoolSize() < getCorePoolSize()) + prestartCoreThread(); + + super.getQueue().add(command); + } + + /** + * Cancel and clear the queue of all tasks that should not be run + * due to shutdown policy. + */ + private void cancelUnwantedTasks() { + boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); + boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); + if (!keepDelayed && !keepPeriodic) + super.getQueue().clear(); + else if (keepDelayed || keepPeriodic) { + Object[] entries = super.getQueue().toArray(); + for (int i = 0; i < entries.length; ++i) { + Object e = entries[i]; + if (e instanceof ScheduledFutureTask) { + ScheduledFutureTask t = (ScheduledFutureTask)e; + if (t.isPeriodic()? !keepPeriodic : !keepDelayed) + t.cancel(false); + } + } + entries = null; + purge(); + } + } + + public boolean remove(Runnable task) { + if (!(task instanceof ScheduledFutureTask)) + return false; + return getQueue().remove(task); + } + + /** + * Creates a new ScheduledThreadPoolExecutor with the given core + * pool size. + * + * @param corePoolSize the number of threads to keep in the pool, + * even if they are idle. 
+ * @throws IllegalArgumentException if corePoolSize less than or + * equal to zero + */ + public ScheduledThreadPoolExecutor(int corePoolSize) { + super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, + new DelayedWorkQueue()); + } + + /** + * Creates a new ScheduledThreadPoolExecutor with the given + * initial parameters. + * + * @param corePoolSize the number of threads to keep in the pool, + * even if they are idle. + * @param threadFactory the factory to use when the executor + * creates a new thread. + * @throws NullPointerException if threadFactory is null + */ + public ScheduledThreadPoolExecutor(int corePoolSize, + ThreadFactory threadFactory) { + super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, + new DelayedWorkQueue(), threadFactory); + } + + /** + * Creates a new ScheduledThreadPoolExecutor with the given + * initial parameters. + * + * @param corePoolSize the number of threads to keep in the pool, + * even if they are idle. + * @param handler the handler to use when execution is blocked + * because the thread bounds and queue capacities are reached. + * @throws NullPointerException if handler is null + */ + public ScheduledThreadPoolExecutor(int corePoolSize, + RejectedExecutionHandler handler) { + super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, + new DelayedWorkQueue(), handler); + } + + /** + * Creates a new ScheduledThreadPoolExecutor with the given + * initial parameters. + * + * @param corePoolSize the number of threads to keep in the pool, + * even if they are idle. + * @param threadFactory the factory to use when the executor + * creates a new thread. + * @param handler the handler to use when execution is blocked + * because the thread bounds and queue capacities are reached. 
+ * @throws NullPointerException if threadFactory or handler is null + */ + public ScheduledThreadPoolExecutor(int corePoolSize, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, + new DelayedWorkQueue(), threadFactory, handler); + } + + public ScheduledFuture schedule(Runnable command, + long delay, + TimeUnit unit) { + if (command == null || unit == null) + throw new NullPointerException(); + long triggerTime = now() + unit.toNanos(delay); + ScheduledFutureTask t = + new ScheduledFutureTask(command, null, triggerTime); + delayedExecute(t); + return t; + } + + public ScheduledFuture schedule(Callable callable, + long delay, + TimeUnit unit) { + if (callable == null || unit == null) + throw new NullPointerException(); + if (delay < 0) delay = 0; + long triggerTime = now() + unit.toNanos(delay); + ScheduledFutureTask t = + new ScheduledFutureTask(callable, triggerTime); + delayedExecute(t); + return t; + } + + public ScheduledFuture scheduleAtFixedRate(Runnable command, + long initialDelay, + long period, + TimeUnit unit) { + if (command == null || unit == null) + throw new NullPointerException(); + if (period <= 0) + throw new IllegalArgumentException(); + if (initialDelay < 0) initialDelay = 0; + long triggerTime = now() + unit.toNanos(initialDelay); + ScheduledFutureTask t = + new ScheduledFutureTask(command, + null, + triggerTime, + unit.toNanos(period)); + delayedExecute(t); + return t; + } + + public ScheduledFuture scheduleWithFixedDelay(Runnable command, + long initialDelay, + long delay, + TimeUnit unit) { + if (command == null || unit == null) + throw new NullPointerException(); + if (delay <= 0) + throw new IllegalArgumentException(); + if (initialDelay < 0) initialDelay = 0; + long triggerTime = now() + unit.toNanos(initialDelay); + ScheduledFutureTask t = + new ScheduledFutureTask(command, + null, + triggerTime, + unit.toNanos(-delay)); + delayedExecute(t); + return t; 
+ } + + + /** + * Execute command with zero required delay. This has effect + * equivalent to schedule(command, 0, anyUnit). Note + * that inspections of the queue and of the list returned by + * shutdownNow will access the zero-delayed + * {@link ScheduledFuture}, not the command itself. + * + * @param command the task to execute + * @throws RejectedExecutionException at discretion of + * RejectedExecutionHandler, if task cannot be accepted + * for execution because the executor has been shut down. + * @throws NullPointerException if command is null + */ + public void execute(Runnable command) { + if (command == null) + throw new NullPointerException(); + schedule(command, 0, TimeUnit.NANOSECONDS); + } + + // Override AbstractExecutorService methods + + public Future submit(Runnable task) { + return schedule(task, 0, TimeUnit.NANOSECONDS); + } + + public Future submit(Runnable task, T result) { + return schedule(Executors.callable(task, result), + 0, TimeUnit.NANOSECONDS); + } + + public Future submit(Callable task) { + return schedule(task, 0, TimeUnit.NANOSECONDS); + } + + /** + * Set policy on whether to continue executing existing periodic + * tasks even when this executor has been shutdown. In + * this case, these tasks will only terminate upon + * shutdownNow, or after setting the policy to + * false when already shutdown. This value is by default + * false. + * @param value if true, continue after shutdown, else don't. + * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy + */ + public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) { + continueExistingPeriodicTasksAfterShutdown = value; + if (!value && isShutdown()) + cancelUnwantedTasks(); + } + + /** + * Get the policy on whether to continue executing existing + * periodic tasks even when this executor has been + * shutdown. In this case, these tasks will only + * terminate upon shutdownNow or after setting the policy + * to false when already shutdown. 
This value is by + * default false. + * @return true if will continue after shutdown. + * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy + */ + public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() { + return continueExistingPeriodicTasksAfterShutdown; + } + + /** + * Set policy on whether to execute existing delayed + * tasks even when this executor has been shutdown. In + * this case, these tasks will only terminate upon + * shutdownNow, or after setting the policy to + * false when already shutdown. This value is by default + * true. + * @param value if true, execute after shutdown, else don't. + * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy + */ + public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) { + executeExistingDelayedTasksAfterShutdown = value; + if (!value && isShutdown()) + cancelUnwantedTasks(); + } + + /** + * Get policy on whether to execute existing delayed + * tasks even when this executor has been shutdown. In + * this case, these tasks will only terminate upon + * shutdownNow, or after setting the policy to + * false when already shutdown. This value is by default + * true. + * @return true if will execute after shutdown. + * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy + */ + public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() { + return executeExistingDelayedTasksAfterShutdown; + } + + + /** + * Initiates an orderly shutdown in which previously submitted + * tasks are executed, but no new tasks will be accepted. If the + * ExecuteExistingDelayedTasksAfterShutdownPolicy has + * been set false, existing delayed tasks whose delays + * have not yet elapsed are cancelled. And unless the + * ContinueExistingPeriodicTasksAfterShutdownPolicy has + * been set true, future executions of existing periodic + * tasks will be cancelled. 
+ */ + public void shutdown() { + cancelUnwantedTasks(); + super.shutdown(); + } + + /** + * Attempts to stop all actively executing tasks, halts the + * processing of waiting tasks, and returns a list of the tasks that were + * awaiting execution. + * + *

There are no guarantees beyond best-effort attempts to stop + * processing actively executing tasks. This implementation + * cancels tasks via {@link Thread#interrupt}, so if any tasks mask or + * fail to respond to interrupts, they may never terminate. + * + * @return list of tasks that never commenced execution. Each + * element of this list is a {@link ScheduledFuture}, + * including those tasks submitted using execute, which + * are for scheduling purposes used as the basis of a zero-delay + * ScheduledFuture. + */ + public List shutdownNow() { + return super.shutdownNow(); + } + + /** + * Returns the task queue used by this executor. Each element of + * this queue is a {@link ScheduledFuture}, including those + * tasks submitted using execute which are for scheduling + * purposes used as the basis of a zero-delay + * ScheduledFuture. Iteration over this queue is + * not guaranteed to traverse tasks in the order in + * which they will execute. + * + * @return the task queue + */ + public BlockingQueue getQueue() { + return super.getQueue(); + } + + /** + * An annoying wrapper class to convince generics compiler to + * use a DelayQueue as a BlockingQueue + */ + private static class DelayedWorkQueue + extends AbstractCollection + implements BlockingQueue { + + private final DelayQueue dq = new DelayQueue(); + public Runnable poll() { return dq.poll(); } + public Runnable peek() { return dq.peek(); } + public Runnable take() throws InterruptedException { return dq.take(); } + public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { + return dq.poll(timeout, unit); + } + + public boolean add(Runnable x) { return dq.add((ScheduledFutureTask)x); } + public boolean offer(Runnable x) { return dq.offer((ScheduledFutureTask)x); } + public void put(Runnable x) { + dq.put((ScheduledFutureTask)x); + } + public boolean offer(Runnable x, long timeout, TimeUnit unit) { + return dq.offer((ScheduledFutureTask)x, timeout, unit); + } + + public Runnable 
remove() { return dq.remove(); } + public Runnable element() { return dq.element(); } + public void clear() { dq.clear(); } + public int drainTo(Collection c) { return dq.drainTo(c); } + public int drainTo(Collection c, int maxElements) { + return dq.drainTo(c, maxElements); + } + + public int remainingCapacity() { return dq.remainingCapacity(); } + public boolean remove(Object x) { return dq.remove(x); } + public boolean contains(Object x) { return dq.contains(x); } + public int size() { return dq.size(); } + public boolean isEmpty() { return dq.isEmpty(); } + public Object[] toArray() { return dq.toArray(); } + public T[] toArray(T[] array) { return dq.toArray(array); } + public Iterator iterator() { + return new Iterator() { + private Iterator it = dq.iterator(); + public boolean hasNext() { return it.hasNext(); } + public Runnable next() { return it.next(); } + public void remove() { it.remove(); } + }; + } + } +} Propchange: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Semaphore.java URL: http://svn.apache.org/viewvc/incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Semaphore.java?view=auto&rev=442038 ============================================================================== --- incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Semaphore.java (added) +++ incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Semaphore.java Sun Sep 10 16:59:30 2006 @@ -0,0 +1,665 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * 
http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; +import java.util.*; +import java.util.concurrent.locks.*; +import java.util.concurrent.atomic.*; + +/** + * A counting semaphore. Conceptually, a semaphore maintains a set of + * permits. Each {@link #acquire} blocks if necessary until a permit is + * available, and then takes it. Each {@link #release} adds a permit, + * potentially releasing a blocking acquirer. + * However, no actual permit objects are used; the Semaphore just + * keeps a count of the number available and acts accordingly. + * + *

Semaphores are often used to restrict the number of threads that can + * access some (physical or logical) resource. For example, here is + * a class that uses a semaphore to control access to a pool of items: + *

+ * class Pool {
+ *   private static final int MAX_AVAILABLE = 100;
+ *   private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
+ *
+ *   public Object getItem() throws InterruptedException {
+ *     available.acquire();
+ *     return getNextAvailableItem();
+ *   }
+ *
+ *   public void putItem(Object x) {
+ *     if (markAsUnused(x))
+ *       available.release();
+ *   }
+ *
+ *   // Not a particularly efficient data structure; just for demo
+ *
+ *   protected Object[] items = ... whatever kinds of items being managed
+ *   protected boolean[] used = new boolean[MAX_AVAILABLE];
+ *
+ *   protected synchronized Object getNextAvailableItem() {
+ *     for (int i = 0; i < MAX_AVAILABLE; ++i) {
+ *       if (!used[i]) {
+ *          used[i] = true;
+ *          return items[i];
+ *       }
+ *     }
+ *     return null; // not reached
+ *   }
+ *
+ *   protected synchronized boolean markAsUnused(Object item) {
+ *     for (int i = 0; i < MAX_AVAILABLE; ++i) {
+ *       if (item == items[i]) {
+ *          if (used[i]) {
+ *            used[i] = false;
+ *            return true;
+ *          } else
+ *            return false;
+ *       }
+ *     }
+ *     return false;
+ *   }
+ *
+ * }
+ * 
+ * + *

Before obtaining an item each thread must acquire a permit from + * the semaphore, guaranteeing that an item is available for use. When + * the thread has finished with the item it is returned to the + * pool and a permit is returned to the semaphore, allowing another + * thread to acquire that item. Note that no synchronization lock is + * held when {@link #acquire} is called as that would prevent an item + * from being returned to the pool. The semaphore encapsulates the + * synchronization needed to restrict access to the pool, separately + * from any synchronization needed to maintain the consistency of the + * pool itself. + * + *

A semaphore initialized to one, and which is used such that it + * only has at most one permit available, can serve as a mutual + * exclusion lock. This is more commonly known as a binary + * semaphore, because it only has two states: one permit + * available, or zero permits available. When used in this way, the + * binary semaphore has the property (unlike many {@link Lock} + * implementations), that the "lock" can be released by a + * thread other than the owner (as semaphores have no notion of + * ownership). This can be useful in some specialized contexts, such + * as deadlock recovery. + * + *
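The binary-semaphore property described above (the "lock" may be released by a thread other than the one that acquired it) can be illustrated with a small sketch. The class name BinarySemaphoreDemo and the helper releasedByAnotherThread are hypothetical names chosen for this illustration; only the Semaphore calls come from the class being added.

```java
import java.util.concurrent.Semaphore;

public class BinarySemaphoreDemo {

    // Hypothetical helper: returns true if a permit taken by the calling
    // thread was handed back by a different thread.
    static boolean releasedByAnotherThread() {
        // One permit, so at most one holder at a time (a binary semaphore).
        final Semaphore mutex = new Semaphore(1);

        // The calling thread takes the only permit.
        mutex.acquireUninterruptibly();

        // A different thread returns the permit; semaphores track no owner.
        Thread releaser = new Thread(new Runnable() {
            public void run() {
                mutex.release();
            }
        });
        releaser.start();
        try {
            releaser.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }

        // The permit is available again although this thread never released it.
        return mutex.availablePermits() == 1;
    }

    public static void main(String[] args) {
        System.out.println(releasedByAnotherThread()); // prints "true"
    }
}
```

A ReentrantLock used the same way would throw IllegalMonitorStateException from unlock() in the second thread, which is the difference the paragraph points at.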

The constructor for this class optionally accepts a + * fairness parameter. When set false, this class makes no + * guarantees about the order in which threads acquire permits. In + * particular, barging is permitted, that is, a thread + * invoking {@link #acquire} can be allocated a permit ahead of a + * thread that has been waiting. When fairness is set true, the + * semaphore guarantees that threads invoking any of the {@link + * #acquire() acquire} methods are allocated permits in the order in + * which their invocation of those methods was processed + * (first-in-first-out; FIFO). Note that FIFO ordering necessarily + * applies to specific internal points of execution within these + * methods. So, it is possible for one thread to invoke + * acquire before another, but reach the ordering point after + * the other, and similarly upon return from the method. + * + *

Generally, semaphores used to control resource access should be + * initialized as fair, to ensure that no thread is starved out from + * accessing a resource. When using semaphores for other kinds of + * synchronization control, the throughput advantages of non-fair + * ordering often outweigh fairness considerations. + * + *
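As a rough sketch of how the fairness setting is chosen and queried, the following uses the two constructors and isFair() from this class. The class name FairnessDemo is an assumption made for the example, and the sketch does not attempt to demonstrate ordering under contention, which depends on thread timing.

```java
import java.util.concurrent.Semaphore;

public class FairnessDemo {
    public static void main(String[] args) {
        // Fair semaphore: permits are granted in FIFO order under contention.
        Semaphore fair = new Semaphore(2, true);
        // The one-argument constructor creates a nonfair semaphore.
        Semaphore nonfair = new Semaphore(2);

        System.out.println(fair.isFair());     // prints "true"
        System.out.println(nonfair.isFair());  // prints "false"

        // Take both permits at once; this does not block because two are available.
        fair.acquireUninterruptibly(2);

        // With no permits left, the untimed tryAcquire fails immediately.
        System.out.println(fair.tryAcquire()); // prints "false"

        // Return both permits in one call.
        fair.release(2);
        System.out.println(fair.availablePermits()); // prints "2"
    }
}
```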

This class also provides convenience methods to {@link + * #acquire(int) acquire} and {@link #release(int) release} multiple + * permits at a time. Beware of the increased risk of indefinite + * postponement when these methods are used without fairness set true. + * + * @since 1.5 + * @author Doug Lea + * + */ + +public class Semaphore implements java.io.Serializable { + private static final long serialVersionUID = -3222578661600680210L; + /** All mechanics via AbstractQueuedSynchronizer subclass */ + private final Sync sync; + + /** + * Synchronization implementation for semaphore. Uses AQS state + * to represent permits. Subclassed into fair and nonfair + * versions. + */ + abstract static class Sync extends AbstractQueuedSynchronizer { + Sync(int permits) { + setState(permits); + } + + final int getPermits() { + return getState(); + } + + final int nonfairTryAcquireShared(int acquires) { + for (;;) { + int available = getState(); + int remaining = available - acquires; + if (remaining < 0 || + compareAndSetState(available, remaining)) + return remaining; + } + } + + protected final boolean tryReleaseShared(int releases) { + for (;;) { + int p = getState(); + if (compareAndSetState(p, p + releases)) + return true; + } + } + + final void reducePermits(int reductions) { + for (;;) { + int current = getState(); + int next = current - reductions; + if (compareAndSetState(current, next)) + return; + } + } + + final int drainPermits() { + for (;;) { + int current = getState(); + if (current == 0 || compareAndSetState(current, 0)) + return current; + } + } + } + + /** + * NonFair version + */ + final static class NonfairSync extends Sync { + NonfairSync(int permits) { + super(permits); + } + + protected int tryAcquireShared(int acquires) { + return nonfairTryAcquireShared(acquires); + } + } + + /** + * Fair version + */ + final static class FairSync extends Sync { + FairSync(int permits) { + super(permits); + } + + protected int tryAcquireShared(int acquires) { + Thread 
current = Thread.currentThread(); + for (;;) { + Thread first = getFirstQueuedThread(); + if (first != null && first != current) + return -1; + int available = getState(); + int remaining = available - acquires; + if (remaining < 0 || + compareAndSetState(available, remaining)) + return remaining; + } + } + } + + /** + * Creates a Semaphore with the given number of + * permits and nonfair fairness setting. + * @param permits the initial number of permits available. This + * value may be negative, in which case releases must + * occur before any acquires will be granted. + */ + public Semaphore(int permits) { + sync = new NonfairSync(permits); + } + + /** + * Creates a Semaphore with the given number of + * permits and the given fairness setting. + * @param permits the initial number of permits available. This + * value may be negative, in which case releases must + * occur before any acquires will be granted. + * @param fair true if this semaphore will guarantee first-in + * first-out granting of permits under contention, else false. + */ + public Semaphore(int permits, boolean fair) { + sync = (fair)? new FairSync(permits) : new NonfairSync(permits); + } + + /** + * Acquires a permit from this semaphore, blocking until one is + * available, or the thread is {@link Thread#interrupt interrupted}. + * + *

Acquires a permit, if one is available and returns immediately, + * reducing the number of available permits by one. + *

If no permit is available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * one of two things happens: + *

    + *
  • Some other thread invokes the {@link #release} method for this + * semaphore and the current thread is next to be assigned a permit; or + *
  • Some other thread {@link Thread#interrupt interrupts} the current + * thread. + *
+ * + *

If the current thread: + *

    + *
  • has its interrupted status set on entry to this method; or + *
  • is {@link Thread#interrupt interrupted} while waiting + * for a permit, + *
+ * then {@link InterruptedException} is thrown and the current thread's + * interrupted status is cleared. + * + * @throws InterruptedException if the current thread is interrupted + * + * @see Thread#interrupt + */ + public void acquire() throws InterruptedException { + sync.acquireSharedInterruptibly(1); + } + + /** + * Acquires a permit from this semaphore, blocking until one is + * available. + * + *

Acquires a permit, if one is available and returns immediately, + * reducing the number of available permits by one. + *

If no permit is available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * some other thread invokes the {@link #release} method for this + * semaphore and the current thread is next to be assigned a permit. + * + *

If the current thread + * is {@link Thread#interrupt interrupted} while waiting + * for a permit then it will continue to wait, but the time at which + * the thread is assigned a permit may change compared to the time it + * would have received the permit had no interruption occurred. When the + * thread does return from this method its interrupt status will be set. + * + */ + public void acquireUninterruptibly() { + sync.acquireShared(1); + } + + /** + * Acquires a permit from this semaphore, only if one is available at the + * time of invocation. + *

Acquires a permit, if one is available and returns immediately, + * with the value true, + * reducing the number of available permits by one. + * + *

If no permit is available then this method will return + * immediately with the value false. + * + *

Even when this semaphore has been set to use a + * fair ordering policy, a call to tryAcquire() will + * immediately acquire a permit if one is available, whether or not + * other threads are currently waiting. + * This "barging" behavior can be useful in certain + * circumstances, even though it breaks fairness. If you want to honor + * the fairness setting, then use + * {@link #tryAcquire(long, TimeUnit) tryAcquire(0, TimeUnit.SECONDS) } + * which is almost equivalent (it also detects interruption). + * + * @return true if a permit was acquired and false + * otherwise. + */ + public boolean tryAcquire() { + return sync.nonfairTryAcquireShared(1) >= 0; + } + + /** + * Acquires a permit from this semaphore, if one becomes available + * within the given waiting time and the + * current thread has not been {@link Thread#interrupt interrupted}. + *

Acquires a permit, if one is available and returns immediately, + * with the value true, + * reducing the number of available permits by one. + *

If no permit is available then + * the current thread becomes disabled for thread scheduling + * purposes and lies dormant until one of three things happens: + *

    + *
  • Some other thread invokes the {@link #release} method for this + * semaphore and the current thread is next to be assigned a permit; or + *
  • Some other thread {@link Thread#interrupt interrupts} the current + * thread; or + *
  • The specified waiting time elapses. + *
+ *

If a permit is acquired then the value true is returned. + *

If the current thread: + *

    + *
  • has its interrupted status set on entry to this method; or + *
  • is {@link Thread#interrupt interrupted} while waiting to acquire + * a permit, + *
+ * then {@link InterruptedException} is thrown and the current thread's + * interrupted status is cleared. + *

If the specified waiting time elapses then the value false + * is returned. + * If the time is less than or equal to zero, the method will not wait + * at all. + * + * @param timeout the maximum time to wait for a permit + * @param unit the time unit of the timeout argument. + * @return true if a permit was acquired and false + * if the waiting time elapsed before a permit was acquired. + * + * @throws InterruptedException if the current thread is interrupted + * + * @see Thread#interrupt + * + */ + public boolean tryAcquire(long timeout, TimeUnit unit) + throws InterruptedException { + return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); + } + + /** + * Releases a permit, returning it to the semaphore. + *

Releases a permit, increasing the number of available permits + * by one. + * If any threads are blocking trying to acquire a permit, then one + * is selected and given the permit that was just released. + * That thread is re-enabled for thread scheduling purposes. + *

There is no requirement that a thread that releases a permit must + * have acquired that permit by calling {@link #acquire}. + * Correct usage of a semaphore is established by programming convention + * in the application. + */ + public void release() { + sync.releaseShared(1); + } + + /** + * Acquires the given number of permits from this semaphore, + * blocking until all are available, + * or the thread is {@link Thread#interrupt interrupted}. + * + *

Acquires the given number of permits, if they are available, + * and returns immediately, + * reducing the number of available permits by the given amount. + * + *

If insufficient permits are available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * one of two things happens: + *

    + *
  • Some other thread invokes one of the {@link #release() release} + * methods for this semaphore, the current thread is next to be assigned + * permits and the number of available permits satisfies this request; or + *
  • Some other thread {@link Thread#interrupt interrupts} the current + * thread. + *
+ * + *

If the current thread: + *

    + *
  • has its interrupted status set on entry to this method; or + *
  • is {@link Thread#interrupt interrupted} while waiting + * for a permit, + *
+ * then {@link InterruptedException} is thrown and the current thread's + * interrupted status is cleared. + * Any permits that were to be assigned to this thread are instead + * assigned to the next waiting thread(s), as if + * they had been made available by a call to {@link #release()}. + * + * @param permits the number of permits to acquire + * + * @throws InterruptedException if the current thread is interrupted + * @throws IllegalArgumentException if permits less than zero. + * + * @see Thread#interrupt + */ + public void acquire(int permits) throws InterruptedException { + if (permits < 0) throw new IllegalArgumentException(); + sync.acquireSharedInterruptibly(permits); + } + + /** + * Acquires the given number of permits from this semaphore, + * blocking until all are available. + * + *

Acquires the given number of permits, if they are available, + * and returns immediately, + * reducing the number of available permits by the given amount. + * + *

If insufficient permits are available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * some other thread invokes one of the {@link #release() release} + * methods for this semaphore, the current thread is next to be assigned + * permits and the number of available permits satisfies this request. + * + *

If the current thread + * is {@link Thread#interrupt interrupted} while waiting + * for permits then it will continue to wait and its position in the + * queue is not affected. When the + * thread does return from this method its interrupt status will be set. + * + * @param permits the number of permits to acquire + * @throws IllegalArgumentException if permits less than zero. + * + */ + public void acquireUninterruptibly(int permits) { + if (permits < 0) throw new IllegalArgumentException(); + sync.acquireShared(permits); + } + + /** + * Acquires the given number of permits from this semaphore, only + * if all are available at the time of invocation. + * + *

Acquires the given number of permits, if they are available, and + * returns immediately, with the value true, + * reducing the number of available permits by the given amount. + * + *

If insufficient permits are available then this method will return + * immediately with the value false and the number of available + * permits is unchanged. + * + *

Even when this semaphore has been set to use a fair ordering + * policy, a call to tryAcquire will + * immediately acquire a permit if one is available, whether or + * not other threads are currently waiting. This + * "barging" behavior can be useful in certain + * circumstances, even though it breaks fairness. If you want to + * honor the fairness setting, then use {@link #tryAcquire(int, + * long, TimeUnit) tryAcquire(permits, 0, TimeUnit.SECONDS) } + * which is almost equivalent (it also detects interruption). + * + * @param permits the number of permits to acquire + * + * @return true if the permits were acquired and false + * otherwise. + * @throws IllegalArgumentException if permits less than zero. + */ + public boolean tryAcquire(int permits) { + if (permits < 0) throw new IllegalArgumentException(); + return sync.nonfairTryAcquireShared(permits) >= 0; + } + + /** + * Acquires the given number of permits from this semaphore, if all + * become available within the given waiting time and the + * current thread has not been {@link Thread#interrupt interrupted}. + *

Acquires the given number of permits, if they are available and + * returns immediately, with the value true, + * reducing the number of available permits by the given amount. + *

If insufficient permits are available then + * the current thread becomes disabled for thread scheduling + * purposes and lies dormant until one of three things happens: + *

    + *
  • Some other thread invokes one of the {@link #release() release} + * methods for this semaphore, the current thread is next to be assigned + * permits and the number of available permits satisfies this request; or + *
  • Some other thread {@link Thread#interrupt interrupts} the current + * thread; or + *
  • The specified waiting time elapses. + *
+ *

If the permits are acquired then the value true is returned. + *

If the current thread: + *

    + *
  • has its interrupted status set on entry to this method; or + *
  • is {@link Thread#interrupt interrupted} while waiting to acquire + * the permits, + *
+ * then {@link InterruptedException} is thrown and the current thread's + * interrupted status is cleared. + * Any permits that were to be assigned to this thread, are instead + * assigned to the next waiting thread(s), as if + * they had been made available by a call to {@link #release()}. + * + *

If the specified waiting time elapses then the value false + * is returned. + * If the time is + * less than or equal to zero, the method will not wait at all. + * Any permits that were to be assigned to this thread, are instead + * assigned to the next waiting thread(s), as if + * they had been made available by a call to {@link #release()}. + * + * @param permits the number of permits to acquire + * @param timeout the maximum time to wait for the permits + * @param unit the time unit of the timeout argument. + * @return true if all permits were acquired and false + * if the waiting time elapsed before all permits were acquired. + * + * @throws InterruptedException if the current thread is interrupted + * @throws IllegalArgumentException if permits less than zero. + * + * @see Thread#interrupt + * + */ + public boolean tryAcquire(int permits, long timeout, TimeUnit unit) + throws InterruptedException { + if (permits < 0) throw new IllegalArgumentException(); + return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); + } + + /** + * Releases the given number of permits, returning them to the semaphore. + *

Releases the given number of permits, increasing the number of + * available permits by that amount. + * If any threads are blocking trying to acquire permits, then the + * one that has been waiting the longest + * is selected and given the permits that were just released. + * If the number of available permits satisfies that thread's request + * then that thread is re-enabled for thread scheduling purposes; otherwise + * the thread continues to wait. If there are still permits available + * after the first thread's request has been satisfied, then those permits + * are assigned to the next waiting thread. If it is satisfied then it is + * re-enabled for thread scheduling purposes. This continues until there + * are insufficient permits to satisfy the next waiting thread, or there + * are no more waiting threads. + * + *

There is no requirement that a thread that releases a permit must + * have acquired that permit by calling {@link Semaphore#acquire acquire}. + * Correct usage of a semaphore is established by programming convention + * in the application. + * + * @param permits the number of permits to release + * @throws IllegalArgumentException if permits less than zero. + */ + public void release(int permits) { + if (permits < 0) throw new IllegalArgumentException(); + sync.releaseShared(permits); + } + + /** + * Returns the current number of permits available in this semaphore. + *

This method is typically used for debugging and testing purposes. + * @return the number of permits available in this semaphore. + */ + public int availablePermits() { + return sync.getPermits(); + } + + /** + * Acquire and return all permits that are immediately available. + * @return the number of permits + */ + public int drainPermits() { + return sync.drainPermits(); + } + + /** + * Shrinks the number of available permits by the indicated + * reduction. This method can be useful in subclasses that use + * semaphores to track resources that become unavailable. This + * method differs from acquire in that it does not block + * waiting for permits to become available. + * @param reduction the number of permits to remove + * @throws IllegalArgumentException if reduction is negative + */ + protected void reducePermits(int reduction) { + if (reduction < 0) throw new IllegalArgumentException(); + sync.reducePermits(reduction); + } + + /** + * Returns true if this semaphore has fairness set true. + * @return true if this semaphore has fairness set true. + */ + public boolean isFair() { + return sync instanceof FairSync; + } + + /** + * Queries whether any threads are waiting to acquire. Note that + * because cancellations may occur at any time, a true + * return does not guarantee that any other thread will ever + * acquire. This method is designed primarily for use in + * monitoring of the system state. + * + * @return true if there may be other threads waiting to acquire + * the lock. + */ + public final boolean hasQueuedThreads() { + return sync.hasQueuedThreads(); + } + + /** + * Returns an estimate of the number of threads waiting to + * acquire. The value is only an estimate because the number of + * threads may change dynamically while this method traverses + * internal data structures. This method is designed for use in + * monitoring of the system state, not for synchronization + * control. 
+ * @return the estimated number of threads waiting for this lock + */ + public final int getQueueLength() { + return sync.getQueueLength(); + } + + /** + * Returns a collection containing threads that may be waiting to + * acquire. Because the actual set of threads may change + * dynamically while constructing this result, the returned + * collection is only a best-effort estimate. The elements of the + * returned collection are in no particular order. This method is + * designed to facilitate construction of subclasses that provide + * more extensive monitoring facilities. + * @return the collection of threads + */ + protected Collection getQueuedThreads() { + return sync.getQueuedThreads(); + } + + /** + * Returns a string identifying this semaphore, as well as its state. + * The state, in brackets, includes the String + * "Permits =" followed by the number of permits. + * @return a string identifying this semaphore, as well as its + * state + */ + public String toString() { + return super.toString() + "[Permits = " + sync.getPermits() + "]"; + } + +} Propchange: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Semaphore.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/SynchronousQueue.java URL: http://svn.apache.org/viewvc/incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/SynchronousQueue.java?view=auto&rev=442038 ============================================================================== --- incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/SynchronousQueue.java (added) +++ incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/SynchronousQueue.java Sun Sep 10 16:59:30 2006 @@ -0,0 +1,685 @@ +/* + * Written by Doug Lea 
with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; +import java.util.concurrent.locks.*; +import java.util.*; + +/** + * A {@linkplain BlockingQueue blocking queue} in which each + * put must wait for a take, and vice versa. A + * synchronous queue does not have any internal capacity, not even a + * capacity of one. You cannot peek at a synchronous queue + * because an element is only present when you try to take it; you + * cannot add an element (using any method) unless another thread is + * trying to remove it; you cannot iterate as there is nothing to + * iterate. The head of the queue is the element that the + * first queued thread is trying to add to the queue; if there are no + * queued threads then no element is being added and the head is + * null. For purposes of other Collection methods + * (for example contains), a SynchronousQueue acts + * as an empty collection. This queue does not permit null + * elements. + * + *

Synchronous queues are similar to rendezvous channels used in + * CSP and Ada. They are well suited for handoff designs, in which an + * object running in one thread must sync up with an object running + * in another thread in order to hand it some information, event, or + * task. + * + *
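The handoff design mentioned above can be sketched as follows: one thread puts a message and blocks until another thread takes it. HandoffDemo and handOff are hypothetical names for this illustration, and the generic type argument is written out on the assumption that the class is declared as SynchronousQueue<E>.

```java
import java.util.concurrent.SynchronousQueue;

public class HandoffDemo {

    // Hypothetical helper: passes a message from a producer thread to the
    // calling (consumer) thread and returns what the consumer received.
    static String handOff(final String message) {
        final SynchronousQueue<String> channel = new SynchronousQueue<String>();

        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    // Blocks until the consumer calls take().
                    channel.put(message);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();

        try {
            // Blocks until the producer calls put().
            String received = channel.take();
            producer.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(handOff("hello")); // prints "hello"
    }
}
```

Because the queue has no capacity, a non-blocking offer with no consumer waiting returns false instead of storing the element.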

This class supports an optional fairness policy for ordering + * waiting producer and consumer threads. By default, this ordering + * is not guaranteed. However, a queue constructed with fairness set + * to true grants threads access in FIFO order. Fairness + * generally decreases throughput but reduces variability and avoids + * starvation. + * + *

This class implements all of the optional methods + * of the {@link Collection} and {@link Iterator} interfaces. + * + *

This class is a member of the + * + * Java Collections Framework. + * + * @since 1.5 + * @author Doug Lea + * @param the type of elements held in this collection + */ +public class SynchronousQueue extends AbstractQueue + implements BlockingQueue, java.io.Serializable { + private static final long serialVersionUID = -3223113410248163686L; + + /* + This implementation divides actions into two cases for puts: + + * An arriving producer that does not already have a waiting consumer + creates a node holding item, and then waits for a consumer to take it. + * An arriving producer that does already have a waiting consumer fills + the slot node created by the consumer, and notifies it to continue. + + And symmetrically, two for takes: + + * An arriving consumer that does not already have a waiting producer + creates an empty slot node, and then waits for a producer to fill it. + * An arriving consumer that does already have a waiting producer takes + item from the node created by the producer, and notifies it to continue. + + When a put or take waiting for the actions of its counterpart + aborts due to interruption or timeout, it marks the node + it created as "CANCELLED", which causes its counterpart to retry + the entire put or take sequence. + + This requires keeping two simple queues, waitingProducers and + waitingConsumers. Each of these can be FIFO (preserves fairness) + or LIFO (improves throughput). + */ + + /** Lock protecting both wait queues */ + private final ReentrantLock qlock; + /** Queue holding waiting puts */ + private final WaitQueue waitingProducers; + /** Queue holding waiting takes */ + private final WaitQueue waitingConsumers; + + /** + * Creates a SynchronousQueue with nonfair access policy. + */ + public SynchronousQueue() { + this(false); + } + + /** + * Creates a SynchronousQueue with specified fairness policy. + * @param fair if true, threads contend in FIFO order for access; + * otherwise the order is unspecified. 
+ */ + public SynchronousQueue(boolean fair) { + if (fair) { + qlock = new ReentrantLock(true); + waitingProducers = new FifoWaitQueue(); + waitingConsumers = new FifoWaitQueue(); + } + else { + qlock = new ReentrantLock(); + waitingProducers = new LifoWaitQueue(); + waitingConsumers = new LifoWaitQueue(); + } + } + + /** + * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below. + * These queues have all transient fields, but are serializable + * in order to recover fairness settings when deserialized. + */ + static abstract class WaitQueue implements java.io.Serializable { + /** Create, add, and return node for x */ + abstract Node enq(Object x); + /** Remove and return node, or null if empty */ + abstract Node deq(); + } + + /** + * FIFO queue to hold waiting puts/takes. + */ + static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable { + private static final long serialVersionUID = -3623113410248163686L; + private transient Node head; + private transient Node last; + + Node enq(Object x) { + Node p = new Node(x); + if (last == null) + last = head = p; + else + last = last.next = p; + return p; + } + + Node deq() { + Node p = head; + if (p != null) { + if ((head = p.next) == null) + last = null; + p.next = null; + } + return p; + } + } + + /** + * LIFO queue to hold waiting puts/takes. + */ + static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable { + private static final long serialVersionUID = -3633113410248163686L; + private transient Node head; + + Node enq(Object x) { + return head = new Node(x, head); + } + + Node deq() { + Node p = head; + if (p != null) { + head = p.next; + p.next = null; + } + return p; + } + } + + /** + * Nodes each maintain an item and handle waits and signals for + * getting and setting it. The class extends + * AbstractQueuedSynchronizer to manage blocking, using AQS state + * 0 for waiting, 1 for ack, -1 for cancelled. 
+ */ + static final class Node extends AbstractQueuedSynchronizer { + /** Synchronization state value representing that node acked */ + private static final int ACK = 1; + /** Synchronization state value representing that node cancelled */ + private static final int CANCEL = -1; + + /** The item being transferred */ + Object item; + /** Next node in wait queue */ + Node next; + + /** Creates a node with initial item */ + Node(Object x) { item = x; } + + /** Creates a node with initial item and next */ + Node(Object x, Node n) { item = x; next = n; } + + /** + * Implements AQS base acquire to succeed if not in WAITING state + */ + protected boolean tryAcquire(int ignore) { + return getState() != 0; + } + + /** + * Implements AQS base release to signal if state changed + */ + protected boolean tryRelease(int newState) { + return compareAndSetState(0, newState); + } + + /** + * Takes item and nulls out field (for sake of GC) + */ + private Object extract() { + Object x = item; + item = null; + return x; + } + + /** + * Tries to cancel on interrupt; if so rethrowing, + * else setting interrupt state + */ + private void checkCancellationOnInterrupt(InterruptedException ie) + throws InterruptedException { + if (release(CANCEL)) + throw ie; + Thread.currentThread().interrupt(); + } + + /** + * Fills in the slot created by the consumer and signal consumer to + * continue. + */ + boolean setItem(Object x) { + item = x; // can place in slot even if cancelled + return release(ACK); + } + + /** + * Removes item from slot created by producer and signal producer + * to continue. + */ + Object getItem() { + return (release(ACK))? extract() : null; + } + + /** + * Waits for a consumer to take item placed by producer. + */ + void waitForTake() throws InterruptedException { + try { + acquireInterruptibly(0); + } catch (InterruptedException ie) { + checkCancellationOnInterrupt(ie); + } + } + + /** + * Waits for a producer to put item placed by consumer. 
+ */ + Object waitForPut() throws InterruptedException { + try { + acquireInterruptibly(0); + } catch (InterruptedException ie) { + checkCancellationOnInterrupt(ie); + } + return extract(); + } + + /** + * Waits for a consumer to take item placed by producer or time out. + */ + boolean waitForTake(long nanos) throws InterruptedException { + try { + if (!tryAcquireNanos(0, nanos) && + release(CANCEL)) + return false; + } catch (InterruptedException ie) { + checkCancellationOnInterrupt(ie); + } + return true; + } + + /** + * Waits for a producer to put item placed by consumer, or time out. + */ + Object waitForPut(long nanos) throws InterruptedException { + try { + if (!tryAcquireNanos(0, nanos) && + release(CANCEL)) + return null; + } catch (InterruptedException ie) { + checkCancellationOnInterrupt(ie); + } + return extract(); + } + } + + /** + * Adds the specified element to this queue, waiting if necessary for + * another thread to receive it. + * @param o the element to add + * @throws InterruptedException if interrupted while waiting. + * @throws NullPointerException if the specified element is null. + */ + public void put(E o) throws InterruptedException { + if (o == null) throw new NullPointerException(); + final ReentrantLock qlock = this.qlock; + + for (;;) { + Node node; + boolean mustWait; + if (Thread.interrupted()) throw new InterruptedException(); + qlock.lock(); + try { + node = waitingConsumers.deq(); + if ( (mustWait = (node == null)) ) + node = waitingProducers.enq(o); + } finally { + qlock.unlock(); + } + + if (mustWait) { + node.waitForTake(); + return; + } + + else if (node.setItem(o)) + return; + + // else consumer cancelled, so retry + } + } + + /** + * Inserts the specified element into this queue, waiting if necessary + * up to the specified wait time for another thread to receive it. 
+ * @param o the element to add + * @param timeout how long to wait before giving up, in units of + * unit + * @param unit a TimeUnit determining how to interpret the + * timeout parameter + * @return true if successful, or false if + * the specified waiting time elapses before a consumer appears. + * @throws InterruptedException if interrupted while waiting. + * @throws NullPointerException if the specified element is null. + */ + public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException { + if (o == null) throw new NullPointerException(); + long nanos = unit.toNanos(timeout); + final ReentrantLock qlock = this.qlock; + for (;;) { + Node node; + boolean mustWait; + if (Thread.interrupted()) throw new InterruptedException(); + qlock.lock(); + try { + node = waitingConsumers.deq(); + if ( (mustWait = (node == null)) ) + node = waitingProducers.enq(o); + } finally { + qlock.unlock(); + } + + if (mustWait) + return node.waitForTake(nanos); + + else if (node.setItem(o)) + return true; + + // else consumer cancelled, so retry + } + } + + /** + * Retrieves and removes the head of this queue, waiting if necessary + * for another thread to insert it. + * @throws InterruptedException if interrupted while waiting. + * @return the head of this queue + */ + public E take() throws InterruptedException { + final ReentrantLock qlock = this.qlock; + for (;;) { + Node node; + boolean mustWait; + + if (Thread.interrupted()) throw new InterruptedException(); + qlock.lock(); + try { + node = waitingProducers.deq(); + if ( (mustWait = (node == null)) ) + node = waitingConsumers.enq(null); + } finally { + qlock.unlock(); + } + + if (mustWait) { + Object x = node.waitForPut(); + return (E)x; + } + else { + Object x = node.getItem(); + if (x != null) + return (E)x; + // else cancelled, so retry + } + } + } + + /** + * Retrieves and removes the head of this queue, waiting + * if necessary up to the specified wait time, for another thread + * to insert it. 
+ * @param timeout how long to wait before giving up, in units of + * unit + * @param unit a TimeUnit determining how to interpret the + * timeout parameter + * @return the head of this queue, or null if the + * specified waiting time elapses before an element is present. + * @throws InterruptedException if interrupted while waiting. + */ + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + final ReentrantLock qlock = this.qlock; + + for (;;) { + Node node; + boolean mustWait; + + if (Thread.interrupted()) throw new InterruptedException(); + qlock.lock(); + try { + node = waitingProducers.deq(); + if ( (mustWait = (node == null)) ) + node = waitingConsumers.enq(null); + } finally { + qlock.unlock(); + } + + if (mustWait) { + Object x = node.waitForPut(nanos); + return (E)x; + } + else { + Object x = node.getItem(); + if (x != null) + return (E)x; + // else cancelled, so retry + } + } + } + + // Untimed nonblocking versions + + /** + * Inserts the specified element into this queue, if another thread is + * waiting to receive it. + * + * @param o the element to add. + * @return true if it was possible to add the element to + * this queue, else false + * @throws NullPointerException if the specified element is null + */ + public boolean offer(E o) { + if (o == null) throw new NullPointerException(); + final ReentrantLock qlock = this.qlock; + + for (;;) { + Node node; + qlock.lock(); + try { + node = waitingConsumers.deq(); + } finally { + qlock.unlock(); + } + if (node == null) + return false; + + else if (node.setItem(o)) + return true; + // else retry + } + } + + /** + * Retrieves and removes the head of this queue, if another thread + * is currently making an element available. + * + * @return the head of this queue, or null if no + * element is available. 
+ */ + public E poll() { + final ReentrantLock qlock = this.qlock; + for (;;) { + Node node; + qlock.lock(); + try { + node = waitingProducers.deq(); + } finally { + qlock.unlock(); + } + if (node == null) + return null; + + else { + Object x = node.getItem(); + if (x != null) + return (E)x; + // else retry + } + } + } + + /** + * Always returns true. + * A SynchronousQueue has no internal capacity. + * @return true + */ + public boolean isEmpty() { + return true; + } + + /** + * Always returns zero. + * A SynchronousQueue has no internal capacity. + * @return zero. + */ + public int size() { + return 0; + } + + /** + * Always returns zero. + * A SynchronousQueue has no internal capacity. + * @return zero. + */ + public int remainingCapacity() { + return 0; + } + + /** + * Does nothing. + * A SynchronousQueue has no internal capacity. + */ + public void clear() {} + + /** + * Always returns false. + * A SynchronousQueue has no internal capacity. + * @param o the element + * @return false + */ + public boolean contains(Object o) { + return false; + } + + /** + * Always returns false. + * A SynchronousQueue has no internal capacity. + * + * @param o the element to remove + * @return false + */ + public boolean remove(Object o) { + return false; + } + + /** + * Returns false unless given collection is empty. + * A SynchronousQueue has no internal capacity. + * @param c the collection + * @return false unless given collection is empty + */ + public boolean containsAll(Collection c) { + return c.isEmpty(); + } + + /** + * Always returns false. + * A SynchronousQueue has no internal capacity. + * @param c the collection + * @return false + */ + public boolean removeAll(Collection c) { + return false; + } + + /** + * Always returns false. + * A SynchronousQueue has no internal capacity. + * @param c the collection + * @return false + */ + public boolean retainAll(Collection c) { + return false; + } + + /** + * Always returns null. 
+ * A SynchronousQueue does not return elements + * unless actively waited on. + * @return null + */ + public E peek() { + return null; + } + + + static class EmptyIterator implements Iterator { + public boolean hasNext() { + return false; + } + public E next() { + throw new NoSuchElementException(); + } + public void remove() { + throw new IllegalStateException(); + } + } + + /** + * Returns an empty iterator in which hasNext always returns + * false. + * + * @return an empty iterator + */ + public Iterator iterator() { + return new EmptyIterator(); + } + + + /** + * Returns a zero-length array. + * @return a zero-length array + */ + public Object[] toArray() { + return new Object[0]; + } + + /** + * Sets the zeroeth element of the specified array to null + * (if the array has non-zero length) and returns it. + * @param a the array + * @return the specified array + */ + public T[] toArray(T[] a) { + if (a.length > 0) + a[0] = null; + return a; + } + + + public int drainTo(Collection c) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + int n = 0; + E e; + while ( (e = poll()) != null) { + c.add(e); + ++n; + } + return n; + } + + public int drainTo(Collection c, int maxElements) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + int n = 0; + E e; + while (n < maxElements && (e = poll()) != null) { + c.add(e); + ++n; + } + return n; + } +} + + + + + Propchange: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/SynchronousQueue.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ThreadFactory.java URL: 
http://svn.apache.org/viewvc/incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ThreadFactory.java?view=auto&rev=442038 ============================================================================== --- incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ThreadFactory.java (added) +++ incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ThreadFactory.java Sun Sep 10 16:59:30 2006 @@ -0,0 +1,40 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; + +/** + * An object that creates new threads on demand. Using thread factories + * removes hardwiring of calls to {@link Thread#Thread(Runnable) new Thread}, + * enabling applications to use special thread subclasses, priorities, etc. + * + *

+ * The simplest implementation of this interface is just:
+ * <pre>
+ * class SimpleThreadFactory implements ThreadFactory {
+ *   public Thread newThread(Runnable r) {
+ *     return new Thread(r);
+ *   }
+ * }
+ * </pre>
+ * + * The {@link Executors#defaultThreadFactory} method provides a more + * useful simple implementation, that sets the created thread context + * to known values before returning it. + * @since 1.5 + * @author Doug Lea + */ +public interface ThreadFactory { + + /** + * Constructs a new Thread. Implementations may also initialize + * priority, name, daemon status, ThreadGroup, etc. + * + * @param r a runnable to be executed by new thread instance + * @return constructed thread + */ + Thread newThread(Runnable r); +} Propchange: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/ThreadFactory.java ------------------------------------------------------------------------------ svn:eol-style = native
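
The ThreadFactory javadoc above notes that implementations may also initialize priority, name, daemon status, and so on. As a rough illustration only (not part of this commit), a factory along those lines might look like the sketch below. The class name NamedDaemonThreadFactory and the "prefix-N" naming scheme are hypothetical and chosen just for this example.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical example class, not part of the committed sources.
 * Creates daemon threads named "<prefix>-<n>" at normal priority.
 */
final class NamedDaemonThreadFactory implements ThreadFactory {
    private final String prefix;
    // Counts threads handed out so far; safe to use from several callers.
    private final AtomicInteger counter = new AtomicInteger(0);

    NamedDaemonThreadFactory(String prefix) {
        if (prefix == null) throw new NullPointerException("prefix");
        this.prefix = prefix;
    }

    public Thread newThread(Runnable r) {
        // The thread is configured here but not started; the caller starts it.
        Thread t = new Thread(r, prefix + "-" + counter.incrementAndGet());
        t.setDaemon(true);
        t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadFactory factory = new NamedDaemonThreadFactory("worker");
        Thread t = factory.newThread(new Runnable() {
            public void run() {
                System.out.println("running in " + Thread.currentThread().getName());
            }
        });
        t.start();
        t.join(); // wait, since a daemon thread alone would not keep the JVM alive
    }
}
```

Setting known values for name, daemon status and priority in one place is the same idea the javadoc attributes to Executors#defaultThreadFactory. Whether daemon threads are appropriate depends on the application, so treat that choice as an assumption of this sketch.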