harmony-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ndbe...@apache.org
Subject svn commit: r434296 [4/19] - in /incubator/harmony/enhanced/classlib/trunk: make/ modules/concurrent/ modules/concurrent/.settings/ modules/concurrent/META-INF/ modules/concurrent/make/ modules/concurrent/src/ modules/concurrent/src/main/ modules/concu...
Date Thu, 24 Aug 2006 03:42:33 GMT
Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executors.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executors.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executors.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executors.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,659 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.security.AccessControlException;
+
+/**
+ * Factory and utility methods for {@link Executor}, {@link
+ * ExecutorService}, {@link ScheduledExecutorService}, {@link
+ * ThreadFactory}, and {@link Callable} classes defined in this
+ * package. This class supports the following kinds of methods:
+ * 
+ * <ul>
+ *   <li> Methods that create and return an {@link ExecutorService} 
+ *        set up with commonly useful configuration settings. 
+ *   <li> Methods that create and return a {@link ScheduledExecutorService} 
+ *        set up with commonly useful configuration settings. 
+ *   <li> Methods that create and return a "wrapped" ExecutorService, that
+ *        disables reconfiguration by making implementation-specific methods
+ *        inaccessible.
+ *   <li> Methods that create and return a {@link ThreadFactory}
+ *        that sets newly created threads to a known state.
+ *   <li> Methods that create and return a {@link Callable} 
+ *        out of other closure-like forms, so they can be used
+ *        in execution methods requiring <tt>Callable</tt>.
+ * </ul>
+ *
+ * @since 1.5
+ * @author Doug Lea
+ */
+public class Executors {
+
+    /**
+     * Creates a thread pool that reuses a fixed set of threads
+     * operating off a shared unbounded queue. If any thread
+     * terminates due to a failure during execution prior to shutdown,
+     * a new one will take its place if needed to execute subsequent
+     * tasks.
+     *
+     * @param nThreads the number of threads in the pool
+     * @return the newly created thread pool
+     */
+    public static ExecutorService newFixedThreadPool(int nThreads) {
+        // Core and maximum size are both nThreads; excess tasks wait in the queue.
+        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
+        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, workQueue);
+    }
+
+    /**
+     * Creates a thread pool that reuses a fixed set of threads
+     * operating off a shared unbounded queue, using the provided
+     * ThreadFactory to create new threads when needed.
+     *
+     * @param nThreads the number of threads in the pool
+     * @param threadFactory the factory to use when creating new threads
+     * @return the newly created thread pool
+     */
+    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
+        // Fixed-size pool: core == max == nThreads, workers come from threadFactory.
+        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
+        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
+                                      workQueue, threadFactory);
+    }
+
+    /**
+     * Creates an Executor that uses a single worker thread operating
+     * off an unbounded queue. (Note however that if this single
+     * thread terminates due to a failure during execution prior to
+     * shutdown, a new one will take its place if needed to execute
+     * subsequent tasks.)  Tasks are guaranteed to execute
+     * sequentially, and no more than one task will be active at any
+     * given time. Unlike the otherwise equivalent
+     * <tt>newFixedThreadPool(1)</tt> the returned executor is
+     * guaranteed not to be reconfigurable to use additional threads.
+     *
+     * @return the newly created single-threaded Executor
+     */
+    public static ExecutorService newSingleThreadExecutor() {
+        ExecutorService singleWorker =
+            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
+                                   new LinkedBlockingQueue<Runnable>());
+        return new DelegatedExecutorService(singleWorker);  // hide the pool's tuning methods
+    }
+
+    /**
+     * Creates an Executor that uses a single worker thread operating
+     * off an unbounded queue, and uses the provided ThreadFactory to
+     * create a new thread when needed. Unlike the otherwise
+     * equivalent <tt>newFixedThreadPool(1, threadFactory)</tt> the returned executor
+     * is guaranteed not to be reconfigurable to use additional
+     * threads.
+     * 
+     * @param threadFactory the factory to use when creating new
+     * threads
+     *
+     * @return the newly created single-threaded Executor
+     */
+    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
+        ExecutorService singleWorker =
+            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
+                                   new LinkedBlockingQueue<Runnable>(),
+                                   threadFactory);
+        return new DelegatedExecutorService(singleWorker);  // hide the pool's tuning methods
+    }
+
+    /**
+     * Creates a thread pool that creates new threads as needed, but
+     * will reuse previously constructed threads when they are
+     * available.  These pools will typically improve the performance
+     * of programs that execute many short-lived asynchronous tasks.
+     * Calls to <tt>execute</tt> will reuse previously constructed
+     * threads if available. If no existing thread is available, a new
+     * thread will be created and added to the pool. Threads that have
+     * not been used for sixty seconds are terminated and removed from
+     * the cache. Thus, a pool that remains idle for long enough will
+     * not consume any resources. Note that pools with similar
+     * properties but different details (for example, timeout parameters)
+     * may be created using {@link ThreadPoolExecutor} constructors.
+     *
+     * @return the newly created thread pool
+     */
+    public static ExecutorService newCachedThreadPool() {
+        // No core threads, maximum of Integer.MAX_VALUE, 60-second keep-alive for idle workers.
+        BlockingQueue<Runnable> handoff = new SynchronousQueue<Runnable>();
+        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, handoff);
+    }
+
+    /**
+     * Creates a thread pool that creates new threads as needed, but
+     * will reuse previously constructed threads when they are
+     * available, and uses the provided
+     * ThreadFactory to create new threads when needed.
+     * @param threadFactory the factory to use when creating new threads
+     * @return the newly created thread pool
+     */
+    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
+        // No core threads, 60-second keep-alive; new workers come from threadFactory.
+        BlockingQueue<Runnable> handoff = new SynchronousQueue<Runnable>();
+        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
+                                      handoff, threadFactory);
+    }
+   
+    /**
+     * Creates a single-threaded executor that can schedule commands
+     * to run after a given delay, or to execute periodically.
+     * (Note however that if this single
+     * thread terminates due to a failure during execution prior to
+     * shutdown, a new one will take its place if needed to execute
+     * subsequent tasks.)  Tasks are guaranteed to execute
+     * sequentially, and no more than one task will be active at any
+     * given time. Unlike the otherwise equivalent
+     * <tt>newScheduledThreadPool(1)</tt> the returned executor is
+     * guaranteed not to be reconfigurable to use additional threads.
+     * @return the newly created scheduled executor
+     */
+    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
+        ScheduledExecutorService singleWorker = new ScheduledThreadPoolExecutor(1);
+        return new DelegatedScheduledExecutorService(singleWorker);  // hide tuning methods
+    }
+
+    /**
+     * Creates a single-threaded executor that can schedule commands
+     * to run after a given delay, or to execute periodically.  (Note
+     * however that if this single thread terminates due to a failure
+     * during execution prior to shutdown, a new one will take its
+     * place if needed to execute subsequent tasks.)  Tasks are
+     * guaranteed to execute sequentially, and no more than one task
+     * will be active at any given time. Unlike the otherwise
+     * equivalent <tt>newScheduledThreadPool(1, threadFactory)</tt>
+     * the returned executor is guaranteed not to be reconfigurable to
+     * use additional threads.
+     * @param threadFactory the factory to use when creating new
+     * threads
+     * @return a newly created scheduled executor
+     */
+    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
+        ScheduledExecutorService singleWorker = new ScheduledThreadPoolExecutor(1, threadFactory);
+        return new DelegatedScheduledExecutorService(singleWorker);  // hide tuning methods
+    }
+    
+    /**
+     * Creates a thread pool that can schedule commands to run after a 
+     * given delay, or to execute periodically.
+     * @param corePoolSize the number of threads to keep in the pool,
+     * even if they are idle.
+     * @return a newly created scheduled thread pool
+     */
+    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
+        ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(corePoolSize);
+        return scheduler;  // handed back unwrapped, unlike the single-thread variants
+    }
+
+    /**
+     * Creates a thread pool that can schedule commands to run after a 
+     * given delay, or to execute periodically.
+     * @param corePoolSize the number of threads to keep in the pool,
+     * even if they are idle.
+     * @param threadFactory the factory to use when the executor
+     * creates a new thread. 
+     * @return a newly created scheduled thread pool
+     */
+    public static ScheduledExecutorService newScheduledThreadPool(
+            int corePoolSize, ThreadFactory threadFactory) {
+        ScheduledExecutorService scheduler =
+            new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
+        return scheduler;  // handed back unwrapped, unlike the single-thread variants
+    }
+
+
+    /**
+     * Returns an object that delegates all defined {@link
+     * ExecutorService} methods to the given executor, but not any
+     * other methods that might otherwise be accessible using
+     * casts. This provides a way to safely "freeze" configuration and
+     * disallow tuning of a given concrete implementation.
+     * @param executor the underlying implementation
+     * @return an <tt>ExecutorService</tt> instance
+     * @throws NullPointerException if executor null
+     */
+    public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
+        if (executor != null)
+            return new DelegatedExecutorService(executor);
+        throw new NullPointerException();
+    }
+
+    /**
+     * Returns an object that delegates all defined {@link
+     * ScheduledExecutorService} methods to the given executor, but
+     * not any other methods that might otherwise be accessible using
+     * casts. This provides a way to safely "freeze" configuration and
+     * disallow tuning of a given concrete implementation.
+     * @param executor the underlying implementation
+     * @return a <tt>ScheduledExecutorService</tt> instance
+     * @throws NullPointerException if executor null
+     */
+    public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
+        if (executor != null)
+            return new DelegatedScheduledExecutorService(executor);
+        throw new NullPointerException();
+    }
+        
+    /**
+     * Returns a default thread factory used to create new threads.
+     * This factory creates all new threads used by an Executor in the
+     * same {@link ThreadGroup}. If there is a {@link
+     * java.lang.SecurityManager}, it uses the group of {@link
+     * System#getSecurityManager}, else the group of the thread
+     * invoking this <tt>defaultThreadFactory</tt> method. Each new
+     * thread is created as a non-daemon thread with priority
+     * <tt>Thread.NORM_PRIORITY</tt>. New threads have names
+     * accessible via {@link Thread#getName} of
+     * <em>pool-N-thread-M</em>, where <em>N</em> is the sequence
+     * number of this factory, and <em>M</em> is the sequence number
+     * of the thread created by this factory.
+     * @return a thread factory
+     */
+    public static ThreadFactory defaultThreadFactory() {
+        ThreadFactory factory = new DefaultThreadFactory();  // a fresh factory per call
+        return factory;
+    }
+
+    /**
+     * Returns a thread factory used to create new threads that
+     * have the same permissions as the current thread.
+     * This factory creates threads with the same settings as {@link
+     * Executors#defaultThreadFactory}, additionally setting the
+     * AccessControlContext and contextClassLoader of new threads to
+     * be the same as the thread invoking this
+     * <tt>privilegedThreadFactory</tt> method.  A new
+     * <tt>privilegedThreadFactory</tt> can be created within an
+     * {@link AccessController#doPrivileged} action setting the
+     * current thread's access control context to create threads with
+     * the selected permission settings holding within that action.
+     *
+     * <p> Note that while tasks running within such threads will have
+     * the same access control and class loader settings as the
+     * current thread, they need not have the same {@link
+     * java.lang.ThreadLocal} or {@link
+     * java.lang.InheritableThreadLocal} values. If necessary,
+     * particular values of thread locals can be set or reset before
+     * any task runs in {@link ThreadPoolExecutor} subclasses using
+     * {@link ThreadPoolExecutor#beforeExecute}. Also, if it is
+     * necessary to initialize worker threads to have the same
+     * InheritableThreadLocal settings as some other designated
+     * thread, you can create a custom ThreadFactory in which that
+     * thread waits for and services requests to create others that
+     * will inherit its values.
+     *
+     * @return a thread factory
+     * @throws AccessControlException if the current access control
+     * context does not have permission to both get and set context
+     * class loader.
+     */
+    public static ThreadFactory privilegedThreadFactory() {
+        // The constructor captures the calling thread's class loader and access context.
+        ThreadFactory factory = new PrivilegedThreadFactory();
+        return factory;
+    }
+
+    /**
+     * Returns a {@link Callable} object that, when
+     * called, runs the given task and returns the given result.  This
+     * can be useful when applying methods requiring a
+     * <tt>Callable</tt> to an otherwise resultless action.
+     * @param task the task to run
+     * @param result the result to return
+     * @throws NullPointerException if task null
+     * @return a callable object
+     */
+    public static <T> Callable<T> callable(Runnable task, T result) {
+        if (task != null)
+            return new RunnableAdapter<T>(task, result);
+        throw new NullPointerException();
+    }
+
+    /**
+     * Returns a {@link Callable} object that, when
+     * called, runs the given task and returns <tt>null</tt>.
+     * @param task the task to run
+     * @return a callable object
+     * @throws NullPointerException if task null
+     */
+    public static Callable<Object> callable(Runnable task) {
+        if (task != null)
+            return new RunnableAdapter<Object>(task, null);  // null is the fixed result
+        throw new NullPointerException();
+    }
+
+    /**
+     * Returns a {@link Callable} object that, when
+     * called, runs the given privileged action and returns its result.
+     * @param action the privileged action to run
+     * @return a callable object
+     * @throws NullPointerException if action null
+     */
+    public static Callable<Object> callable(PrivilegedAction action) {
+        if (action != null)
+            return new PrivilegedActionAdapter(action);
+        throw new NullPointerException();
+    }
+
+    /**
+     * Returns a {@link Callable} object that, when
+     * called, runs the given privileged exception action and returns
+     * its result.
+     * @param action the privileged exception action to run
+     * @return a callable object
+     * @throws NullPointerException if action null
+     */
+    public static Callable<Object> callable(PrivilegedExceptionAction action) {
+        if (action != null)
+            return new PrivilegedExceptionActionAdapter(action);
+        throw new NullPointerException();
+    }
+
+    /**
+     * Returns a {@link Callable} object that will, when
+     * called, execute the given <tt>callable</tt> under the current
+     * access control context. This method should normally be
+     * invoked within an {@link AccessController#doPrivileged} action
+     * to create callables that will, if possible, execute under the
+     * selected permission settings holding within that action; or if
+     * not possible, throw an associated {@link
+     * AccessControlException}.
+     * @param callable the underlying task
+     * @return a callable object
+     * @throws NullPointerException if callable null
+     *
+     */
+    public static <T> Callable<T> privilegedCallable(Callable<T> callable) {
+        if (callable == null)
+            throw new NullPointerException();
+        // Instantiate the wrapper with its type argument so the return needs no unchecked conversion.
+        return new PrivilegedCallable<T>(callable);
+    }
+    
+    /**
+     * Returns a {@link Callable} object that will, when
+     * called, execute the given <tt>callable</tt> under the current
+     * access control context, with the current context class loader
+     * as the context class loader. This method should normally be
+     * invoked within an {@link AccessController#doPrivileged} action
+     * to create callables that will, if possible, execute under the
+     * selected permission settings holding within that action; or if
+     * not possible, throw an associated {@link
+     * AccessControlException}.
+     * @param callable the underlying task
+     *
+     * @return a callable object
+     * @throws NullPointerException if callable null
+     * @throws AccessControlException if the current access control
+     * context does not have permission to both set and get context
+     * class loader.
+     */
+    public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) {
+        if (callable == null)
+            throw new NullPointerException();
+        // Instantiate the wrapper with its type argument so the return needs no unchecked conversion.
+        return new PrivilegedCallableUsingCurrentClassLoader<T>(callable);
+    }
+
+    // Non-public classes supporting the public methods
+
+    /**
+     * A callable that runs given task and returns given result
+     */
+    static final class RunnableAdapter<T> implements Callable<T> {
+        final Runnable task;
+        final T result;
+
+        /** Pairs a runnable with the fixed value to hand back after it has run. */
+        RunnableAdapter(Runnable task, T result) {
+            this.result = result;
+            this.task = task;
+        }
+
+        /** Runs the task, then returns the preset result. */
+        public T call() {
+            task.run();
+            return result;
+        }
+    }
+
+    /**
+     * A callable that runs given privileged action and returns its result
+     */
+    static final class PrivilegedActionAdapter implements Callable<Object> {
+        private final PrivilegedAction action;
+
+        PrivilegedActionAdapter(PrivilegedAction action) {
+            this.action = action;
+        }
+
+        /** Invokes the wrapped action's run method and passes its value through. */
+        public Object call() {
+            Object outcome = action.run();
+            return outcome;
+        }
+    }
+
+    /**
+     * A callable that runs given privileged exception action and returns its result
+     */
+    static final class PrivilegedExceptionActionAdapter implements Callable<Object> {
+        private final PrivilegedExceptionAction action;
+
+        PrivilegedExceptionActionAdapter(PrivilegedExceptionAction action) {
+            this.action = action;
+        }
+
+        /** Invokes the wrapped action's run method; anything it throws propagates as-is. */
+        public Object call() throws Exception {
+            Object outcome = action.run();
+            return outcome;
+        }
+    }
+
+
+    /**
+     * A callable that runs under established access control settings
+     */
+    static final class PrivilegedCallable<T> implements Callable<T> {
+        private final AccessControlContext acc;
+        private final Callable<T> task;
+
+        /** Captures the creating thread's access control context. */
+        PrivilegedCallable(Callable<T> task) {
+            this.task = task;
+            this.acc = AccessController.getContext();
+        }
+
+        /**
+         * Runs the task under the captured context and returns its result.
+         * The outcome stays local to each invocation, so a failed call
+         * cannot leak its exception into later or concurrent calls.
+         *
+         * @return the value produced by the underlying task
+         * @throws Exception whatever the underlying task throws
+         */
+        public T call() throws Exception {
+            try {
+                return AccessController.doPrivileged(new PrivilegedExceptionAction<T>() {
+                        public T run() throws Exception {
+                            return task.call();
+                        }
+                    }, acc);
+            } catch (java.security.PrivilegedActionException ex) {
+                // doPrivileged wraps checked exceptions; hand the caller the task's own.
+                throw ex.getException();
+            }
+        }
+    }
+
+    /**
+     * A callable that runs under established access control settings and
+     * current ClassLoader
+     */
+    static final class PrivilegedCallableUsingCurrentClassLoader<T> implements Callable<T> {
+        private final ClassLoader ccl;
+        private final AccessControlContext acc;
+        private final Callable<T> task;
+
+        /**
+         * Captures the creating thread's context class loader and access
+         * control context, then checks that context for both class-loader
+         * permissions so a denial surfaces at construction time.
+         */
+        PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) {
+            this.task = task;
+            this.ccl = Thread.currentThread().getContextClassLoader();
+            this.acc = AccessController.getContext();
+            acc.checkPermission(new RuntimePermission("getContextClassLoader"));
+            acc.checkPermission(new RuntimePermission("setContextClassLoader"));
+        }
+
+        /**
+         * Runs the task under the captured context with the captured class
+         * loader installed, restoring the thread's previous loader afterwards
+         * (also when that previous loader was <tt>null</tt>). The outcome
+         * stays local to each invocation, so a failed call cannot leak its
+         * exception into later or concurrent calls.
+         *
+         * @return the value produced by the underlying task
+         * @throws Exception whatever the underlying task throws
+         */
+        public T call() throws Exception {
+            try {
+                return AccessController.doPrivileged(new PrivilegedExceptionAction<T>() {
+                        public T run() throws Exception {
+                            Thread t = Thread.currentThread();
+                            ClassLoader previous = t.getContextClassLoader();
+                            if (ccl == previous)
+                                return task.call();
+                            t.setContextClassLoader(ccl);
+                            try {
+                                return task.call();
+                            } finally {
+                                // Always undo the swap, whatever "previous" was.
+                                t.setContextClassLoader(previous);
+                            }
+                        }
+                    }, acc);
+            } catch (java.security.PrivilegedActionException ex) {
+                // doPrivileged wraps checked exceptions; hand the caller the task's own.
+                throw ex.getException();
+            }
+        }
+    }
+
+    /**
+     * The default thread factory
+     */
+    static class DefaultThreadFactory implements ThreadFactory {
+        static final AtomicInteger poolNumber = new AtomicInteger(1);
+        final ThreadGroup group;
+        final AtomicInteger threadNumber = new AtomicInteger(1);
+        final String namePrefix;
+
+        /** Picks the thread group and takes the next pool sequence number for thread names. */
+        DefaultThreadFactory() {
+            SecurityManager sm = System.getSecurityManager();
+            if (sm == null)
+                group = Thread.currentThread().getThreadGroup();
+            else
+                group = sm.getThreadGroup();
+            int poolId = poolNumber.getAndIncrement();
+            namePrefix = "pool-" + poolId + "-thread-";
+        }
+
+        /**
+         * Creates a thread named <em>pool-N-thread-M</em> in this factory's
+         * group, forced to non-daemon status and normal priority.
+         */
+        public Thread newThread(Runnable r) {
+            String name = namePrefix + threadNumber.getAndIncrement();
+            Thread worker = new Thread(group, r, name, 0);
+            if (worker.isDaemon())
+                worker.setDaemon(false);
+            if (worker.getPriority() != Thread.NORM_PRIORITY)
+                worker.setPriority(Thread.NORM_PRIORITY);
+            return worker;
+        }
+    }
+
+    /**
+     *  Thread factory capturing access control and class loader
+     */
+    static class PrivilegedThreadFactory extends DefaultThreadFactory {
+        private final ClassLoader ccl;
+        private final AccessControlContext acc;
+
+        /** Captures the creating thread's context class loader and access control context. */
+        PrivilegedThreadFactory() {
+            super();
+            this.ccl = Thread.currentThread().getContextClassLoader();
+            this.acc = AccessController.getContext();
+            // Only the "setContextClassLoader" permission is checked here.
+            acc.checkPermission(new RuntimePermission("setContextClassLoader"));
+        }
+
+        /**
+         * Wraps <tt>r</tt> so that it runs with the captured class loader
+         * under the captured access control context, then lets the
+         * superclass create the thread for the wrapped task.
+         */
+        public Thread newThread(final Runnable r) {
+            Runnable wrapped = new Runnable() {
+                public void run() {
+                    PrivilegedAction body = new PrivilegedAction() {
+                        public Object run() {
+                            Thread.currentThread().setContextClassLoader(ccl);
+                            r.run();
+                            return null;
+                        }
+                    };
+                    AccessController.doPrivileged(body, acc);
+                }
+            };
+            return super.newThread(wrapped);
+        }
+    }
+
+    /**
+     * A wrapper class that exposes only the ExecutorService methods
+     * of an implementation.
+     */
+    static class DelegatedExecutorService extends AbstractExecutorService {
+        /** The wrapped service; every method below forwards to it unchanged. */
+        private final ExecutorService delegate;
+
+        DelegatedExecutorService(ExecutorService executor) {
+            this.delegate = executor;
+        }
+
+        // ---- execution and lifecycle ----
+
+        public void execute(Runnable command) {
+            delegate.execute(command);
+        }
+
+        public void shutdown() {
+            delegate.shutdown();
+        }
+
+        public List<Runnable> shutdownNow() {
+            return delegate.shutdownNow();
+        }
+
+        public boolean isShutdown() {
+            return delegate.isShutdown();
+        }
+
+        public boolean isTerminated() {
+            return delegate.isTerminated();
+        }
+
+        public boolean awaitTermination(long timeout, TimeUnit unit)
+                throws InterruptedException {
+            return delegate.awaitTermination(timeout, unit);
+        }
+
+        // ---- submission: forwarded rather than inherited from AbstractExecutorService ----
+
+        public Future<?> submit(Runnable task) {
+            return delegate.submit(task);
+        }
+
+        public <T> Future<T> submit(Callable<T> task) {
+            return delegate.submit(task);
+        }
+
+        public <T> Future<T> submit(Runnable task, T result) {
+            return delegate.submit(task, result);
+        }
+
+        // ---- bulk invocation ----
+
+        public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
+                throws InterruptedException {
+            return delegate.invokeAll(tasks);
+        }
+
+        public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks,
+                                             long timeout, TimeUnit unit)
+                throws InterruptedException {
+            return delegate.invokeAll(tasks, timeout, unit);
+        }
+
+        public <T> T invokeAny(Collection<Callable<T>> tasks)
+                throws InterruptedException, ExecutionException {
+            return delegate.invokeAny(tasks);
+        }
+
+        public <T> T invokeAny(Collection<Callable<T>> tasks,
+                               long timeout, TimeUnit unit)
+                throws InterruptedException, ExecutionException, TimeoutException {
+            return delegate.invokeAny(tasks, timeout, unit);
+        }
+    }
+    
+    /**
+     * A wrapper class that exposes only the ExecutorService and
+     * ScheduledExecutorService methods of a ScheduledExecutorService implementation.
+     */
+    static class DelegatedScheduledExecutorService
+            extends DelegatedExecutorService
+            implements ScheduledExecutorService {
+        /** The same object passed to the superclass, kept under its scheduling-capable type. */
+        private final ScheduledExecutorService scheduler;
+
+        DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
+            super(executor);
+            this.scheduler = executor;
+        }
+
+        public ScheduledFuture<?> schedule(Runnable command,
+                                           long delay, TimeUnit unit) {
+            return scheduler.schedule(command, delay, unit);
+        }
+
+        public <V> ScheduledFuture<V> schedule(Callable<V> callable,
+                                               long delay, TimeUnit unit) {
+            return scheduler.schedule(callable, delay, unit);
+        }
+
+        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+                                                      long initialDelay, long period,
+                                                      TimeUnit unit) {
+            return scheduler.scheduleAtFixedRate(command, initialDelay, period, unit);
+        }
+
+        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+                                                         long initialDelay, long delay,
+                                                         TimeUnit unit) {
+            return scheduler.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+        }
+    }
+
+        
+    /** Cannot instantiate: private so the class is used only through its static methods. */
+    private Executors() {}
+}

Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Executors.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Future.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Future.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Future.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Future.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,133 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+/**
+ * A <tt>Future</tt> represents the result of an asynchronous
+ * computation.  Methods are provided to check if the computation is
+ * complete, to wait for its completion, and to retrieve the result of
+ * the computation.  The result can only be retrieved using method
+ * <tt>get</tt> when the computation has completed, blocking if
+ * necessary until it is ready.  Cancellation is performed by the
+ * <tt>cancel</tt> method.  Additional methods are provided to
+ * determine if the task completed normally or was cancelled. Once a
+ * computation has completed, the computation cannot be cancelled.
+ * If you would like to use a <tt>Future</tt> for the sake
+ * of cancellability but not provide a usable result, you can
+ * declare types of the form <tt>Future&lt;?&gt;</tt> and
+ * return <tt>null</tt> as a result of the underlying task.
+ *
+ * <p>
+ * <b>Sample Usage</b> (Note that the following classes are all
+ * made-up.) <p>
+ * <pre>
+ * interface ArchiveSearcher { String search(String target); }
+ * class App {
+ *   ExecutorService executor = ...
+ *   ArchiveSearcher searcher = ...
+ *   void showSearch(final String target) throws InterruptedException {
+ *     Future&lt;String&gt; future = executor.submit(new Callable&lt;String&gt;() {
+ *         public String call() { return searcher.search(target); }
+ *     });
+ *     displayOtherThings(); // do other things while searching
+ *     try {
+ *       displayText(future.get()); // use future
+ *     } catch (ExecutionException ex) { cleanup(); return; }
+ *   }
+ * }
+ * </pre>
+ *
+ * The {@link FutureTask} class is an implementation of <tt>Future</tt> that 
+ * implements <tt>Runnable</tt>, and so may be executed by an <tt>Executor</tt>. 
+ * For example, the above construction with <tt>submit</tt> could be replaced by:
+ * <pre>
+ *     FutureTask&lt;String&gt; future =
+ *       new FutureTask&lt;String&gt;(new Callable&lt;String&gt;() {
+ *         public String call() {
+ *           return searcher.search(target);
+ *       }});
+ *     executor.execute(future);
+ * </pre>
+ * @see FutureTask
+ * @see Executor
+ * @since 1.5
+ * @author Doug Lea
+ * @param <V> The result type returned by this Future's <tt>get</tt> method
+ */
+public interface Future<V> {
+
+    /**
+     * Attempts to cancel execution of this task.  This attempt will
+     * fail if the task has already completed, already been cancelled,
+     * or could not be cancelled for some other reason. If successful,
+     * and this task has not started when <tt>cancel</tt> is called,
+     * this task should never run.  If the task has already started,
+     * then the <tt>mayInterruptIfRunning</tt> parameter determines
+     * whether the thread executing this task should be interrupted in
+     * an attempt to stop the task.
+     *
+     * @param mayInterruptIfRunning <tt>true</tt> if the thread executing this
+     * task should be interrupted; otherwise, in-progress tasks are allowed
+     * to complete
+     * @return <tt>false</tt> if the task could not be cancelled,
+     * typically because it has already completed normally;
+     * <tt>true</tt> otherwise
+     */
+    boolean cancel(boolean mayInterruptIfRunning);
+
+    /**
+     * Returns <tt>true</tt> if this task was cancelled before it completed
+     * normally.
+     *
+     * @return <tt>true</tt> if this task was cancelled before it completed
+     * normally
+     */
+    boolean isCancelled();
+
+    /**
+     * Returns <tt>true</tt> if this task completed.
+     *
+     * Completion may be due to normal termination, an exception, or
+     * cancellation -- in all of these cases, this method will return
+     * <tt>true</tt>.
+     *
+     * @return <tt>true</tt> if this task completed, whether normally,
+     * exceptionally, or through cancellation
+     */
+    boolean isDone();
+
+    /**
+     * Waits if necessary for the computation to complete, and then
+     * retrieves its result.
+     *
+     * @return the computed result
+     * @throws CancellationException if the computation was cancelled
+     * @throws ExecutionException if the computation threw an
+     * exception
+     * @throws InterruptedException if the current thread was interrupted
+     * while waiting
+     */
+    V get() throws InterruptedException, ExecutionException;
+
+    /**
+     * Waits if necessary for at most the given time for the computation
+     * to complete, and then retrieves its result, if available.
+     *
+     * @param timeout the maximum time to wait, expressed in <tt>unit</tt>
+     * @param unit the time unit of the timeout argument
+     * @return the computed result
+     * @throws CancellationException if the computation was cancelled
+     * @throws ExecutionException if the computation threw an
+     * exception
+     * @throws InterruptedException if the current thread was interrupted
+     * while waiting
+     * @throws TimeoutException if the wait timed out before the
+     * computation completed
+     */
+    V get(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException;
+}
+
+
+

Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/Future.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/FutureTask.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/FutureTask.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/FutureTask.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/FutureTask.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,276 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+import java.util.concurrent.locks.*;
+
+/**
+ * A cancellable asynchronous computation.  This class provides a base
+ * implementation of {@link Future}, with methods to start and cancel
+ * a computation, query to see if the computation is complete, and
+ * retrieve the result of the computation.  The result can only be
+ * retrieved when the computation has completed; the <tt>get</tt>
+ * method will block if the computation has not yet completed.  Once
+ * the computation has completed, the computation cannot be restarted
+ * or cancelled.
+ *
+ * <p>A <tt>FutureTask</tt> can be used to wrap a {@link Callable} or
+ * {@link java.lang.Runnable} object.  Because <tt>FutureTask</tt>
+ * implements <tt>Runnable</tt>, a <tt>FutureTask</tt> can be
+ * submitted to an {@link Executor} for execution.
+ *
+ * <p>In addition to serving as a standalone class, this class provides
+ * <tt>protected</tt> functionality that may be useful when creating
+ * customized task classes.
+ *
+ * @since 1.5
+ * @author Doug Lea
+ * @param <V> The result type returned by this FutureTask's <tt>get</tt> method
+ */
+public class FutureTask<V> implements Future<V>, Runnable {
+    /** Synchronization control for FutureTask */
+    private final Sync sync;
+
+    /**
+     * Creates a <tt>FutureTask</tt> that will upon running, execute the
+     * given <tt>Callable</tt>.
+     *
+     * @param  callable the callable task
+     * @throws NullPointerException if callable is null
+     */
+    public FutureTask(Callable<V> callable) {
+        if (callable == null)
+            throw new NullPointerException();
+        sync = new Sync(callable);
+    }
+
+    /**
+     * Creates a <tt>FutureTask</tt> that will upon running, execute the
+     * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
+     * given result on successful completion.
+     *
+     * @param  runnable the runnable task
+     * @param result the result to return on successful completion. If
+     * you don't need a particular result, consider using
+     * constructions of the form:
+     * <tt>Future&lt;?&gt; f = new FutureTask&lt;Object&gt;(runnable, null)</tt>
+     * @throws NullPointerException if runnable is null
+     */
+    public FutureTask(Runnable runnable, V result) {
+        sync = new Sync(Executors.callable(runnable, result));
+    }
+
+    /**
+     * Returns <tt>true</tt> if this task's state is cancelled.
+     *
+     * @return <tt>true</tt> if this task was cancelled
+     */
+    public boolean isCancelled() {
+        return sync.innerIsCancelled();
+    }
+
+    /**
+     * Returns <tt>true</tt> once this task has run or been cancelled
+     * and its outcome has been made available to waiters.
+     *
+     * @return <tt>true</tt> if this task completed
+     */
+    public boolean isDone() {
+        return sync.innerIsDone();
+    }
+
+    /**
+     * Attempts to cancel this task.  Fails only if the task has already
+     * run or has already been cancelled.
+     *
+     * @param mayInterruptIfRunning <tt>true</tt> if the thread currently
+     * running this task (if any) should be interrupted
+     * @return <tt>true</tt> if this call cancelled the task
+     */
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        return sync.innerCancel(mayInterruptIfRunning);
+    }
+
+    /**
+     * Waits if necessary for the task to complete, then returns its result.
+     *
+     * @return the result set for this task
+     * @throws CancellationException if the task was cancelled
+     * @throws ExecutionException if the task threw an exception
+     * @throws InterruptedException if the current thread was interrupted
+     * while waiting
+     */
+    public V get() throws InterruptedException, ExecutionException {
+        return sync.innerGet();
+    }
+
+    /**
+     * Waits if necessary for at most the given time for the task to
+     * complete, then returns its result.
+     *
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the timeout argument
+     * @return the result set for this task
+     * @throws CancellationException if the task was cancelled
+     * @throws ExecutionException if the task threw an exception
+     * @throws InterruptedException if the current thread was interrupted
+     * while waiting
+     * @throws TimeoutException if the wait timed out
+     */
+    public V get(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+        return sync.innerGet(unit.toNanos(timeout));
+    }
+
+    /**
+     * Protected method invoked when this task transitions to state
+     * <tt>isDone</tt> (whether normally or via cancellation). The
+     * default implementation does nothing.  Subclasses may override
+     * this method to invoke completion callbacks or perform
+     * bookkeeping. Note that you can query status inside the
+     * implementation of this method to determine whether this task
+     * has been cancelled.
+     */
+    protected void done() { }
+
+    /**
+     * Sets the result of this Future to the given value unless
+     * this future has already been set or has been cancelled.
+     * @param v the value
+     */
+    protected void set(V v) {
+        sync.innerSet(v);
+    }
+
+    /**
+     * Causes this future to report an <tt>ExecutionException</tt>
+     * with the given throwable as its cause, unless this Future has
+     * already been set or has been cancelled.
+     * @param t the cause of failure.
+     */
+    protected void setException(Throwable t) {
+        sync.innerSetException(t);
+    }
+
+    /**
+     * Sets this Future to the result of computation unless
+     * it has been cancelled.
+     */
+    public void run() {
+        sync.innerRun();
+    }
+
+    /**
+     * Executes the computation without setting its result, and then
+     * resets this Future to initial state, failing to do so if the
+     * computation encounters an exception or is cancelled.  This is
+     * designed for use with tasks that intrinsically execute more
+     * than once.
+     * @return true if successfully run and reset
+     */
+    protected boolean runAndReset() {
+        return sync.innerRunAndReset();
+    }
+
+    /**
+     * Synchronization control for FutureTask. Note that this must be
+     * a non-static inner class in order to invoke the protected
+     * <tt>done</tt> method. For clarity, all inner class support
+     * methods are same as outer, prefixed with "inner".
+     *
+     * Uses AQS sync state to represent run status
+     */
+    private final class Sync extends AbstractQueuedSynchronizer {
+        /** State value representing that task is running */
+        private static final int RUNNING   = 1;
+        /** State value representing that task ran */
+        private static final int RAN       = 2;
+        /** State value representing that task was cancelled */
+        private static final int CANCELLED = 4;
+
+        /** The underlying callable */
+        private final Callable<V> callable;
+        /** The result to return from get() */
+        private V result;
+        /** The exception to throw from get() */
+        private Throwable exception;
+
+        /**
+         * The thread running task. When nulled after set/cancel, this
+         * indicates that the results are accessible.  Must be
+         * volatile, to serve as write barrier on completion.
+         */
+        private volatile Thread runner;
+
+        Sync(Callable<V> callable) {
+            this.callable = callable;
+        }
+
+        /** Returns true if the given state value is RAN or CANCELLED. */
+        private boolean ranOrCancelled(int state) {
+            return (state & (RAN | CANCELLED)) != 0;
+        }
+
+        /**
+         * Implements AQS base acquire to succeed if ran or cancelled
+         */
+        protected int tryAcquireShared(int ignore) {
+            return innerIsDone()? 1 : -1;
+        }
+
+        /**
+         * Implements AQS base release to always signal after setting
+         * final done status by nulling runner thread.
+         */
+        protected boolean tryReleaseShared(int ignore) {
+            runner = null;
+            return true;
+        }
+
+        boolean innerIsCancelled() {
+            return getState() == CANCELLED;
+        }
+
+        boolean innerIsDone() {
+            return ranOrCancelled(getState()) && runner == null;
+        }
+
+        V innerGet() throws InterruptedException, ExecutionException {
+            acquireSharedInterruptibly(0);
+            if (getState() == CANCELLED)
+                throw new CancellationException();
+            if (exception != null)
+                throw new ExecutionException(exception);
+            return result;
+        }
+
+        V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
+            if (!tryAcquireSharedNanos(0, nanosTimeout))
+                throw new TimeoutException();
+            if (getState() == CANCELLED)
+                throw new CancellationException();
+            if (exception != null)
+                throw new ExecutionException(exception);
+            return result;
+        }
+
+        /**
+         * Records v as the result unless the task already ran or was
+         * cancelled.  Retries when the state CAS loses a race, so that
+         * a lost race is never mistaken for a terminal state.
+         */
+        void innerSet(V v) {
+            for (;;) {
+                int s = getState();
+                if (s == RAN)
+                    return;
+                if (s == CANCELLED) {
+                    // A cancel may have released before this thread
+                    // published itself as runner; release again so that
+                    // runner is nulled and waiters can observe isDone.
+                    releaseShared(0);
+                    return;
+                }
+                if (compareAndSetState(s, RAN)) {
+                    result = v;
+                    releaseShared(0);
+                    done();
+                    return;
+                }
+            }
+        }
+
+        /**
+         * Records t as the failure cause unless the task already ran or
+         * was cancelled.  Same retry/release protocol as innerSet.
+         */
+        void innerSetException(Throwable t) {
+            for (;;) {
+                int s = getState();
+                if (s == RAN)
+                    return;
+                if (s == CANCELLED) {
+                    // See innerSet: make sure runner ends up null.
+                    releaseShared(0);
+                    return;
+                }
+                if (compareAndSetState(s, RAN)) {
+                    exception = t;
+                    result = null;
+                    releaseShared(0);
+                    done();
+                    return;
+                }
+            }
+        }
+
+        /**
+         * Moves the task to CANCELLED unless it already ran or was
+         * cancelled, optionally interrupting the running thread.
+         */
+        boolean innerCancel(boolean mayInterruptIfRunning) {
+            for (;;) {
+                int s = getState();
+                if (ranOrCancelled(s))
+                    return false;
+                // Retry on CAS failure: the state may merely have moved
+                // from 0 to RUNNING, which is still cancellable.
+                if (compareAndSetState(s, CANCELLED))
+                    break;
+            }
+            if (mayInterruptIfRunning) {
+                Thread r = runner;
+                if (r != null)
+                    r.interrupt();
+            }
+            releaseShared(0);
+            done();
+            return true;
+        }
+
+        void innerRun() {
+            if (!compareAndSetState(0, RUNNING))
+                return;
+            try {
+                runner = Thread.currentThread();
+                if (getState() == RUNNING) // recheck after setting thread
+                    innerSet(callable.call());
+                else
+                    releaseShared(0); // cancelled before runner was visible
+            } catch(Throwable ex) {
+                innerSetException(ex);
+            }
+        }
+
+        boolean innerRunAndReset() {
+            if (!compareAndSetState(0, RUNNING))
+                return false;
+            try {
+                runner = Thread.currentThread();
+                if (getState() == RUNNING) // skip the body if already cancelled
+                    callable.call(); // don't set result
+                runner = null;
+                return compareAndSetState(RUNNING, 0);
+            } catch(Throwable ex) {
+                innerSetException(ex);
+                return false;
+            }
+        }
+    }
+}

Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/FutureTask.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/LinkedBlockingQueue.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/LinkedBlockingQueue.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/LinkedBlockingQueue.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/LinkedBlockingQueue.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,720 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+import java.util.*;
+
+/**
+ * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
+ * linked nodes.
+ * This queue orders elements FIFO (first-in-first-out).
+ * The <em>head</em> of the queue is that element that has been on the
+ * queue the longest time.
+ * The <em>tail</em> of the queue is that element that has been on the
+ * queue the shortest time. New elements
+ * are inserted at the tail of the queue, and the queue retrieval
+ * operations obtain elements at the head of the queue.
+ * Linked queues typically have higher throughput than array-based queues but
+ * less predictable performance in most concurrent applications.
+ *
+ * <p> The optional capacity bound constructor argument serves as a
+ * way to prevent excessive queue expansion. The capacity, if unspecified,
+ * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
+ * dynamically created upon each insertion unless this would bring the
+ * queue above capacity.
+ *
+ * <p>This class implements all of the <em>optional</em> methods
+ * of the {@link Collection} and {@link Iterator} interfaces.
+ *
+ * <p>This class is a member of the
+ * <a href="{@docRoot}/../guide/collections/index.html">
+ * Java Collections Framework</a>.
+ *
+ * @since 1.5
+ * @author Doug Lea
+ * @param <E> the type of elements held in this collection
+ *
+ **/
+public class LinkedBlockingQueue<E> extends AbstractQueue<E>
+        implements BlockingQueue<E>, java.io.Serializable {
+    private static final long serialVersionUID = -6903933977591709194L;
+
+    /*
+     * A variant of the "two lock queue" algorithm.  The putLock gates
+     * entry to put (and offer), and has an associated condition for
+     * waiting puts.  Similarly for the takeLock.  The "count" field
+     * that they both rely on is maintained as an atomic to avoid
+     * needing to get both locks in most cases. Also, to minimize need
+     * for puts to get takeLock and vice-versa, cascading notifies are
+     * used. When a put notices that it has enabled at least one take,
+     * it signals taker. That taker in turn signals others if more
+     * items have been entered since the signal. And symmetrically for
+     * takes signalling puts. Operations such as remove(Object) and
+     * iterators acquire both locks.
+    */
+
+    /**
+     * Linked list node class
+     */
+    static class Node<E> {
+        /** The item, volatile to ensure barrier separating write and read */
+        volatile E item;
+        Node<E> next;
+        Node(E x) { item = x; }
+    }
+
+    /** The capacity bound, or Integer.MAX_VALUE if none */
+    private final int capacity;
+
+    /**
+     * Current number of elements.  Kept as an atomic so that the put
+     * and take paths can both update it without holding both locks.
+     */
+    private final AtomicInteger count = new AtomicInteger(0);
+
+    /**
+     * Head of linked list.  This is a placeholder node: the first
+     * element of the queue, if any, is <tt>head.next</tt>.
+     */
+    private transient Node<E> head;
+
+    /** Tail of linked list; new nodes are linked after it */
+    private transient Node<E> last;
+
+    /** Lock held by take, poll, etc */
+    private final ReentrantLock takeLock = new ReentrantLock();
+
+    /** Wait queue for waiting takes */
+    private final Condition notEmpty = takeLock.newCondition();
+
+    /** Lock held by put, offer, etc */
+    private final ReentrantLock putLock = new ReentrantLock();
+
+    /** Wait queue for waiting puts */
+    private final Condition notFull = putLock.newCondition();
+
+    /**
+     * Wakes one thread waiting on <tt>notEmpty</tt>.  The condition
+     * belongs to takeLock, so that lock is taken around the signal.
+     * Called only from put/offer (which do not otherwise ordinarily
+     * lock takeLock.)
+     */
+    private void signalNotEmpty() {
+        final ReentrantLock lock = this.takeLock;
+        lock.lock();
+        try {
+            notEmpty.signal();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Wakes one thread waiting on <tt>notFull</tt>.  The condition
+     * belongs to putLock, so that lock is taken around the signal.
+     * Called only from take/poll.
+     */
+    private void signalNotFull() {
+        final ReentrantLock lock = this.putLock;
+        lock.lock();
+        try {
+            notFull.signal();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Wraps the item in a new node, links that node after the current
+     * tail, and makes it the new tail.
+     * @param x the item
+     */
+    private void insert(E x) {
+        Node<E> node = new Node<E>(x);
+        last.next = node;
+        last = node;
+    }
+
+    /**
+     * Removes the first element from the queue.  The node that held it
+     * becomes the new placeholder head and its item is cleared.
+     * @return the item that was held by the first node
+     */
+    private E extract() {
+        Node<E> newHead = head.next;
+        head = newHead;
+        E taken = newHead.item;
+        newHead.item = null;
+        return taken;
+    }
+
+    /**
+     * Lock to prevent both puts and takes.  Acquires putLock first and
+     * takeLock second; fullyUnlock releases them in the reverse order.
+     */
+    private void fullyLock() {
+        putLock.lock();
+        takeLock.lock();
+    }
+
+    /**
+     * Unlock to allow both puts and takes.  Releases takeLock first and
+     * putLock second, the reverse of the order used by fullyLock.
+     */
+    private void fullyUnlock() {
+        takeLock.unlock();
+        putLock.unlock();
+    }
+
+
+    /**
+     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
+     * {@link Integer#MAX_VALUE}, by delegating to the capacity
+     * constructor.
+     */
+    public LinkedBlockingQueue() {
+        this(Integer.MAX_VALUE);
+    }
+
+    /**
+     * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
+     * The list starts out as a single placeholder node that serves as
+     * both head and tail.
+     *
+     * @param capacity the capacity of this queue.
+     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
+     *         than zero.
+     */
+    public LinkedBlockingQueue(int capacity) {
+        if (capacity <= 0) {
+            throw new IllegalArgumentException();
+        }
+        this.capacity = capacity;
+        Node<E> placeholder = new Node<E>(null);
+        head = placeholder;
+        last = placeholder;
+    }
+
+    /**
+     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
+     * {@link Integer#MAX_VALUE}, initially containing the elements of the
+     * given collection, added in traversal order of the collection's
+     * iterator.
+     * @param c the collection of elements to initially contain
+     * @throws NullPointerException if <tt>c</tt> or any element within it
+     * is <tt>null</tt>
+     */
+    public LinkedBlockingQueue(Collection<? extends E> c) {
+        this(Integer.MAX_VALUE);
+        for (E element : c) {
+            add(element);
+        }
+    }
+
+
+    // this doc comment is overridden to remove the reference to collections
+    // greater in size than Integer.MAX_VALUE
+    /**
+     * Returns the number of elements in this queue, as currently
+     * recorded by the element counter.
+     *
+     * @return  the number of elements in this queue.
+     */
+    public int size() {
+        final int current = count.get();
+        return current;
+    }
+
+    // this doc comment is a modified copy of the inherited doc comment,
+    // without the reference to unlimited queues.
+    /**
+     * Returns the number of elements that this queue can ideally (in
+     * the absence of memory or resource constraints) accept without
+     * blocking. This is always equal to the initial capacity of this queue
+     * less the current <tt>size</tt> of this queue.
+     * <p>Note that you <em>cannot</em> always tell if
+     * an attempt to <tt>add</tt> an element will succeed by
+     * inspecting <tt>remainingCapacity</tt> because it may be the
+     * case that a waiting consumer is ready to <tt>take</tt> an
+     * element out of an otherwise full queue.
+     *
+     * @return the capacity minus the current element count
+     */
+    public int remainingCapacity() {
+        final int used = count.get();
+        return capacity - used;
+    }
+
+    /**
+     * Adds the specified element to the tail of this queue, waiting if
+     * necessary for space to become available.
+     * @param o the element to add
+     * @throws InterruptedException if interrupted while waiting.
+     * @throws NullPointerException if the specified element is <tt>null</tt>.
+     */
+    public void put(E o) throws InterruptedException {
+        if (o == null) throw new NullPointerException();
+        // Note: convention in all put/take/etc is to preset
+        // local var holding count  negative to indicate failure unless set.
+        int c = -1;
+        final ReentrantLock putLock = this.putLock;
+        final AtomicInteger count = this.count;
+        putLock.lockInterruptibly();
+        try {
+            /*
+             * Note that count is used in wait guard even though it is
+             * not protected by lock. This works because count can
+             * only decrease at this point (all other puts are shut
+             * out by lock), and we (or some other waiting put) are
+             * signalled if it ever changes from
+             * capacity. Similarly for all other uses of count in
+             * other wait guards.
+             */
+            try {
+                while (count.get() == capacity)
+                    notFull.await();
+            } catch (InterruptedException ie) {
+                notFull.signal(); // propagate to a non-interrupted thread
+                throw ie;
+            }
+            insert(o);
+            // c is the element count as it was before this insertion
+            c = count.getAndIncrement();
+            // still room after this insertion: pass the signal on to
+            // another waiting put
+            if (c + 1 < capacity)
+                notFull.signal();
+        } finally {
+            putLock.unlock();
+        }
+        // The queue was empty before this put, so a take may be waiting.
+        // Done after releasing putLock because signalNotEmpty takes takeLock.
+        if (c == 0)
+            signalNotEmpty();
+    }
+
+    /**
+     * Inserts the specified element at the tail of this queue, waiting if
+     * necessary up to the specified wait time for space to become available.
+     * @param o the element to add
+     * @param timeout how long to wait before giving up, in units of
+     * <tt>unit</tt>
+     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
+     * <tt>timeout</tt> parameter
+     * @return <tt>true</tt> if successful, or <tt>false</tt> if
+     * the specified waiting time elapses before space is available.
+     * @throws InterruptedException if interrupted while waiting.
+     * @throws NullPointerException if the specified element is <tt>null</tt>.
+     */
+    public boolean offer(E o, long timeout, TimeUnit unit)
+        throws InterruptedException {
+
+        if (o == null) throw new NullPointerException();
+        // remaining wait budget in nanoseconds; updated by each awaitNanos
+        long nanos = unit.toNanos(timeout);
+        // preset negative; holds the pre-insertion count once inserted
+        int c = -1;
+        final ReentrantLock putLock = this.putLock;
+        final AtomicInteger count = this.count;
+        putLock.lockInterruptibly();
+        try {
+            for (;;) {
+                if (count.get() < capacity) {
+                    insert(o);
+                    c = count.getAndIncrement();
+                    // still room after this insertion: wake another waiting put
+                    if (c + 1 < capacity)
+                        notFull.signal();
+                    break;
+                }
+                // queue is full and the wait budget is used up
+                if (nanos <= 0)
+                    return false;
+                try {
+                    nanos = notFull.awaitNanos(nanos);
+                } catch (InterruptedException ie) {
+                    notFull.signal(); // propagate to a non-interrupted thread
+                    throw ie;
+                }
+            }
+        } finally {
+            putLock.unlock();
+        }
+        // The queue was empty before this insertion, so a take may be waiting.
+        if (c == 0)
+            signalNotEmpty();
+        return true;
+    }
+
+    /**
+     * Inserts the specified element at the tail of this queue if possible,
+     * returning immediately if this queue is full.
+     *
+     * @param o the element to add.
+     * @return <tt>true</tt> if it was possible to add the element to
+     *         this queue, else <tt>false</tt>
+     * @throws NullPointerException if the specified element is <tt>null</tt>
+     */
+    public boolean offer(E o) {
+        if (o == null) throw new NullPointerException();
+        final AtomicInteger count = this.count;
+        // fast path: report failure without taking the lock when full
+        if (count.get() == capacity)
+            return false;
+        // stays negative unless the element is actually inserted below
+        int c = -1;
+        final ReentrantLock putLock = this.putLock;
+        putLock.lock();
+        try {
+            // re-check under the lock before inserting
+            if (count.get() < capacity) {
+                insert(o);
+                c = count.getAndIncrement();
+                // still room after this insertion: wake a waiting put
+                if (c + 1 < capacity)
+                    notFull.signal();
+            }
+        } finally {
+            putLock.unlock();
+        }
+        // The queue was empty before this insertion, so a take may be waiting.
+        if (c == 0)
+            signalNotEmpty();
+        // c is non-negative exactly when the insertion happened
+        return c >= 0;
+    }
+
+
+    /**
+     * Removes and returns the element at the head of this queue,
+     * waiting while the queue is empty.
+     * @return the element removed from the head of the queue
+     * @throws InterruptedException if interrupted while waiting.
+     */
+    public E take() throws InterruptedException {
+        E x;
+        // preset negative; holds the pre-removal count once an element is taken
+        int c = -1;
+        final AtomicInteger count = this.count;
+        final ReentrantLock takeLock = this.takeLock;
+        takeLock.lockInterruptibly();
+        try {
+            try {
+                while (count.get() == 0)
+                    notEmpty.await();
+            } catch (InterruptedException ie) {
+                notEmpty.signal(); // propagate to a non-interrupted thread
+                throw ie;
+            }
+
+            x = extract();
+            c = count.getAndDecrement();
+            // elements remain after this removal: wake another waiting take
+            if (c > 1)
+                notEmpty.signal();
+        } finally {
+            takeLock.unlock();
+        }
+        // The queue was full before this removal, so a put may be waiting.
+        // Done after releasing takeLock because signalNotFull takes putLock.
+        if (c == capacity)
+            signalNotFull();
+        return x;
+    }
+
+    /**
+     * Removes and returns the element at the head of this queue,
+     * waiting up to the given time for an element to become available.
+     * @param timeout how long to wait before giving up, in units of
+     * <tt>unit</tt>
+     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
+     * <tt>timeout</tt> parameter
+     * @return the removed element, or <tt>null</tt> if the waiting time
+     * ran out while the queue was still empty
+     * @throws InterruptedException if interrupted while waiting.
+     */
+    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+        E x = null;
+        // preset negative; holds the pre-removal count once an element is taken
+        int c = -1;
+        // remaining wait budget in nanoseconds; updated by each awaitNanos
+        long nanos = unit.toNanos(timeout);
+        final AtomicInteger count = this.count;
+        final ReentrantLock takeLock = this.takeLock;
+        takeLock.lockInterruptibly();
+        try {
+            for (;;) {
+                if (count.get() > 0) {
+                    x = extract();
+                    c = count.getAndDecrement();
+                    // elements remain after this removal: wake another waiting take
+                    if (c > 1)
+                        notEmpty.signal();
+                    break;
+                }
+                // queue is empty and the wait budget is used up
+                if (nanos <= 0)
+                    return null;
+                try {
+                    nanos = notEmpty.awaitNanos(nanos);
+                } catch (InterruptedException ie) {
+                    notEmpty.signal(); // propagate to a non-interrupted thread
+                    throw ie;
+                }
+            }
+        } finally {
+            takeLock.unlock();
+        }
+        // The queue was full before this removal, so a put may be waiting.
+        if (c == capacity)
+            signalNotFull();
+        return x;
+    }
+
+    /**
+     * Removes and returns the element at the head of this queue without
+     * waiting.
+     * @return the removed element, or <tt>null</tt> if the queue is empty
+     */
+    public E poll() {
+        final AtomicInteger count = this.count;
+        // fast path: report empty without taking the lock
+        if (count.get() == 0)
+            return null;
+        E x = null;
+        // stays negative unless an element is actually removed below
+        int c = -1;
+        final ReentrantLock takeLock = this.takeLock;
+        takeLock.lock();
+        try {
+            // re-check under the lock before extracting
+            if (count.get() > 0) {
+                x = extract();
+                c = count.getAndDecrement();
+                // elements remain after this removal: wake a waiting take
+                if (c > 1)
+                    notEmpty.signal();
+            }
+        } finally {
+            takeLock.unlock();
+        }
+        // The queue was full before this removal, so a put may be waiting.
+        if (c == capacity)
+            signalNotFull();
+        return x;
+    }
+
+
+    public E peek() {
+        if (count.get() == 0)
+            return null;
+        final ReentrantLock takeLock = this.takeLock;
+        takeLock.lock();
+        try {
+            Node<E> first = head.next;
+            if (first == null)
+                return null;
+            else
+                return first.item;
+        } finally {
+            takeLock.unlock();
+        }
+    }
+
+    /**
+     * Removes the first element equal to the given object, if any,
+     * while holding both locks.
+     * @param o the element to remove; <tt>null</tt> never matches
+     * @return <tt>true</tt> if an element was removed
+     */
+    public boolean remove(Object o) {
+        if (o == null) return false;
+        boolean removed = false;
+        fullyLock();
+        try {
+            Node<E> trail = head;
+            Node<E> p = head.next;
+            while (p != null) {
+                if (o.equals(p.item)) {
+                    removed = true;
+                    break;
+                }
+                trail = p;
+                p = p.next;
+            }
+            if (removed) {
+                p.item = null;
+                trail.next = p.next;
+                // If the tail node was unlinked, the tail pointer must
+                // move back; otherwise the next insert would link onto
+                // the removed node and the new element would be lost.
+                if (last == p)
+                    last = trail;
+                // queue was full before this removal: puts may be waiting
+                if (count.getAndDecrement() == capacity)
+                    notFull.signalAll();
+            }
+        } finally {
+            fullyUnlock();
+        }
+        return removed;
+    }
+
+    /**
+     * Copies the queued items, front to back, into a new array sized
+     * from the current count.  Runs between <tt>fullyLock()</tt> and
+     * <tt>fullyUnlock()</tt>.
+     *
+     * @return a newly allocated array holding the queue's items
+     */
+    public Object[] toArray() {
+        fullyLock();
+        try {
+            final Object[] snapshot = new Object[count.get()];
+            int i = 0;
+            Node<E> node = head.next;
+            while (node != null) {
+                snapshot[i++] = node.item;
+                node = node.next;
+            }
+            return snapshot;
+        } finally {
+            fullyUnlock();
+        }
+    }
+
+    /**
+     * Copies the queued items, front to back, into the given array, or
+     * into a new array of the same component type if the given one is
+     * too small.  When the array has room to spare, the slot right
+     * after the last copied item is set to <tt>null</tt>, as the
+     * <tt>Collection.toArray(T[])</tt> contract requires.
+     *
+     * @param a the array to fill if it is large enough
+     * @return the array holding the queue's items
+     * @throws NullPointerException if <tt>a</tt> is <tt>null</tt>
+     */
+    public <T> T[] toArray(T[] a) {
+        fullyLock();
+        try {
+            int size = count.get();
+            if (a.length < size)
+                a = (T[])java.lang.reflect.Array.newInstance
+                    (a.getClass().getComponentType(), size);
+
+            int k = 0;
+            for (Node<E> p = head.next; p != null; p = p.next)
+                a[k++] = (T)p.item;
+            // Mark the end of the data when the caller's array is larger.
+            if (a.length > k)
+                a[k] = null;
+            return a;
+        } finally {
+            fullyUnlock();
+        }
+    }
+
+    /**
+     * Builds the string form by delegating to the superclass
+     * implementation between <tt>fullyLock()</tt> and
+     * <tt>fullyUnlock()</tt>.
+     *
+     * @return the superclass string representation of this queue
+     */
+    public String toString() {
+        fullyLock();
+        try {
+            final String text = super.toString();
+            return text;
+        } finally {
+            fullyUnlock();
+        }
+    }
+
+    /**
+     * Removes all elements from this queue.  Runs between
+     * <tt>fullyLock()</tt> and <tt>fullyUnlock()</tt>.
+     */
+    public void clear() {
+        fullyLock();
+        try {
+            head.next = null;
+            // An empty queue has last == head (as readObject sets it up);
+            // without this, last would keep referencing a detached node.
+            last = head;
+            // The queue was full before clearing: wake waiters on notFull.
+            if (count.getAndSet(0) == capacity)
+                notFull.signalAll();
+        } finally {
+            fullyUnlock();
+        }
+    }
+
+    /**
+     * Removes every element currently in this queue and adds each one
+     * to the given collection.  The node chain is detached between
+     * <tt>fullyLock()</tt> and <tt>fullyUnlock()</tt>; the elements are
+     * then handed to <tt>c</tt> outside of the locks.
+     *
+     * @param c the collection to transfer elements into
+     * @return the number of elements transferred
+     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
+     * @throws IllegalArgumentException if <tt>c</tt> is this queue
+     */
+    public int drainTo(Collection<? super E> c) {
+        if (c == null)
+            throw new NullPointerException();
+        if (c == this)
+            throw new IllegalArgumentException();
+        Node<E> first;
+        fullyLock();
+        try {
+            first = head.next;
+            head.next = null;
+            // The queue is now empty, so the tail pointer must go back to
+            // head instead of referencing the detached chain.
+            last = head;
+            if (count.getAndSet(0) == capacity)
+                notFull.signalAll();
+        } finally {
+            fullyUnlock();
+        }
+        // Transfer the elements outside of locks
+        int n = 0;
+        for (Node<E> p = first; p != null; p = p.next) {
+            c.add(p.item);
+            p.item = null;
+            ++n;
+        }
+        return n;
+    }
+        
+    /**
+     * Removes at most <tt>maxElements</tt> elements from the front of
+     * this queue and adds each one to the given collection.  The whole
+     * transfer runs between <tt>fullyLock()</tt> and
+     * <tt>fullyUnlock()</tt>.
+     *
+     * @param c the collection to transfer elements into
+     * @param maxElements the maximum number of elements to transfer;
+     * values less than or equal to zero transfer nothing
+     * @return the number of elements transferred
+     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
+     * @throws IllegalArgumentException if <tt>c</tt> is this queue
+     */
+    public int drainTo(Collection<? super E> c, int maxElements) {
+        if (c == null)
+            throw new NullPointerException();
+        if (c == this)
+            throw new IllegalArgumentException();
+        if (maxElements <= 0)
+            return 0;
+        fullyLock();
+        try {
+            int n = 0;
+            Node<E> p = head.next;
+            while (p != null && n < maxElements) {
+                c.add(p.item);
+                p.item = null;
+                p = p.next;
+                ++n;
+            }
+            if (n != 0) {
+                // p is the first node that was not drained (or null)
+                head.next = p;
+                // Everything was drained: the tail pointer must go back to
+                // head instead of referencing a detached node.
+                if (p == null)
+                    last = head;
+                if (count.getAndAdd(-n) == capacity)
+                    notFull.signalAll();
+            }
+            return n;
+        } finally {
+            fullyUnlock();
+        }
+    }
+
+    /**
+     * Creates an iterator that walks this queue's elements from front
+     * to back.  The iterator is "weakly consistent": it never throws
+     * {@link java.util.ConcurrentModificationException}, it is
+     * guaranteed to traverse the elements as they existed when the
+     * iterator was constructed, and it may (but need not) reflect
+     * modifications made after construction.
+     *
+     * @return an iterator over the elements in this queue in proper sequence.
+     */
+    public Iterator<E> iterator() {
+        return new Itr();
+    }
+
+    /**
+     * Weakly consistent iterator over the queue's nodes.  Each
+     * operation that touches the list takes both queue locks
+     * (<tt>putLock</tt> first, then <tt>takeLock</tt>).
+     */
+    private class Itr implements Iterator<E> {
+        /*
+         * Basic weak-consistent iterator.  At all times hold the next
+         * item to hand out so that if hasNext() reports true, we will
+         * still have it to return even if lost race with a take etc.
+         */
+        /** Node whose element the next call to next() will return. */
+        private Node<E> current;
+        /** Node returned by the latest next(); null once removed. */
+        private Node<E> lastRet;
+        /** Element of <tt>current</tt>, captured ahead of time. */
+        private E currentElement;
+
+        Itr() {
+            final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
+            final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
+            putLock.lock();
+            takeLock.lock();
+            try {
+                current = head.next;
+                if (current != null)
+                    currentElement = current.item;
+            } finally {
+                takeLock.unlock();
+                putLock.unlock();
+            }
+        }
+
+        public boolean hasNext() {
+            return current != null;
+        }
+
+        /**
+         * Returns the pre-captured element and advances to the next node.
+         *
+         * @throws NoSuchElementException if there is no next element
+         */
+        public E next() {
+            final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
+            final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
+            putLock.lock();
+            takeLock.lock();
+            try {
+                if (current == null)
+                    throw new NoSuchElementException();
+                E x = currentElement;
+                lastRet = current;
+                current = current.next;
+                if (current != null)
+                    currentElement = current.item;
+                return x;
+            } finally {
+                takeLock.unlock();
+                putLock.unlock();
+            }
+        }
+
+        /**
+         * Unlinks the node last returned by next(), if it is still
+         * reachable from <tt>head</tt>.
+         *
+         * @throws IllegalStateException if next() has not been called,
+         * or remove() was already called after the last next()
+         */
+        public void remove() {
+            if (lastRet == null)
+                throw new IllegalStateException();
+            final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
+            final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
+            putLock.lock();
+            takeLock.lock();
+            try {
+                Node<E> node = lastRet;
+                lastRet = null;
+                // Search from head: the node may already have been removed
+                // by another thread, in which case nothing is unlinked.
+                Node<E> trail = head;
+                Node<E> p = head.next;
+                while (p != null && p != node) {
+                    trail = p;
+                    p = p.next;
+                }
+                if (p == node) {
+                    p.item = null;
+                    trail.next = p.next;
+                    // If the tail node was unlinked, move the tail pointer
+                    // back so it never references a detached node.
+                    if (last == p)
+                        last = trail;
+                    int c = count.getAndDecrement();
+                    if (c == capacity)
+                        notFull.signalAll();
+                }
+            } finally {
+                takeLock.unlock();
+                putLock.unlock();
+            }
+        }
+    }
+
+    /**
+     * Serializes this queue to the given stream.
+     *
+     * @serialData The capacity is emitted (int), followed by all of
+     * its elements (each an <tt>Object</tt>) in the proper order,
+     * followed by a null
+     * @param s the stream to write to
+     */
+    private void writeObject(java.io.ObjectOutputStream s)
+        throws java.io.IOException {
+
+        fullyLock();
+        try {
+            // Default fields first (this covers the capacity)
+            s.defaultWriteObject();
+
+            // Then every element, front to back
+            Node<E> node = head.next;
+            while (node != null) {
+                s.writeObject(node.item);
+                node = node.next;
+            }
+
+            // A trailing null marks the end of the element sequence
+            s.writeObject(null);
+        } finally {
+            fullyUnlock();
+        }
+    }
+
+    /**
+     * Rebuilds this queue from the given stream (that is,
+     * deserializes it).
+     * @param s the stream to read from
+     */
+    private void readObject(java.io.ObjectInputStream s)
+        throws java.io.IOException, ClassNotFoundException {
+        // Default fields first (capacity and any hidden state)
+        s.defaultReadObject();
+
+        // Start from an empty list: a single dummy node is both head and tail
+        count.set(0);
+        last = head = new Node<E>(null);
+
+        // Re-add elements in stream order until the null sentinel is read
+        E item;
+        while ((item = (E)s.readObject()) != null)
+            add(item);
+    }
+}

Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/LinkedBlockingQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/PriorityBlockingQueue.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/PriorityBlockingQueue.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/PriorityBlockingQueue.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/PriorityBlockingQueue.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,455 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+import java.util.concurrent.locks.*;
+import java.util.*;
+
+/**
+ * An unbounded {@linkplain BlockingQueue blocking queue} that uses
+ * the same ordering rules as class {@link PriorityQueue} and supplies
+ * blocking retrieval operations.  While this queue is logically
+ * unbounded, attempted additions may fail due to resource exhaustion
+ * (causing <tt>OutOfMemoryError</tt>). This class does not permit
+ * <tt>null</tt> elements.  A priority queue relying on natural
+ * ordering also does not permit insertion of non-comparable objects
+ * (doing so results in <tt>ClassCastException</tt>).
+ *
+ * <p>This class implements all of the <em>optional</em> methods
+ * of the {@link Collection} and {@link Iterator} interfaces.
+ * <p>The Iterator provided in method {@link #iterator()} is
+ * <em>not</em> guaranteed to traverse the elements of the
+ * PriorityBlockingQueue in any particular order. If you need ordered
+ * traversal, consider using <tt>Arrays.sort(pq.toArray())</tt>.
+ *
+ * <p>This class is a member of the
+ * <a href="{@docRoot}/../guide/collections/index.html">
+ * Java Collections Framework</a>.
+ *
+ * @since 1.5
+ * @author Doug Lea
+ * @param <E> the type of elements held in this collection
+ */
+public class PriorityBlockingQueue<E> extends AbstractQueue<E>
+    implements BlockingQueue<E>, java.io.Serializable {
+    private static final long serialVersionUID = 5595510919245408276L;
+
+    /** Backing priority queue that holds the elements. */
+    private final PriorityQueue<E> q;
+    /** Lock taken around accesses to <tt>q</tt>; created in fair mode. */
+    private final ReentrantLock lock = new ReentrantLock(true);
+    /** Condition signalled on insertion; awaited by blocking retrievals. */
+    private final Condition notEmpty = lock.newCondition();
+
+    /**
+     * Creates a <tt>PriorityBlockingQueue</tt> with the default initial
+     * capacity
+     * (11) that orders its elements according to their natural
+     * ordering (using <tt>Comparable</tt>).
+     */
+    public PriorityBlockingQueue() {
+        q = new PriorityQueue<E>();
+    }
+
+    /**
+     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
+     * capacity
+     * that orders its elements according to their natural ordering
+     * (using <tt>Comparable</tt>).
+     *
+     * @param initialCapacity the initial capacity for this priority queue.
+     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
+     * than 1
+     */
+    public PriorityBlockingQueue(int initialCapacity) {
+        q = new PriorityQueue<E>(initialCapacity, null);
+    }
+
+    /**
+     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
+     * capacity
+     * that orders its elements according to the specified comparator.
+     *
+     * @param initialCapacity the initial capacity for this priority queue.
+     * @param comparator the comparator used to order this priority queue.
+     * If <tt>null</tt> then the order depends on the elements' natural
+     * ordering.
+     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
+     * than 1
+     */
+    public PriorityBlockingQueue(int initialCapacity,
+                                 Comparator<? super E> comparator) {
+        q = new PriorityQueue<E>(initialCapacity, comparator);
+    }
+
+    /**
+     * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
+     * in the specified collection.  The priority queue has an initial
+     * capacity of 110% of the size of the specified collection. If
+     * the specified collection is a {@link SortedSet} or a {@link
+     * PriorityQueue}, this priority queue will be sorted according to
+     * the same comparator, or according to its elements' natural
+     * order if the collection is sorted according to its elements'
+     * natural order.  Otherwise, this priority queue is ordered
+     * according to its elements' natural order.
+     *
+     * @param c the collection whose elements are to be placed
+     *        into this priority queue.
+     * @throws ClassCastException if elements of the specified collection
+     *         cannot be compared to one another according to the priority
+     *         queue's ordering.
+     * @throws NullPointerException if <tt>c</tt> or any element within it
+     * is <tt>null</tt>
+     */
+    public PriorityBlockingQueue(Collection<? extends E> c) {
+        q = new PriorityQueue<E>(c);
+    }
+
+
+    // these first few override just to update doc comments
+
+    /**
+     * Adds the specified element to this queue.
+     * @param o the element to add
+     * @return <tt>true</tt> (as per the general contract of
+     * <tt>Collection.add</tt>).
+     *
+     * @throws NullPointerException if the specified element is <tt>null</tt>.
+     * @throws ClassCastException if the specified element cannot be compared
+     * with elements currently in the priority queue according
+     * to the priority queue's ordering.
+     */
+    public boolean add(E o) {
+        return super.add(o);
+    }
+
+    /**
+     * Returns the comparator used to order this collection, or <tt>null</tt>
+     * if this collection is sorted according to its elements natural ordering
+     * (using <tt>Comparable</tt>).
+     *
+     * @return the comparator used to order this collection, or <tt>null</tt>
+     * if this collection is sorted according to its elements natural ordering.
+     */
+    public Comparator comparator() {
+        return q.comparator();
+    }
+
+    /**
+     * Inserts the specified element into this priority queue.
+     *
+     * @param o the element to add
+     * @return <tt>true</tt>
+     * @throws ClassCastException if the specified element cannot be compared
+     * with elements currently in the priority queue according
+     * to the priority queue's ordering.
+     * @throws NullPointerException if the specified element is <tt>null</tt>.
+     */
+    public boolean offer(E o) {
+        if (o == null) throw new NullPointerException();
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            boolean ok = q.offer(o);
+            assert ok;
+            // Wake one thread waiting in take() or the timed poll()
+            notEmpty.signal();
+            return true;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Adds the specified element to this priority queue. As the queue is
+     * unbounded this method will never block.
+     * @param o the element to add
+     * @throws ClassCastException if the element cannot be compared
+     * with elements currently in the priority queue according
+     * to the priority queue's ordering.
+     * @throws NullPointerException if the specified element is <tt>null</tt>.
+     */
+    public void put(E o) {
+        offer(o); // never need to block
+    }
+
+    /**
+     * Inserts the specified element into this priority queue. As the queue is
+     * unbounded this method will never block.
+     * @param o the element to add
+     * @param timeout This parameter is ignored as the method never blocks
+     * @param unit This parameter is ignored as the method never blocks
+     * @return <tt>true</tt>
+     * @throws ClassCastException if the element cannot be compared
+     * with elements currently in the priority queue according
+     * to the priority queue's ordering.
+     * @throws NullPointerException if the specified element is <tt>null</tt>.
+     */
+    public boolean offer(E o, long timeout, TimeUnit unit) {
+        return offer(o); // never need to block
+    }
+
+    /**
+     * Retrieves and removes the head of this queue, waiting on
+     * <tt>notEmpty</tt> while the queue is empty.
+     *
+     * @return the head of this queue
+     * @throws InterruptedException if interrupted while acquiring the
+     * lock or while waiting
+     */
+    public E take() throws InterruptedException {
+        final ReentrantLock lock = this.lock;
+        lock.lockInterruptibly();
+        try {
+            try {
+                while (q.size() == 0)
+                    notEmpty.await();
+            } catch (InterruptedException ie) {
+                notEmpty.signal(); // propagate to non-interrupted thread
+                throw ie;
+            }
+            E x = q.poll();
+            assert x != null;
+            return x;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    /**
+     * Retrieves and removes the head of this queue without waiting.
+     *
+     * @return the head of this queue, or <tt>null</tt> if it is empty
+     */
+    public E poll() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.poll();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Retrieves and removes the head of this queue, waiting up to the
+     * given time for an element to become available.
+     *
+     * @param timeout how long to wait, in units of <tt>unit</tt>
+     * @param unit the time unit of <tt>timeout</tt>
+     * @return the head of this queue, or <tt>null</tt> if the wait time
+     * elapsed with the queue still empty
+     * @throws InterruptedException if interrupted while acquiring the
+     * lock or while waiting
+     */
+    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+        long nanos = unit.toNanos(timeout);
+        final ReentrantLock lock = this.lock;
+        lock.lockInterruptibly();
+        try {
+            for (;;) {
+                E x = q.poll();
+                if (x != null)
+                    return x;
+                if (nanos <= 0)
+                    return null;
+                try {
+                    // awaitNanos returns the remaining time to wait
+                    nanos = notEmpty.awaitNanos(nanos);
+                } catch (InterruptedException ie) {
+                    notEmpty.signal(); // propagate to non-interrupted thread
+                    throw ie;
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Retrieves, but does not remove, the head of this queue.
+     *
+     * @return the head of this queue, or <tt>null</tt> if it is empty
+     */
+    public E peek() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.peek();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns the number of elements in this queue.
+     *
+     * @return the number of elements in this queue
+     */
+    public int size() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.size();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Always returns <tt>Integer.MAX_VALUE</tt> because
+     * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
+     * @return <tt>Integer.MAX_VALUE</tt>
+     */
+    public int remainingCapacity() {
+        return Integer.MAX_VALUE;
+    }
+
+    /**
+     * Removes the specified element from this queue by delegating to
+     * the backing queue under the lock.
+     *
+     * @param o the element to remove
+     * @return the result of the backing queue's <tt>remove</tt>
+     */
+    public boolean remove(Object o) {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.remove(o);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Tests whether this queue contains the specified element by
+     * delegating to the backing queue under the lock.
+     *
+     * @param o the element to look for
+     * @return the result of the backing queue's <tt>contains</tt>
+     */
+    public boolean contains(Object o) {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.contains(o);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns the backing queue's <tt>toArray()</tt> result, obtained
+     * under the lock.
+     *
+     * @return an array containing the elements of this queue
+     */
+    public Object[] toArray() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.toArray();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    /**
+     * Returns the backing queue's string representation, obtained
+     * under the lock.
+     *
+     * @return a string representation of this queue
+     */
+    public String toString() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.toString();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Removes all elements from this queue, in polling order, and adds
+     * each one to the given collection.
+     *
+     * @param c the collection to transfer elements into
+     * @return the number of elements transferred
+     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
+     * @throws IllegalArgumentException if <tt>c</tt> is this queue
+     */
+    public int drainTo(Collection<? super E> c) {
+        if (c == null)
+            throw new NullPointerException();
+        if (c == this)
+            throw new IllegalArgumentException();
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            int n = 0;
+            E e;
+            while ( (e = q.poll()) != null) {
+                c.add(e);
+                ++n;
+            }
+            return n;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Removes at most <tt>maxElements</tt> elements from this queue, in
+     * polling order, and adds each one to the given collection.
+     *
+     * @param c the collection to transfer elements into
+     * @param maxElements the maximum number of elements to transfer;
+     * values less than or equal to zero transfer nothing
+     * @return the number of elements transferred
+     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
+     * @throws IllegalArgumentException if <tt>c</tt> is this queue
+     */
+    public int drainTo(Collection<? super E> c, int maxElements) {
+        if (c == null)
+            throw new NullPointerException();
+        if (c == this)
+            throw new IllegalArgumentException();
+        if (maxElements <= 0)
+            return 0;
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            int n = 0;
+            E e;
+            // Check the limit first so no element is polled and then dropped
+            while (n < maxElements && (e = q.poll()) != null) {
+                c.add(e);
+                ++n;
+            }
+            return n;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Atomically removes all of the elements from this queue.
+     * The queue will be empty after this call returns.
+     */
+    public void clear() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            q.clear();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns the backing queue's <tt>toArray(a)</tt> result, obtained
+     * under the lock.
+     *
+     * @param a the array to fill if it is large enough
+     * @return an array containing the elements of this queue
+     */
+    public <T> T[] toArray(T[] a) {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return q.toArray(a);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns an iterator over the elements in this queue. The
+     * iterator does not return the elements in any particular order.
+     * The returned iterator is a thread-safe "fail-fast" iterator
+     * that will throw {@link
+     * java.util.ConcurrentModificationException} upon detected
+     * interference.
+     *
+     * @return an iterator over the elements in this queue.
+     */
+    public Iterator<E> iterator() {
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            return new Itr(q.iterator());
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Wraps the backing queue's iterator so that <tt>next()</tt> and
+     * <tt>remove()</tt> run under this queue's lock.  It uses the
+     * enclosing class's element type <tt>E</tt> directly rather than
+     * declaring a type parameter of its own.
+     */
+    private class Itr implements Iterator<E> {
+        private final Iterator<E> iter;
+        Itr(Iterator<E> i) {
+            iter = i;
+        }
+
+        public boolean hasNext() {
+            /*
+             * No sync -- we rely on underlying hasNext to be
+             * stateless, in which case we can return true by mistake
+             * only when next() will subsequently throw
+             * ConcurrentModificationException.
+             */
+            return iter.hasNext();
+        }
+
+        public E next() {
+            ReentrantLock lock = PriorityBlockingQueue.this.lock;
+            lock.lock();
+            try {
+                return iter.next();
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        public void remove() {
+            ReentrantLock lock = PriorityBlockingQueue.this.lock;
+            lock.lock();
+            try {
+                iter.remove();
+            } finally {
+                lock.unlock();
+            }
+        }
+    }
+
+    /**
+     * Save the state to a stream (that is, serialize it).  This
+     * merely wraps default serialization within lock.  The
+     * serialization strategy for items is left to underlying
+     * Queue. Note that locking is not needed on deserialization, so
+     * readObject is not defined, just relying on default.
+     */
+    private void writeObject(java.io.ObjectOutputStream s)
+        throws java.io.IOException {
+        lock.lock();
+        try {
+            s.defaultWriteObject();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+}

Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/PriorityBlockingQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/RejectedExecutionException.java
URL: http://svn.apache.org/viewvc/incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/RejectedExecutionException.java?rev=434296&view=auto
==============================================================================
--- incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/RejectedExecutionException.java (added)
+++ incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/RejectedExecutionException.java Wed Aug 23 20:42:25 2006
@@ -0,0 +1,62 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package java.util.concurrent;
+
+/**
+ * Exception thrown by an {@link Executor} when a task cannot be
+ * accepted for execution.
+ * 
+ * @since 1.5
+ * @author Doug Lea
+ */
+public class RejectedExecutionException extends RuntimeException {
+    private static final long serialVersionUID = -375805702767069545L;
+
+    /**
+     * Creates a <tt>RejectedExecutionException</tt> that carries neither
+     * a detail message nor a cause.  A cause may be attached later with
+     * {@link #initCause(Throwable) initCause}.
+     */
+    public RejectedExecutionException() {
+        super();
+    }
+
+    /**
+     * Creates a <tt>RejectedExecutionException</tt> carrying the given
+     * detail message and no cause.  A cause may be attached later with
+     * {@link #initCause(Throwable) initCause}.
+     *
+     * @param message the detail message
+     */
+    public RejectedExecutionException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates a <tt>RejectedExecutionException</tt> carrying both the
+     * given detail message and the given cause.
+     *
+     * @param  message the detail message
+     * @param  cause the cause, retrievable later through
+     *         {@link #getCause()}
+     */
+    public RejectedExecutionException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Creates a <tt>RejectedExecutionException</tt> carrying the given
+     * cause.  The detail message becomes <pre> (cause ==
+     * null ? null : cause.toString())</pre> which typically contains
+     * the class and detail message of <tt>cause</tt>.
+     *
+     * @param  cause the cause, retrievable later through
+     *         {@link #getCause()}
+     */
+    public RejectedExecutionException(Throwable cause) {
+        super(cause);
+    }
+}

Propchange: incubator/harmony/enhanced/classlib/trunk/modules/concurrent/src/main/java/java/util/concurrent/RejectedExecutionException.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message