Return-Path: Delivered-To: apmail-incubator-harmony-commits-archive@www.apache.org Received: (qmail 99826 invoked from network); 11 Sep 2006 00:00:13 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 11 Sep 2006 00:00:13 -0000 Received: (qmail 75317 invoked by uid 500); 11 Sep 2006 00:00:11 -0000 Delivered-To: apmail-incubator-harmony-commits-archive@incubator.apache.org Received: (qmail 75184 invoked by uid 500); 11 Sep 2006 00:00:10 -0000 Mailing-List: contact harmony-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: harmony-dev@incubator.apache.org Delivered-To: mailing list harmony-commits@incubator.apache.org Received: (qmail 75044 invoked by uid 99); 11 Sep 2006 00:00:10 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 10 Sep 2006 17:00:10 -0700 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received-SPF: pass (asf.osuosl.org: local policy) Received: from [140.211.166.113] (HELO eris.apache.org) (140.211.166.113) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 10 Sep 2006 17:00:03 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 9C1E01A9823; Sun, 10 Sep 2006 16:59:43 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r442038 [4/19] - in /incubator/harmony/standard/classlib/trunk/modules/concurrent: ./ src/ src/main/ src/main/java/ src/main/java/java/ src/main/java/java/util/ src/main/java/java/util/concurrent/ src/main/java/java/util/concurrent/atomic/ ... Date: Sun, 10 Sep 2006 23:59:35 -0000 To: harmony-commits@incubator.apache.org From: ndbeyer@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20060910235943.9C1E01A9823@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Added: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executors.java URL: http://svn.apache.org/viewvc/incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executors.java?view=auto&rev=442038 ============================================================================== --- incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executors.java (added) +++ incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executors.java Sun Sep 10 16:59:30 2006 @@ -0,0 +1,659 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.security.AccessControlContext; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; +import java.security.AccessControlException; + +/** + * Factory and utility methods for {@link Executor}, {@link + * ExecutorService}, {@link ScheduledExecutorService}, {@link + * ThreadFactory}, and {@link Callable} classes defined in this + * package. This class supports the following kinds of methods: + * + *
    + *
  • Methods that create and return an {@link ExecutorService} + * set up with commonly useful configuration settings. + *
  • Methods that create and return a {@link ScheduledExecutorService} + * set up with commonly useful configuration settings. + *
  • Methods that create and return a "wrapped" ExecutorService, that + * disables reconfiguration by making implementation-specific methods + * inaccessible. + *
  • Methods that create and return a {@link ThreadFactory} + * that sets newly created threads to a known state. + *
  • Methods that create and return a {@link Callable} + * out of other closure-like forms, so they can be used + * in execution methods requiring Callable. + *
+ * + * @since 1.5 + * @author Doug Lea + */ +public class Executors { + + /** + * Creates a thread pool that reuses a fixed set of threads + * operating off a shared unbounded queue. If any thread + * terminates due to a failure during execution prior to shutdown, + * a new one will take its place if needed to execute subsequent + * tasks. + * + * @param nThreads the number of threads in the pool + * @return the newly created thread pool + */ + public static ExecutorService newFixedThreadPool(int nThreads) { + return new ThreadPoolExecutor(nThreads, nThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue()); + } + + /** + * Creates a thread pool that reuses a fixed set of threads + * operating off a shared unbounded queue, using the provided + * ThreadFactory to create new threads when needed. + * + * @param nThreads the number of threads in the pool + * @param threadFactory the factory to use when creating new threads + * @return the newly created thread pool + */ + public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { + return new ThreadPoolExecutor(nThreads, nThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + threadFactory); + } + + /** + * Creates an Executor that uses a single worker thread operating + * off an unbounded queue. (Note however that if this single + * thread terminates due to a failure during execution prior to + * shutdown, a new one will take its place if needed to execute + * subsequent tasks.) Tasks are guaranteed to execute + * sequentially, and no more than one task will be active at any + * given time. Unlike the otherwise equivalent + * newFixedThreadPool(1) the returned executor is + * guaranteed not to be reconfigurable to use additional threads. + * + * @return the newly created single-threaded Executor + */ + public static ExecutorService newSingleThreadExecutor() { + return new DelegatedExecutorService + (new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue())); + } + + /** + * Creates an Executor that uses a single worker thread operating + * off an unbounded queue, and uses the provided ThreadFactory to + * create a new thread when needed. Unlike the otherwise + * equivalent newFixedThreadPool(1, threadFactory) the returned executor + * is guaranteed not to be reconfigurable to use additional + * threads. + * + * @param threadFactory the factory to use when creating new + * threads + * + * @return the newly created single-threaded Executor + */ + public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { + return new DelegatedExecutorService + (new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + threadFactory)); + } + + /** + * Creates a thread pool that creates new threads as needed, but + * will reuse previously constructed threads when they are + * available. These pools will typically improve the performance + * of programs that execute many short-lived asynchronous tasks. + * Calls to execute will reuse previously constructed + * threads if available. If no existing thread is available, a new + * thread will be created and added to the pool. Threads that have + * not been used for sixty seconds are terminated and removed from + * the cache. Thus, a pool that remains idle for long enough will + * not consume any resources. Note that pools with similar + * properties but different details (for example, timeout parameters) + * may be created using {@link ThreadPoolExecutor} constructors. + * + * @return the newly created thread pool + */ + public static ExecutorService newCachedThreadPool() { + return new ThreadPoolExecutor(0, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue()); + } + + /** + * Creates a thread pool that creates new threads as needed, but + * will reuse previously constructed threads when they are + * available, and uses the provided + * ThreadFactory to create new threads when needed. + * @param threadFactory the factory to use when creating new threads + * @return the newly created thread pool + */ + public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { + return new ThreadPoolExecutor(0, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue(), + threadFactory); + } + + /** + * Creates a single-threaded executor that can schedule commands + * to run after a given delay, or to execute periodically. + * (Note however that if this single + * thread terminates due to a failure during execution prior to + * shutdown, a new one will take its place if needed to execute + * subsequent tasks.) Tasks are guaranteed to execute + * sequentially, and no more than one task will be active at any + * given time. Unlike the otherwise equivalent + * newScheduledThreadPool(1) the returned executor is + * guaranteed not to be reconfigurable to use additional threads. + * @return the newly created scheduled executor + */ + public static ScheduledExecutorService newSingleThreadScheduledExecutor() { + return new DelegatedScheduledExecutorService + (new ScheduledThreadPoolExecutor(1)); + } + + /** + * Creates a single-threaded executor that can schedule commands + * to run after a given delay, or to execute periodically. (Note + * however that if this single thread terminates due to a failure + * during execution prior to shutdown, a new one will take its + * place if needed to execute subsequent tasks.) Tasks are + * guaranteed to execute sequentially, and no more than one task + * will be active at any given time. Unlike the otherwise + * equivalent newScheduledThreadPool(1, threadFactory) + * the returned executor is guaranteed not to be reconfigurable to + * use additional threads. + * @param threadFactory the factory to use when creating new + * threads + * @return a newly created scheduled executor + */ + public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { + return new DelegatedScheduledExecutorService + (new ScheduledThreadPoolExecutor(1, threadFactory)); + } + + /** + * Creates a thread pool that can schedule commands to run after a + * given delay, or to execute periodically. + * @param corePoolSize the number of threads to keep in the pool, + * even if they are idle. + * @return a newly created scheduled thread pool + */ + public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { + return new ScheduledThreadPoolExecutor(corePoolSize); + } + + /** + * Creates a thread pool that can schedule commands to run after a + * given delay, or to execute periodically. + * @param corePoolSize the number of threads to keep in the pool, + * even if they are idle. + * @param threadFactory the factory to use when the executor + * creates a new thread. + * @return a newly created scheduled thread pool + */ + public static ScheduledExecutorService newScheduledThreadPool( + int corePoolSize, ThreadFactory threadFactory) { + return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); + } + + + /** + * Returns an object that delegates all defined {@link + * ExecutorService} methods to the given executor, but not any + * other methods that might otherwise be accessible using + * casts. This provides a way to safely "freeze" configuration and + * disallow tuning of a given concrete implementation. + * @param executor the underlying implementation + * @return an ExecutorService instance + * @throws NullPointerException if executor null + */ + public static ExecutorService unconfigurableExecutorService(ExecutorService executor) { + if (executor == null) + throw new NullPointerException(); + return new DelegatedExecutorService(executor); + } + + /** + * Returns an object that delegates all defined {@link + * ScheduledExecutorService} methods to the given executor, but + * not any other methods that might otherwise be accessible using + * casts. This provides a way to safely "freeze" configuration and + * disallow tuning of a given concrete implementation. + * @param executor the underlying implementation + * @return a ScheduledExecutorService instance + * @throws NullPointerException if executor null + */ + public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) { + if (executor == null) + throw new NullPointerException(); + return new DelegatedScheduledExecutorService(executor); + } + + /** + * Returns a default thread factory used to create new threads. + * This factory creates all new threads used by an Executor in the + * same {@link ThreadGroup}. If there is a {@link + * java.lang.SecurityManager}, it uses the group of {@link + * System#getSecurityManager}, else the group of the thread + * invoking this defaultThreadFactory method. Each new + * thread is created as a non-daemon thread with priority + * Thread.NORM_PRIORITY. New threads have names + * accessible via {@link Thread#getName} of + * pool-N-thread-M, where N is the sequence + * number of this factory, and M is the sequence number + * of the thread created by this factory. + * @return a thread factory + */ + public static ThreadFactory defaultThreadFactory() { + return new DefaultThreadFactory(); + } + + /** + * Returns a thread factory used to create new threads that + * have the same permissions as the current thread. + * This factory creates threads with the same settings as {@link + * Executors#defaultThreadFactory}, additionally setting the + * AccessControlContext and contextClassLoader of new threads to + * be the same as the thread invoking this + * privilegedThreadFactory method. A new + * privilegedThreadFactory can be created within an + * {@link AccessController#doPrivileged} action setting the + * current thread's access control context to create threads with + * the selected permission settings holding within that action. + * + *

Note that while tasks running within such threads will have + * the same access control and class loader settings as the + * current thread, they need not have the same {@link + * java.lang.ThreadLocal} or {@link + * java.lang.InheritableThreadLocal} values. If necessary, + * particular values of thread locals can be set or reset before + * any task runs in {@link ThreadPoolExecutor} subclasses using + * {@link ThreadPoolExecutor#beforeExecute}. Also, if it is + * necessary to initialize worker threads to have the same + * InheritableThreadLocal settings as some other designated + * thread, you can create a custom ThreadFactory in which that + * thread waits for and services requests to create others that + * will inherit its values. + * + * @return a thread factory + * @throws AccessControlException if the current access control + * context does not have permission to both get and set context + * class loader. + */ + public static ThreadFactory privilegedThreadFactory() { + return new PrivilegedThreadFactory(); + } + + /** + * Returns a {@link Callable} object that, when + * called, runs the given task and returns the given result. This + * can be useful when applying methods requiring a + * Callable to an otherwise resultless action. + * @param task the task to run + * @param result the result to return + * @throws NullPointerException if task null + * @return a callable object + */ + public static Callable callable(Runnable task, T result) { + if (task == null) + throw new NullPointerException(); + return new RunnableAdapter(task, result); + } + + /** + * Returns a {@link Callable} object that, when + * called, runs the given task and returns null. + * @param task the task to run + * @return a callable object + * @throws NullPointerException if task null + */ + public static Callable callable(Runnable task) { + if (task == null) + throw new NullPointerException(); + return new RunnableAdapter(task, null); + } + + /** + * Returns a {@link Callable} object that, when + * called, runs the given privileged action and returns its result. + * @param action the privileged action to run + * @return a callable object + * @throws NullPointerException if action null + */ + public static Callable callable(PrivilegedAction action) { + if (action == null) + throw new NullPointerException(); + return new PrivilegedActionAdapter(action); + } + + /** + * Returns a {@link Callable} object that, when + * called, runs the given privileged exception action and returns + * its result. + * @param action the privileged exception action to run + * @return a callable object + * @throws NullPointerException if action null + */ + public static Callable callable(PrivilegedExceptionAction action) { + if (action == null) + throw new NullPointerException(); + return new PrivilegedExceptionActionAdapter(action); + } + + /** + * Returns a {@link Callable} object that will, when + * called, execute the given callable under the current + * access control context. This method should normally be + * invoked within an {@link AccessController#doPrivileged} action + * to create callables that will, if possible, execute under the + * selected permission settings holding within that action; or if + * not possible, throw an associated {@link + * AccessControlException}. + * @param callable the underlying task + * @return a callable object + * @throws NullPointerException if callable null + * + */ + public static Callable privilegedCallable(Callable callable) { + if (callable == null) + throw new NullPointerException(); + return new PrivilegedCallable(callable); + } + + /** + * Returns a {@link Callable} object that will, when + * called, execute the given callable under the current + * access control context, with the current context class loader + * as the context class loader. This method should normally be + * invoked within an {@link AccessController#doPrivileged} action + * to create callables that will, if possible, execute under the + * selected permission settings holding within that action; or if + * not possible, throw an associated {@link + * AccessControlException}. + * @param callable the underlying task + * + * @return a callable object + * @throws NullPointerException if callable null + * @throws AccessControlException if the current access control + * context does not have permission to both set and get context + * class loader. + */ + public static Callable privilegedCallableUsingCurrentClassLoader(Callable callable) { + if (callable == null) + throw new NullPointerException(); + return new PrivilegedCallableUsingCurrentClassLoader(callable); + } + + // Non-public classes supporting the public methods + + /** + * A callable that runs given task and returns given result + */ + static final class RunnableAdapter implements Callable { + final Runnable task; + final T result; + RunnableAdapter(Runnable task, T result) { + this.task = task; + this.result = result; + } + public T call() { + task.run(); + return result; + } + } + + /** + * A callable that runs given privileged action and returns its result + */ + static final class PrivilegedActionAdapter implements Callable { + PrivilegedActionAdapter(PrivilegedAction action) { + this.action = action; + } + public Object call () { + return action.run(); + } + private final PrivilegedAction action; + } + + /** + * A callable that runs given privileged exception action and returns its result + */ + static final class PrivilegedExceptionActionAdapter implements Callable { + PrivilegedExceptionActionAdapter(PrivilegedExceptionAction action) { + this.action = action; + } + public Object call () throws Exception { + return action.run(); + } + private final PrivilegedExceptionAction action; + } + + + /** + * A callable that runs under established access control settings + */ + static final class PrivilegedCallable implements Callable { + private final AccessControlContext acc; + private final Callable task; + private T result; + private Exception exception; + PrivilegedCallable(Callable task) { + this.task = task; + this.acc = AccessController.getContext(); + } + + public T call() throws Exception { + AccessController.doPrivileged(new PrivilegedAction() { + public Object run() { + try { + result = task.call(); + } catch(Exception ex) { + exception = ex; + } + return null; + } + }, acc); + if (exception != null) + throw exception; + else + return result; + } + } + + /** + * A callable that runs under established access control settings and + * current ClassLoader + */ + static final class PrivilegedCallableUsingCurrentClassLoader implements Callable { + private final ClassLoader ccl; + private final AccessControlContext acc; + private final Callable task; + private T result; + private Exception exception; + PrivilegedCallableUsingCurrentClassLoader(Callable task) { + this.task = task; + this.ccl = Thread.currentThread().getContextClassLoader(); + this.acc = AccessController.getContext(); + acc.checkPermission(new RuntimePermission("getContextClassLoader")); + acc.checkPermission(new RuntimePermission("setContextClassLoader")); + } + + public T call() throws Exception { + AccessController.doPrivileged(new PrivilegedAction() { + public Object run() { + ClassLoader savedcl = null; + Thread t = Thread.currentThread(); + try { + ClassLoader cl = t.getContextClassLoader(); + if (ccl != cl) { + t.setContextClassLoader(ccl); + savedcl = cl; + } + result = task.call(); + } catch(Exception ex) { + exception = ex; + } finally { + if (savedcl != null) + t.setContextClassLoader(savedcl); + } + return null; + } + }, acc); + if (exception != null) + throw exception; + else + return result; + } + } + + /** + * The default thread factory + */ + static class DefaultThreadFactory implements ThreadFactory { + static final AtomicInteger poolNumber = new AtomicInteger(1); + final ThreadGroup group; + final AtomicInteger threadNumber = new AtomicInteger(1); + final String namePrefix; + + DefaultThreadFactory() { + SecurityManager s = System.getSecurityManager(); + group = (s != null)? s.getThreadGroup() : + Thread.currentThread().getThreadGroup(); + namePrefix = "pool-" + + poolNumber.getAndIncrement() + + "-thread-"; + } + + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, + namePrefix + threadNumber.getAndIncrement(), + 0); + if (t.isDaemon()) + t.setDaemon(false); + if (t.getPriority() != Thread.NORM_PRIORITY) + t.setPriority(Thread.NORM_PRIORITY); + return t; + } + } + + /** + * Thread factory capturing access control and class loader + */ + static class PrivilegedThreadFactory extends DefaultThreadFactory { + private final ClassLoader ccl; + private final AccessControlContext acc; + + PrivilegedThreadFactory() { + super(); + this.ccl = Thread.currentThread().getContextClassLoader(); + this.acc = AccessController.getContext(); + acc.checkPermission(new RuntimePermission("setContextClassLoader")); + } + + public Thread newThread(final Runnable r) { + return super.newThread(new Runnable() { + public void run() { + AccessController.doPrivileged(new PrivilegedAction() { + public Object run() { + Thread.currentThread().setContextClassLoader(ccl); + r.run(); + return null; + } + }, acc); + } + }); + } + + } + + /** + * A wrapper class that exposes only the ExecutorService methods + * of an implementation. + */ + static class DelegatedExecutorService extends AbstractExecutorService { + private final ExecutorService e; + DelegatedExecutorService(ExecutorService executor) { e = executor; } + public void execute(Runnable command) { e.execute(command); } + public void shutdown() { e.shutdown(); } + public List shutdownNow() { return e.shutdownNow(); } + public boolean isShutdown() { return e.isShutdown(); } + public boolean isTerminated() { return e.isTerminated(); } + public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException { + return e.awaitTermination(timeout, unit); + } + public Future submit(Runnable task) { + return e.submit(task); + } + public Future submit(Callable task) { + return e.submit(task); + } + public Future submit(Runnable task, T result) { + return e.submit(task, result); + } + public List> invokeAll(Collection> tasks) + throws InterruptedException { + return e.invokeAll(tasks); + } + public List> invokeAll(Collection> tasks, + long timeout, TimeUnit unit) + throws InterruptedException { + return e.invokeAll(tasks, timeout, unit); + } + public T invokeAny(Collection> tasks) + throws InterruptedException, ExecutionException { + return e.invokeAny(tasks); + } + public T invokeAny(Collection> tasks, + long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return e.invokeAny(tasks, timeout, unit); + } + } + + /** + * A wrapper class that exposes only the ExecutorService and + * ScheduleExecutor methods of a ScheduledExecutorService implementation. + */ + static class DelegatedScheduledExecutorService + extends DelegatedExecutorService + implements ScheduledExecutorService { + private final ScheduledExecutorService e; + DelegatedScheduledExecutorService(ScheduledExecutorService executor) { + super(executor); + e = executor; + } + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return e.schedule(command, delay, unit); + } + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return e.schedule(callable, delay, unit); + } + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return e.scheduleAtFixedRate(command, initialDelay, period, unit); + } + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return e.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } + } + + + /** Cannot instantiate. */ + private Executors() {} +} Propchange: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executors.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Future.java URL: http://svn.apache.org/viewvc/incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Future.java?view=auto&rev=442038 ============================================================================== --- incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Future.java (added) +++ incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Future.java Sun Sep 10 16:59:30 2006 @@ -0,0 +1,133 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; + +/** + * A Future represents the result of an asynchronous + * computation. Methods are provided to check if the computation is + * complete, to wait for its completion, and to retrieve the result of + * the computation. The result can only be retrieved using method + * get when the computation has completed, blocking if + * necessary until it is ready. Cancellation is performed by the + * cancel method. Additional methods are provided to + * determine if the task completed normally or was cancelled. Once a + * computation has completed, the computation cannot be cancelled. + * If you would like to use a Future for the sake + * of cancellability but not provide a usable result, you can + * declare types of the form Future<?> and + * return null as a result of the underlying task. + * + *

+ * Sample Usage (Note that the following classes are all + * made-up.)

+ *

+ * interface ArchiveSearcher { String search(String target); }
+ * class App {
+ *   ExecutorService executor = ...
+ *   ArchiveSearcher searcher = ...
+ *   void showSearch(final String target) throws InterruptedException {
+ *     Future<String> future = executor.submit(new Callable<String>() {
+ *         public String call() { return searcher.search(target); }
+ *     });
+ *     displayOtherThings(); // do other things while searching
+ *     try {
+ *       displayText(future.get()); // use future
+ *     } catch (ExecutionException ex) { cleanup(); return; }
+ *   }
+ * }
+ * 
+ * + * The {@link FutureTask} class is an implementation of Future that + * implements Runnable, and so may be executed by an Executor. + * For example, the above construction with submit could be replaced by: + *
+ *     FutureTask<String> future =
+ *       new FutureTask<String>(new Callable<String>() {
+ *         public String call() {
+ *           return searcher.search(target);
+ *       }});
+ *     executor.execute(future);
+ * 
+ * @see FutureTask + * @see Executor + * @since 1.5 + * @author Doug Lea + * @param The result type returned by this Future's get method + */ +public interface Future { + + /** + * Attempts to cancel execution of this task. This attempt will + * fail if the task has already completed, already been cancelled, + * or could not be cancelled for some other reason. If successful, + * and this task has not started when cancel is called, + * this task should never run. If the task has already started, + * then the mayInterruptIfRunning parameter determines + * whether the thread executing this task should be interrupted in + * an attempt to stop the task. + * + * @param mayInterruptIfRunning true if the thread executing this + * task should be interrupted; otherwise, in-progress tasks are allowed + * to complete + * @return false if the task could not be cancelled, + * typically because it has already completed normally; + * true otherwise + */ + boolean cancel(boolean mayInterruptIfRunning); + + /** + * Returns true if this task was cancelled before it completed + * normally. + * + * @return true if task was cancelled before it completed + */ + boolean isCancelled(); + + /** + * Returns true if this task completed. + * + * Completion may be due to normal termination, an exception, or + * cancellation -- in all of these cases, this method will return + * true. + * + * @return true if this task completed. + */ + boolean isDone(); + + /** + * Waits if necessary for the computation to complete, and then + * retrieves its result. + * + * @return the computed result + * @throws CancellationException if the computation was cancelled + * @throws ExecutionException if the computation threw an + * exception + * @throws InterruptedException if the current thread was interrupted + * while waiting + */ + V get() throws InterruptedException, ExecutionException; + + /** + * Waits if necessary for at most the given time for the computation + * to complete, and then retrieves its result, if available. + * + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @return the computed result + * @throws CancellationException if the computation was cancelled + * @throws ExecutionException if the computation threw an + * exception + * @throws InterruptedException if the current thread was interrupted + * while waiting + * @throws TimeoutException if the wait timed out + */ + V get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException; +} + + + Propchange: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Future.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/FutureTask.java URL: http://svn.apache.org/viewvc/incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/FutureTask.java?view=auto&rev=442038 ============================================================================== --- incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/FutureTask.java (added) +++ incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/FutureTask.java Sun Sep 10 16:59:30 2006 @@ -0,0 +1,276 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; +import java.util.concurrent.locks.*; + +/** + * A cancellable asynchronous computation. This class provides a base + * implementation of {@link Future}, with methods to start and cancel + * a computation, query to see if the computation is complete, and + * retrieve the result of the computation. The result can only be + * retrieved when the computation has completed; the get + * method will block if the computation has not yet completed. Once + * the computation has completed, the computation cannot be restarted + * or cancelled. + * + *

A FutureTask can be used to wrap a {@link Callable} or + * {@link java.lang.Runnable} object. Because FutureTask + * implements Runnable, a FutureTask can be + * submitted to an {@link Executor} for execution. + * + *

In addition to serving as a standalone class, this class provides + * protected functionality that may be useful when creating + * customized task classes. + * + * @since 1.5 + * @author Doug Lea + * @param The result type returned by this FutureTask's get method + */ +public class FutureTask implements Future, Runnable { + /** Synchronization control for FutureTask */ + private final Sync sync; + + /** + * Creates a FutureTask that will upon running, execute the + * given Callable. + * + * @param callable the callable task + * @throws NullPointerException if callable is null + */ + public FutureTask(Callable callable) { + if (callable == null) + throw new NullPointerException(); + sync = new Sync(callable); + } + + /** + * Creates a FutureTask that will upon running, execute the + * given Runnable, and arrange that get will return the + * given result on successful completion. + * + * @param runnable the runnable task + * @param result the result to return on successful completion. If + * you don't need a particular result, consider using + * constructions of the form: + * Future<?> f = new FutureTask<Object>(runnable, null) + * @throws NullPointerException if runnable is null + */ + public FutureTask(Runnable runnable, V result) { + sync = new Sync(Executors.callable(runnable, result)); + } + + public boolean isCancelled() { + return sync.innerIsCancelled(); + } + + public boolean isDone() { + return sync.innerIsDone(); + } + + public boolean cancel(boolean mayInterruptIfRunning) { + return sync.innerCancel(mayInterruptIfRunning); + } + + public V get() throws InterruptedException, ExecutionException { + return sync.innerGet(); + } + + public V get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return sync.innerGet(unit.toNanos(timeout)); + } + + /** + * Protected method invoked when this task transitions to state + * isDone (whether normally or via cancellation). The + * default implementation does nothing. Subclasses may override + * this method to invoke completion callbacks or perform + * bookkeeping. Note that you can query status inside the + * implementation of this method to determine whether this task + * has been cancelled. + */ + protected void done() { } + + /** + * Sets the result of this Future to the given value unless + * this future has already been set or has been cancelled. + * @param v the value + */ + protected void set(V v) { + sync.innerSet(v); + } + + /** + * Causes this future to report an ExecutionException + * with the given throwable as its cause, unless this Future has + * already been set or has been cancelled. + * @param t the cause of failure. + */ + protected void setException(Throwable t) { + sync.innerSetException(t); + } + + /** + * Sets this Future to the result of computation unless + * it has been cancelled. + */ + public void run() { + sync.innerRun(); + } + + /** + * Executes the computation without setting its result, and then + * resets this Future to initial state, failing to do so if the + * computation encounters an exception or is cancelled. This is + * designed for use with tasks that intrinsically execute more + * than once. + * @return true if successfully run and reset + */ + protected boolean runAndReset() { + return sync.innerRunAndReset(); + } + + /** + * Synchronization control for FutureTask. Note that this must be + * a non-static inner class in order to invoke the protected + * done method. For clarity, all inner class support + * methods are same as outer, prefixed with "inner". + * + * Uses AQS sync state to represent run status + */ + private final class Sync extends AbstractQueuedSynchronizer { + /** State value representing that task is running */ + private static final int RUNNING = 1; + /** State value representing that task ran */ + private static final int RAN = 2; + /** State value representing that task was cancelled */ + private static final int CANCELLED = 4; + + /** The underlying callable */ + private final Callable callable; + /** The result to return from get() */ + private V result; + /** The exception to throw from get() */ + private Throwable exception; + + /** + * The thread running task. When nulled after set/cancel, this + * indicates that the results are accessible. Must be + * volatile, to serve as write barrier on completion. + */ + private volatile Thread runner; + + Sync(Callable callable) { + this.callable = callable; + } + + private boolean ranOrCancelled(int state) { + return (state & (RAN | CANCELLED)) != 0; + } + + /** + * Implements AQS base acquire to succeed if ran or cancelled + */ + protected int tryAcquireShared(int ignore) { + return innerIsDone()? 1 : -1; + } + + /** + * Implements AQS base release to always signal after setting + * final done status by nulling runner thread. + */ + protected boolean tryReleaseShared(int ignore) { + runner = null; + return true; + } + + boolean innerIsCancelled() { + return getState() == CANCELLED; + } + + boolean innerIsDone() { + return ranOrCancelled(getState()) && runner == null; + } + + V innerGet() throws InterruptedException, ExecutionException { + acquireSharedInterruptibly(0); + if (getState() == CANCELLED) + throw new CancellationException(); + if (exception != null) + throw new ExecutionException(exception); + return result; + } + + V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException { + if (!tryAcquireSharedNanos(0, nanosTimeout)) + throw new TimeoutException(); + if (getState() == CANCELLED) + throw new CancellationException(); + if (exception != null) + throw new ExecutionException(exception); + return result; + } + + void innerSet(V v) { + int s = getState(); + if (ranOrCancelled(s) || !compareAndSetState(s, RAN)) + return; + result = v; + releaseShared(0); + done(); + } + + void innerSetException(Throwable t) { + int s = getState(); + if (ranOrCancelled(s) || !compareAndSetState(s, RAN)) + return; + exception = t; + result = null; + releaseShared(0); + done(); + } + + boolean innerCancel(boolean mayInterruptIfRunning) { + int s = getState(); + if (ranOrCancelled(s) || !compareAndSetState(s, CANCELLED)) + return false; + if (mayInterruptIfRunning) { + Thread r = runner; + if (r != null) + r.interrupt(); + } + releaseShared(0); + done(); + return true; + } + + void innerRun() { + if (!compareAndSetState(0, RUNNING)) + return; + try { + runner = Thread.currentThread(); + innerSet(callable.call()); + } catch(Throwable ex) { + innerSetException(ex); + } + } + + boolean innerRunAndReset() { + if (!compareAndSetState(0, RUNNING)) + return false; + try { + runner = Thread.currentThread(); + callable.call(); // don't set result + runner = null; + return compareAndSetState(RUNNING, 0); + } catch(Throwable ex) { + innerSetException(ex); + return false; + } + } + } +} Propchange: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/FutureTask.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/LinkedBlockingQueue.java URL: http://svn.apache.org/viewvc/incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/LinkedBlockingQueue.java?view=auto&rev=442038 ============================================================================== --- incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/LinkedBlockingQueue.java (added) +++ incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/LinkedBlockingQueue.java Sun Sep 10 16:59:30 2006 @@ -0,0 +1,720 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; +import java.util.*; + +/** + * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on + * linked nodes. + * This queue orders elements FIFO (first-in-first-out). + * The head of the queue is that element that has been on the + * queue the longest time. + * The tail of the queue is that element that has been on the + * queue the shortest time. New elements + * are inserted at the tail of the queue, and the queue retrieval + * operations obtain elements at the head of the queue. + * Linked queues typically have higher throughput than array-based queues but + * less predictable performance in most concurrent applications. + * + *

The optional capacity bound constructor argument serves as a + * way to prevent excessive queue expansion. The capacity, if unspecified, + * is equal to {@link Integer#MAX_VALUE}. Linked nodes are + * dynamically created upon each insertion unless this would bring the + * queue above capacity. + * + *

This class implements all of the optional methods + * of the {@link Collection} and {@link Iterator} interfaces. + * + *

This class is a member of the + * + * Java Collections Framework. + * + * @since 1.5 + * @author Doug Lea + * @param the type of elements held in this collection + * + **/ +public class LinkedBlockingQueue extends AbstractQueue + implements BlockingQueue, java.io.Serializable { + private static final long serialVersionUID = -6903933977591709194L; + + /* + * A variant of the "two lock queue" algorithm. The putLock gates + * entry to put (and offer), and has an associated condition for + * waiting puts. Similarly for the takeLock. The "count" field + * that they both rely on is maintained as an atomic to avoid + * needing to get both locks in most cases. Also, to minimize need + * for puts to get takeLock and vice-versa, cascading notifies are + * used. When a put notices that it has enabled at least one take, + * it signals taker. That taker in turn signals others if more + * items have been entered since the signal. And symmetrically for + * takes signalling puts. Operations such as remove(Object) and + * iterators acquire both locks. + */ + + /** + * Linked list node class + */ + static class Node { + /** The item, volatile to ensure barrier separating write and read */ + volatile E item; + Node next; + Node(E x) { item = x; } + } + + /** The capacity bound, or Integer.MAX_VALUE if none */ + private final int capacity; + + /** Current number of elements */ + private final AtomicInteger count = new AtomicInteger(0); + + /** Head of linked list */ + private transient Node head; + + /** Tail of linked list */ + private transient Node last; + + /** Lock held by take, poll, etc */ + private final ReentrantLock takeLock = new ReentrantLock(); + + /** Wait queue for waiting takes */ + private final Condition notEmpty = takeLock.newCondition(); + + /** Lock held by put, offer, etc */ + private final ReentrantLock putLock = new ReentrantLock(); + + /** Wait queue for waiting puts */ + private final Condition notFull = putLock.newCondition(); + + /** + * Signal a waiting take. Called only from put/offer (which do not + * otherwise ordinarily lock takeLock.) + */ + private void signalNotEmpty() { + final ReentrantLock takeLock = this.takeLock; + takeLock.lock(); + try { + notEmpty.signal(); + } finally { + takeLock.unlock(); + } + } + + /** + * Signal a waiting put. Called only from take/poll. + */ + private void signalNotFull() { + final ReentrantLock putLock = this.putLock; + putLock.lock(); + try { + notFull.signal(); + } finally { + putLock.unlock(); + } + } + + /** + * Create a node and link it at end of queue + * @param x the item + */ + private void insert(E x) { + last = last.next = new Node(x); + } + + /** + * Remove a node from head of queue, + * @return the node + */ + private E extract() { + Node first = head.next; + head = first; + E x = first.item; + first.item = null; + return x; + } + + /** + * Lock to prevent both puts and takes. + */ + private void fullyLock() { + putLock.lock(); + takeLock.lock(); + } + + /** + * Unlock to allow both puts and takes. + */ + private void fullyUnlock() { + takeLock.unlock(); + putLock.unlock(); + } + + + /** + * Creates a LinkedBlockingQueue with a capacity of + * {@link Integer#MAX_VALUE}. + */ + public LinkedBlockingQueue() { + this(Integer.MAX_VALUE); + } + + /** + * Creates a LinkedBlockingQueue with the given (fixed) capacity. + * + * @param capacity the capacity of this queue. + * @throws IllegalArgumentException if capacity is not greater + * than zero. + */ + public LinkedBlockingQueue(int capacity) { + if (capacity <= 0) throw new IllegalArgumentException(); + this.capacity = capacity; + last = head = new Node(null); + } + + /** + * Creates a LinkedBlockingQueue with a capacity of + * {@link Integer#MAX_VALUE}, initially containing the elements of the + * given collection, + * added in traversal order of the collection's iterator. + * @param c the collection of elements to initially contain + * @throws NullPointerException if c or any element within it + * is null + */ + public LinkedBlockingQueue(Collection c) { + this(Integer.MAX_VALUE); + for (Iterator it = c.iterator(); it.hasNext();) + add(it.next()); + } + + + // this doc comment is overridden to remove the reference to collections + // greater in size than Integer.MAX_VALUE + /** + * Returns the number of elements in this queue. + * + * @return the number of elements in this queue. + */ + public int size() { + return count.get(); + } + + // this doc comment is a modified copy of the inherited doc comment, + // without the reference to unlimited queues. + /** + * Returns the number of elements that this queue can ideally (in + * the absence of memory or resource constraints) accept without + * blocking. This is always equal to the initial capacity of this queue + * less the current size of this queue. + *

Note that you cannot always tell if + * an attempt to add an element will succeed by + * inspecting remainingCapacity because it may be the + * case that a waiting consumer is ready to take an + * element out of an otherwise full queue. + */ + public int remainingCapacity() { + return capacity - count.get(); + } + + /** + * Adds the specified element to the tail of this queue, waiting if + * necessary for space to become available. + * @param o the element to add + * @throws InterruptedException if interrupted while waiting. + * @throws NullPointerException if the specified element is null. + */ + public void put(E o) throws InterruptedException { + if (o == null) throw new NullPointerException(); + // Note: convention in all put/take/etc is to preset + // local var holding count negative to indicate failure unless set. + int c = -1; + final ReentrantLock putLock = this.putLock; + final AtomicInteger count = this.count; + putLock.lockInterruptibly(); + try { + /* + * Note that count is used in wait guard even though it is + * not protected by lock. This works because count can + * only decrease at this point (all other puts are shut + * out by lock), and we (or some other waiting put) are + * signalled if it ever changes from + * capacity. Similarly for all other uses of count in + * other wait guards. + */ + try { + while (count.get() == capacity) + notFull.await(); + } catch (InterruptedException ie) { + notFull.signal(); // propagate to a non-interrupted thread + throw ie; + } + insert(o); + c = count.getAndIncrement(); + if (c + 1 < capacity) + notFull.signal(); + } finally { + putLock.unlock(); + } + if (c == 0) + signalNotEmpty(); + } + + /** + * Inserts the specified element at the tail of this queue, waiting if + * necessary up to the specified wait time for space to become available. + * @param o the element to add + * @param timeout how long to wait before giving up, in units of + * unit + * @param unit a TimeUnit determining how to interpret the + * timeout parameter + * @return true if successful, or false if + * the specified waiting time elapses before space is available. + * @throws InterruptedException if interrupted while waiting. + * @throws NullPointerException if the specified element is null. + */ + public boolean offer(E o, long timeout, TimeUnit unit) + throws InterruptedException { + + if (o == null) throw new NullPointerException(); + long nanos = unit.toNanos(timeout); + int c = -1; + final ReentrantLock putLock = this.putLock; + final AtomicInteger count = this.count; + putLock.lockInterruptibly(); + try { + for (;;) { + if (count.get() < capacity) { + insert(o); + c = count.getAndIncrement(); + if (c + 1 < capacity) + notFull.signal(); + break; + } + if (nanos <= 0) + return false; + try { + nanos = notFull.awaitNanos(nanos); + } catch (InterruptedException ie) { + notFull.signal(); // propagate to a non-interrupted thread + throw ie; + } + } + } finally { + putLock.unlock(); + } + if (c == 0) + signalNotEmpty(); + return true; + } + + /** + * Inserts the specified element at the tail of this queue if possible, + * returning immediately if this queue is full. + * + * @param o the element to add. + * @return true if it was possible to add the element to + * this queue, else false + * @throws NullPointerException if the specified element is null + */ + public boolean offer(E o) { + if (o == null) throw new NullPointerException(); + final AtomicInteger count = this.count; + if (count.get() == capacity) + return false; + int c = -1; + final ReentrantLock putLock = this.putLock; + putLock.lock(); + try { + if (count.get() < capacity) { + insert(o); + c = count.getAndIncrement(); + if (c + 1 < capacity) + notFull.signal(); + } + } finally { + putLock.unlock(); + } + if (c == 0) + signalNotEmpty(); + return c >= 0; + } + + + public E take() throws InterruptedException { + E x; + int c = -1; + final AtomicInteger count = this.count; + final ReentrantLock takeLock = this.takeLock; + takeLock.lockInterruptibly(); + try { + try { + while (count.get() == 0) + notEmpty.await(); + } catch (InterruptedException ie) { + notEmpty.signal(); // propagate to a non-interrupted thread + throw ie; + } + + x = extract(); + c = count.getAndDecrement(); + if (c > 1) + notEmpty.signal(); + } finally { + takeLock.unlock(); + } + if (c == capacity) + signalNotFull(); + return x; + } + + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + E x = null; + int c = -1; + long nanos = unit.toNanos(timeout); + final AtomicInteger count = this.count; + final ReentrantLock takeLock = this.takeLock; + takeLock.lockInterruptibly(); + try { + for (;;) { + if (count.get() > 0) { + x = extract(); + c = count.getAndDecrement(); + if (c > 1) + notEmpty.signal(); + break; + } + if (nanos <= 0) + return null; + try { + nanos = notEmpty.awaitNanos(nanos); + } catch (InterruptedException ie) { + notEmpty.signal(); // propagate to a non-interrupted thread + throw ie; + } + } + } finally { + takeLock.unlock(); + } + if (c == capacity) + signalNotFull(); + return x; + } + + public E poll() { + final AtomicInteger count = this.count; + if (count.get() == 0) + return null; + E x = null; + int c = -1; + final ReentrantLock takeLock = this.takeLock; + takeLock.lock(); + try { + if (count.get() > 0) { + x = extract(); + c = count.getAndDecrement(); + if (c > 1) + notEmpty.signal(); + } + } finally { + takeLock.unlock(); + } + if (c == capacity) + signalNotFull(); + return x; + } + + + public E peek() { + if (count.get() == 0) + return null; + final ReentrantLock takeLock = this.takeLock; + takeLock.lock(); + try { + Node first = head.next; + if (first == null) + return null; + else + return first.item; + } finally { + takeLock.unlock(); + } + } + + public boolean remove(Object o) { + if (o == null) return false; + boolean removed = false; + fullyLock(); + try { + Node trail = head; + Node p = head.next; + while (p != null) { + if (o.equals(p.item)) { + removed = true; + break; + } + trail = p; + p = p.next; + } + if (removed) { + p.item = null; + trail.next = p.next; + if (count.getAndDecrement() == capacity) + notFull.signalAll(); + } + } finally { + fullyUnlock(); + } + return removed; + } + + public Object[] toArray() { + fullyLock(); + try { + int size = count.get(); + Object[] a = new Object[size]; + int k = 0; + for (Node p = head.next; p != null; p = p.next) + a[k++] = p.item; + return a; + } finally { + fullyUnlock(); + } + } + + public T[] toArray(T[] a) { + fullyLock(); + try { + int size = count.get(); + if (a.length < size) + a = (T[])java.lang.reflect.Array.newInstance + (a.getClass().getComponentType(), size); + + int k = 0; + for (Node p = head.next; p != null; p = p.next) + a[k++] = (T)p.item; + return a; + } finally { + fullyUnlock(); + } + } + + public String toString() { + fullyLock(); + try { + return super.toString(); + } finally { + fullyUnlock(); + } + } + + public void clear() { + fullyLock(); + try { + head.next = null; + if (count.getAndSet(0) == capacity) + notFull.signalAll(); + } finally { + fullyUnlock(); + } + } + + public int drainTo(Collection c) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + Node first; + fullyLock(); + try { + first = head.next; + head.next = null; + if (count.getAndSet(0) == capacity) + notFull.signalAll(); + } finally { + fullyUnlock(); + } + // Transfer the elements outside of locks + int n = 0; + for (Node p = first; p != null; p = p.next) { + c.add(p.item); + p.item = null; + ++n; + } + return n; + } + + public int drainTo(Collection c, int maxElements) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + if (maxElements <= 0) + return 0; + fullyLock(); + try { + int n = 0; + Node p = head.next; + while (p != null && n < maxElements) { + c.add(p.item); + p.item = null; + p = p.next; + ++n; + } + if (n != 0) { + head.next = p; + if (count.getAndAdd(-n) == capacity) + notFull.signalAll(); + } + return n; + } finally { + fullyUnlock(); + } + } + + /** + * Returns an iterator over the elements in this queue in proper sequence. + * The returned Iterator is a "weakly consistent" iterator that + * will never throw {@link java.util.ConcurrentModificationException}, + * and guarantees to traverse elements as they existed upon + * construction of the iterator, and may (but is not guaranteed to) + * reflect any modifications subsequent to construction. + * + * @return an iterator over the elements in this queue in proper sequence. + */ + public Iterator iterator() { + return new Itr(); + } + + private class Itr implements Iterator { + /* + * Basic weak-consistent iterator. At all times hold the next + * item to hand out so that if hasNext() reports true, we will + * still have it to return even if lost race with a take etc. + */ + private Node current; + private Node lastRet; + private E currentElement; + + Itr() { + final ReentrantLock putLock = LinkedBlockingQueue.this.putLock; + final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock; + putLock.lock(); + takeLock.lock(); + try { + current = head.next; + if (current != null) + currentElement = current.item; + } finally { + takeLock.unlock(); + putLock.unlock(); + } + } + + public boolean hasNext() { + return current != null; + } + + public E next() { + final ReentrantLock putLock = LinkedBlockingQueue.this.putLock; + final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock; + putLock.lock(); + takeLock.lock(); + try { + if (current == null) + throw new NoSuchElementException(); + E x = currentElement; + lastRet = current; + current = current.next; + if (current != null) + currentElement = current.item; + return x; + } finally { + takeLock.unlock(); + putLock.unlock(); + } + } + + public void remove() { + if (lastRet == null) + throw new IllegalStateException(); + final ReentrantLock putLock = LinkedBlockingQueue.this.putLock; + final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock; + putLock.lock(); + takeLock.lock(); + try { + Node node = lastRet; + lastRet = null; + Node trail = head; + Node p = head.next; + while (p != null && p != node) { + trail = p; + p = p.next; + } + if (p == node) { + p.item = null; + trail.next = p.next; + int c = count.getAndDecrement(); + if (c == capacity) + notFull.signalAll(); + } + } finally { + takeLock.unlock(); + putLock.unlock(); + } + } + } + + /** + * Save the state to a stream (that is, serialize it). + * + * @serialData The capacity is emitted (int), followed by all of + * its elements (each an Object) in the proper order, + * followed by a null + * @param s the stream + */ + private void writeObject(java.io.ObjectOutputStream s) + throws java.io.IOException { + + fullyLock(); + try { + // Write out any hidden stuff, plus capacity + s.defaultWriteObject(); + + // Write out all elements in the proper order. + for (Node p = head.next; p != null; p = p.next) + s.writeObject(p.item); + + // Use trailing null as sentinel + s.writeObject(null); + } finally { + fullyUnlock(); + } + } + + /** + * Reconstitute this queue instance from a stream (that is, + * deserialize it). + * @param s the stream + */ + private void readObject(java.io.ObjectInputStream s) + throws java.io.IOException, ClassNotFoundException { + // Read in capacity, and any hidden stuff + s.defaultReadObject(); + + count.set(0); + last = head = new Node(null); + + // Read in all elements and place in queue + for (;;) { + E item = (E)s.readObject(); + if (item == null) + break; + add(item); + } + } +} Propchange: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/LinkedBlockingQueue.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/PriorityBlockingQueue.java URL: http://svn.apache.org/viewvc/incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/PriorityBlockingQueue.java?view=auto&rev=442038 ============================================================================== --- incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/PriorityBlockingQueue.java (added) +++ incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/PriorityBlockingQueue.java Sun Sep 10 16:59:30 2006 @@ -0,0 +1,455 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; + +import java.util.concurrent.locks.*; +import java.util.*; + +/** + * An unbounded {@linkplain BlockingQueue blocking queue} that uses + * the same ordering rules as class {@link PriorityQueue} and supplies + * blocking retrieval operations. While this queue is logically + * unbounded, attempted additions may fail due to resource exhaustion + * (causing OutOfMemoryError). This class does not permit + * null elements. A priority queue relying on natural + * ordering also does not permit insertion of non-comparable objects + * (doing so results in ClassCastException). + * + *

This class implements all of the optional methods + * of the {@link Collection} and {@link Iterator} interfaces. + *

The Iterator provided in method {@link #iterator()} is + * not guaranteed to traverse the elements of the + * PriorityBlockingQueue in any particular order. If you need ordered + * traversal, consider using Arrays.sort(pq.toArray()). + * + *

This class is a member of the + * + * Java Collections Framework. + * + * @since 1.5 + * @author Doug Lea + * @param the type of elements held in this collection + */ +public class PriorityBlockingQueue extends AbstractQueue + implements BlockingQueue, java.io.Serializable { + private static final long serialVersionUID = 5595510919245408276L; + + private final PriorityQueue q; + private final ReentrantLock lock = new ReentrantLock(true); + private final Condition notEmpty = lock.newCondition(); + + /** + * Creates a PriorityBlockingQueue with the default initial + * capacity + * (11) that orders its elements according to their natural + * ordering (using Comparable). + */ + public PriorityBlockingQueue() { + q = new PriorityQueue(); + } + + /** + * Creates a PriorityBlockingQueue with the specified initial + * capacity + * that orders its elements according to their natural ordering + * (using Comparable). + * + * @param initialCapacity the initial capacity for this priority queue. + * @throws IllegalArgumentException if initialCapacity is less + * than 1 + */ + public PriorityBlockingQueue(int initialCapacity) { + q = new PriorityQueue(initialCapacity, null); + } + + /** + * Creates a PriorityBlockingQueue with the specified initial + * capacity + * that orders its elements according to the specified comparator. + * + * @param initialCapacity the initial capacity for this priority queue. + * @param comparator the comparator used to order this priority queue. + * If null then the order depends on the elements' natural + * ordering. + * @throws IllegalArgumentException if initialCapacity is less + * than 1 + */ + public PriorityBlockingQueue(int initialCapacity, + Comparator comparator) { + q = new PriorityQueue(initialCapacity, comparator); + } + + /** + * Creates a PriorityBlockingQueue containing the elements + * in the specified collection. The priority queue has an initial + * capacity of 110% of the size of the specified collection. If + * the specified collection is a {@link SortedSet} or a {@link + * PriorityQueue}, this priority queue will be sorted according to + * the same comparator, or according to its elements' natural + * order if the collection is sorted according to its elements' + * natural order. Otherwise, this priority queue is ordered + * according to its elements' natural order. + * + * @param c the collection whose elements are to be placed + * into this priority queue. + * @throws ClassCastException if elements of the specified collection + * cannot be compared to one another according to the priority + * queue's ordering. + * @throws NullPointerException if c or any element within it + * is null + */ + public PriorityBlockingQueue(Collection c) { + q = new PriorityQueue(c); + } + + + // these first few override just to update doc comments + + /** + * Adds the specified element to this queue. + * @param o the element to add + * @return true (as per the general contract of + * Collection.add). + * + * @throws NullPointerException if the specified element is null. + * @throws ClassCastException if the specified element cannot be compared + * with elements currently in the priority queue according + * to the priority queue's ordering. + */ + public boolean add(E o) { + return super.add(o); + } + + /** + * Returns the comparator used to order this collection, or null + * if this collection is sorted according to its elements natural ordering + * (using Comparable). + * + * @return the comparator used to order this collection, or null + * if this collection is sorted according to its elements natural ordering. + */ + public Comparator comparator() { + return q.comparator(); + } + + /** + * Inserts the specified element into this priority queue. + * + * @param o the element to add + * @return true + * @throws ClassCastException if the specified element cannot be compared + * with elements currently in the priority queue according + * to the priority queue's ordering. + * @throws NullPointerException if the specified element is null. + */ + public boolean offer(E o) { + if (o == null) throw new NullPointerException(); + final ReentrantLock lock = this.lock; + lock.lock(); + try { + boolean ok = q.offer(o); + assert ok; + notEmpty.signal(); + return true; + } finally { + lock.unlock(); + } + } + + /** + * Adds the specified element to this priority queue. As the queue is + * unbounded this method will never block. + * @param o the element to add + * @throws ClassCastException if the element cannot be compared + * with elements currently in the priority queue according + * to the priority queue's ordering. + * @throws NullPointerException if the specified element is null. + */ + public void put(E o) { + offer(o); // never need to block + } + + /** + * Inserts the specified element into this priority queue. As the queue is + * unbounded this method will never block. + * @param o the element to add + * @param timeout This parameter is ignored as the method never blocks + * @param unit This parameter is ignored as the method never blocks + * @return true + * @throws ClassCastException if the element cannot be compared + * with elements currently in the priority queue according + * to the priority queue's ordering. + * @throws NullPointerException if the specified element is null. + */ + public boolean offer(E o, long timeout, TimeUnit unit) { + return offer(o); // never need to block + } + + public E take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + try { + while (q.size() == 0) + notEmpty.await(); + } catch (InterruptedException ie) { + notEmpty.signal(); // propagate to non-interrupted thread + throw ie; + } + E x = q.poll(); + assert x != null; + return x; + } finally { + lock.unlock(); + } + } + + + public E poll() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.poll(); + } finally { + lock.unlock(); + } + } + + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + for (;;) { + E x = q.poll(); + if (x != null) + return x; + if (nanos <= 0) + return null; + try { + nanos = notEmpty.awaitNanos(nanos); + } catch (InterruptedException ie) { + notEmpty.signal(); // propagate to non-interrupted thread + throw ie; + } + } + } finally { + lock.unlock(); + } + } + + public E peek() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.peek(); + } finally { + lock.unlock(); + } + } + + public int size() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.size(); + } finally { + lock.unlock(); + } + } + + /** + * Always returns Integer.MAX_VALUE because + * a PriorityBlockingQueue is not capacity constrained. + * @return Integer.MAX_VALUE + */ + public int remainingCapacity() { + return Integer.MAX_VALUE; + } + + public boolean remove(Object o) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.remove(o); + } finally { + lock.unlock(); + } + } + + public boolean contains(Object o) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.contains(o); + } finally { + lock.unlock(); + } + } + + public Object[] toArray() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.toArray(); + } finally { + lock.unlock(); + } + } + + + public String toString() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.toString(); + } finally { + lock.unlock(); + } + } + + public int drainTo(Collection c) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + final ReentrantLock lock = this.lock; + lock.lock(); + try { + int n = 0; + E e; + while ( (e = q.poll()) != null) { + c.add(e); + ++n; + } + return n; + } finally { + lock.unlock(); + } + } + + public int drainTo(Collection c, int maxElements) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + if (maxElements <= 0) + return 0; + final ReentrantLock lock = this.lock; + lock.lock(); + try { + int n = 0; + E e; + while (n < maxElements && (e = q.poll()) != null) { + c.add(e); + ++n; + } + return n; + } finally { + lock.unlock(); + } + } + + /** + * Atomically removes all of the elements from this delay queue. + * The queue will be empty after this call returns. + */ + public void clear() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + q.clear(); + } finally { + lock.unlock(); + } + } + + public T[] toArray(T[] a) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.toArray(a); + } finally { + lock.unlock(); + } + } + + /** + * Returns an iterator over the elements in this queue. The + * iterator does not return the elements in any particular order. + * The returned iterator is a thread-safe "fast-fail" iterator + * that will throw {@link + * java.util.ConcurrentModificationException} upon detected + * interference. + * + * @return an iterator over the elements in this queue. + */ + public Iterator iterator() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return new Itr(q.iterator()); + } finally { + lock.unlock(); + } + } + + private class Itr implements Iterator { + private final Iterator iter; + Itr(Iterator i) { + iter = i; + } + + public boolean hasNext() { + /* + * No sync -- we rely on underlying hasNext to be + * stateless, in which case we can return true by mistake + * only when next() will subsequently throw + * ConcurrentModificationException. + */ + return iter.hasNext(); + } + + public E next() { + ReentrantLock lock = PriorityBlockingQueue.this.lock; + lock.lock(); + try { + return iter.next(); + } finally { + lock.unlock(); + } + } + + public void remove() { + ReentrantLock lock = PriorityBlockingQueue.this.lock; + lock.lock(); + try { + iter.remove(); + } finally { + lock.unlock(); + } + } + } + + /** + * Save the state to a stream (that is, serialize it). This + * merely wraps default serialization within lock. The + * serialization strategy for items is left to underlying + * Queue. Note that locking is not needed on deserialization, so + * readObject is not defined, just relying on default. + */ + private void writeObject(java.io.ObjectOutputStream s) + throws java.io.IOException { + lock.lock(); + try { + s.defaultWriteObject(); + } finally { + lock.unlock(); + } + } + +} Propchange: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/PriorityBlockingQueue.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/RejectedExecutionException.java URL: http://svn.apache.org/viewvc/incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/RejectedExecutionException.java?view=auto&rev=442038 ============================================================================== --- incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/RejectedExecutionException.java (added) +++ incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/RejectedExecutionException.java Sun Sep 10 16:59:30 2006 @@ -0,0 +1,62 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; + +/** + * Exception thrown by an {@link Executor} when a task cannot be + * accepted for execution. + * + * @since 1.5 + * @author Doug Lea + */ +public class RejectedExecutionException extends RuntimeException { + private static final long serialVersionUID = -375805702767069545L; + + /** + * Constructs a RejectedExecutionException with no detail message. + * The cause is not initialized, and may subsequently be + * initialized by a call to {@link #initCause(Throwable) initCause}. + */ + public RejectedExecutionException() { } + + /** + * Constructs a RejectedExecutionException with the + * specified detail message. The cause is not initialized, and may + * subsequently be initialized by a call to {@link + * #initCause(Throwable) initCause}. + * + * @param message the detail message + */ + public RejectedExecutionException(String message) { + super(message); + } + + /** + * Constructs a RejectedExecutionException with the + * specified detail message and cause. + * + * @param message the detail message + * @param cause the cause (which is saved for later retrieval by the + * {@link #getCause()} method) + */ + public RejectedExecutionException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Constructs a RejectedExecutionException with the + * specified cause. The detail message is set to:

 (cause ==
+     * null ? null : cause.toString())
(which typically contains + * the class and detail message of cause). + * + * @param cause the cause (which is saved for later retrieval by the + * {@link #getCause()} method) + */ + public RejectedExecutionException(Throwable cause) { + super(cause); + } +} Propchange: incubator/harmony/standard/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/RejectedExecutionException.java ------------------------------------------------------------------------------ svn:eol-style = native