activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1091195 [1/2] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/decaf/lang/ main/decaf/util/ main/decaf/util/concurrent/ main/decaf/util/concurrent/locks/ test/ test/decaf/util/concurrent/
Date Mon, 11 Apr 2011 20:22:40 GMT
Author: tabish
Date: Mon Apr 11 20:22:39 2011
New Revision: 1091195

URL: http://svn.apache.org/viewvc?rev=1091195&view=rev
Log:
Flush out some more of the Threading API and test what's working now.  Cleans up some other API code for the next release.

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.h   (with props)
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/AbstractQueue.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Mon Apr 11 20:22:39 2011
@@ -432,6 +432,7 @@ cc_sources = \
     decaf/util/Timer.cpp \
     decaf/util/TimerTask.cpp \
     decaf/util/UUID.cpp \
+    decaf/util/concurrent/AbstractExecutorService.cpp \
     decaf/util/concurrent/BlockingQueue.cpp \
     decaf/util/concurrent/BrokenBarrierException.cpp \
     decaf/util/concurrent/Callable.cpp \
@@ -443,10 +444,12 @@ cc_sources = \
     decaf/util/concurrent/Delayed.cpp \
     decaf/util/concurrent/Executor.cpp \
     decaf/util/concurrent/ExecutorService.cpp \
+    decaf/util/concurrent/Executors.cpp \
     decaf/util/concurrent/Future.cpp \
     decaf/util/concurrent/LinkedBlockingQueue.cpp \
     decaf/util/concurrent/Lock.cpp \
     decaf/util/concurrent/Mutex.cpp \
+    decaf/util/concurrent/RejectedExecutionHandler.cpp \
     decaf/util/concurrent/Semaphore.cpp \
     decaf/util/concurrent/SynchronousQueue.cpp \
     decaf/util/concurrent/ThreadFactory.cpp \
@@ -456,6 +459,7 @@ cc_sources = \
     decaf/util/concurrent/atomic/AtomicInteger.cpp \
     decaf/util/concurrent/atomic/AtomicRefCounter.cpp \
     decaf/util/concurrent/atomic/AtomicReference.cpp \
+    decaf/util/concurrent/locks/AbstractOwnableSynchronizer.cpp \
     decaf/util/concurrent/locks/LockSupport.cpp \
     decaf/util/concurrent/locks/ReentrantLock.cpp \
     decaf/util/logging/ConsoleHandler.cpp \
@@ -1019,6 +1023,7 @@ h_sources = \
     decaf/util/TimerTask.h \
     decaf/util/UUID.h \
     decaf/util/comparators/Less.h \
+    decaf/util/concurrent/AbstractExecutorService.h \
     decaf/util/concurrent/BlockingQueue.h \
     decaf/util/concurrent/BrokenBarrierException.h \
     decaf/util/concurrent/Callable.h \
@@ -1033,6 +1038,7 @@ h_sources = \
     decaf/util/concurrent/ExecutionException.h \
     decaf/util/concurrent/Executor.h \
     decaf/util/concurrent/ExecutorService.h \
+    decaf/util/concurrent/Executors.h \
     decaf/util/concurrent/Future.h \
     decaf/util/concurrent/LinkedBlockingQueue.h \
     decaf/util/concurrent/Lock.h \
@@ -1050,6 +1056,7 @@ h_sources = \
     decaf/util/concurrent/atomic/AtomicInteger.h \
     decaf/util/concurrent/atomic/AtomicRefCounter.h \
     decaf/util/concurrent/atomic/AtomicReference.h \
+    decaf/util/concurrent/locks/AbstractOwnableSynchronizer.h \
     decaf/util/concurrent/locks/Condition.h \
     decaf/util/concurrent/locks/Lock.h \
     decaf/util/concurrent/locks/LockSupport.h \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.cpp?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.cpp Mon Apr 11 20:22:39 2011
@@ -20,6 +20,12 @@
 #include <decaf/lang/ArrayPointer.h>
 #include <decaf/lang/System.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
+#include <decaf/lang/exceptions/IndexOutOfBoundsException.h>
+#include <decaf/lang/Short.h>
+#include <decaf/lang/Integer.h>
+#include <decaf/lang/Long.h>
+#include <decaf/lang/Float.h>
+#include <decaf/lang/Double.h>
 
 using namespace std;
 using namespace decaf;
@@ -69,6 +75,60 @@ String::String( const std::string& sourc
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+String::String(const char* array, int size) : contents(new Contents) {
+
+    if( size < 0 ) {
+        throw IndexOutOfBoundsException(
+            __FILE__, __LINE__, "size parameter out of Bounds: %d.", size );
+    }
+
+    if( array == NULL ) {
+        throw NullPointerException(
+            __FILE__, __LINE__, "Buffer pointer passed was NULL." );
+    }
+
+    if(size > 0) {
+
+        this->contents->value = ArrayPointer<unsigned char>(size);
+        this->contents->length = size;
+
+        System::arraycopy( (unsigned char*)array, 0, contents->value.get(), 0, size );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+String::String(const char* array, int size, int offset, int length) : contents(new Contents) {
+
+    if( size < 0 ) {
+        throw IndexOutOfBoundsException(
+            __FILE__, __LINE__, "size parameter out of Bounds: %d.", size );
+    }
+
+    if( offset > size || offset < 0 ) {
+        throw IndexOutOfBoundsException(
+            __FILE__, __LINE__, "offset parameter out of Bounds: %d.", offset );
+    }
+
+    if( length < 0 || length > size - offset ) {
+        throw IndexOutOfBoundsException(
+            __FILE__, __LINE__, "length parameter out of Bounds: %d.", length );
+    }
+
+    if( array == NULL ) {
+        throw NullPointerException(
+            __FILE__, __LINE__, "Buffer pointer passed was NULL." );
+    }
+
+    if(size > 0) {
+
+        this->contents->value = ArrayPointer<unsigned char>(length);
+        this->contents->length = length;
+
+        System::arraycopy( (unsigned char*)array, offset, contents->value.get(), 0, length );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
 String::~String() {
     try{
         delete this->contents;
@@ -133,3 +193,43 @@ std::string String::toString() const {
 
     return std::string( (const char*)this->contents->value.get(), this->length() );
 }
+
+////////////////////////////////////////////////////////////////////////////////
+String String::valueOf(bool value) {
+
+    if(value) {
+        return String("true");
+    }
+
+    return String("false");
+}
+
+////////////////////////////////////////////////////////////////////////////////
+String String::valueOf(char value) {
+    return String( &value, 1 );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+String String::valueOf(float value) {
+    return String( Float::toString(value) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+String String::valueOf(double value) {
+    return String( Double::toString(value) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+String String::valueOf(short value) {
+    return String( Short::toString(value) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+String String::valueOf(int value) {
+    return String( Integer::toString(value) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+String String::valueOf(long long value) {
+    return String( Long::toString(value) );
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.h?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.h Mon Apr 11 20:22:39 2011
@@ -63,6 +63,43 @@ namespace lang {
          */
         String( const std::string& source );
 
+        /**
+         * Create a new String object that represents the given array of characters.  The method
+         * takes the size of the array as a parameter to allow for strings that are not NULL
+         * terminated, the caller can pass strlen(array) in the case where the array is properly
+         * NULL terminated.
+         *
+         * @param array
+         *      The character buffer to copy into this new String object.
+         * @param size
+         *      The size of the string buffer given, in case the string is not NULL terminated.
+         *
+         * @throws NullPointerException if the character array parameter is NULL.
+         * @throws IndexOutOfBoundsException if the size parameter is negative.
+         */
+        String( const char* array, int size );
+
+        /**
+         * Create a new String object that represents the given array of characters.  The method
+         * takes the size of the array as a parameter to allow for strings that are not NULL
+         * terminated, the caller can pass strlen(array) in the case where the array is properly
+         * NULL terminated.
+         *
+         * @param array
+         *      The character buffer to copy into this new String object.
+         * @param size
+         *      The size of the string buffer given, in case the string is not NULL terminated.
+         * @param offset
+         *      The position to start copying from in the given buffer.
+         * @param length
+         *      The number of bytes to copy from the given buffer starting from the offset.
+         *
+         * @throws NullPointerException if the character array parameter is NULL.
+         * @throws IndexOutOfBoundsException if the size, offset or length parameter is negative
+         *         or if the length to copy is greater than the span of size - offset.
+         */
+        String( const char* array, int size, int offset, int length );
+
         virtual ~String();
 
     public:
@@ -100,6 +137,78 @@ namespace lang {
          */
         virtual std::string toString() const;
 
+    public:  // Static methods.
+
+        /**
+         * Returns a String that represents the value of the given boolean value.
+         *
+         * @param value
+         *      The value whose string representation is to be returned.
+         *
+         * @returns "true" if the boolean is true, "false" otherwise.
+         */
+        static String valueOf(bool value);
+
+        /**
+         * Returns a String that represents the value of the given char value.
+         *
+         * @param value
+         *      The value whose string representation is to be returned.
+         *
+         * @returns a String that contains the single character value given.
+         */
+        static String valueOf(char value);
+
+        /**
+         * Returns a String that represents the value of the given float value.
+         *
+         * @param value
+         *      The value whose string representation is to be returned.
+         *
+         * @returns a String that contains the string representation of the float value given.
+         */
+        static String valueOf(float value);
+
+        /**
+         * Returns a String that represents the value of the given double value.
+         *
+         * @param value
+         *      The value whose string representation is to be returned.
+         *
+         * @returns a String that contains the string representation of the double value given.
+         */
+        static String valueOf(double value);
+
+        /**
+         * Returns a String that represents the value of the given short value.
+         *
+         * @param value
+         *      The value whose string representation is to be returned.
+         *
+         * @returns a String that contains the string representation of the short value given.
+         */
+        static String valueOf(short value);
+
+        /**
+         * Returns a String that represents the value of the given integer value.
+         *
+         * @param value
+         *      The value whose string representation is to be returned.
+         *
+         * @returns a String that contains the string representation of the integer value given.
+         */
+        static String valueOf(int value);
+
+        /**
+         * Returns a String that represents the value of the given 64bit long value.
+         *
+         * @param value
+         *      The value whose string representation is to be returned.
+         *
+         * @returns a String that contains the string representation of the 64 bit long value given.
+         */
+        static String valueOf(long long value);
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp Mon Apr 11 20:22:39 2011
@@ -28,8 +28,10 @@
 #include <decaf/lang/Exception.h>
 #include <decaf/lang/exceptions/RuntimeException.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
+#include <decaf/lang/exceptions/IllegalThreadStateException.h>
 #include <decaf/util/concurrent/TimeUnit.h>
 #include <decaf/util/concurrent/Mutex.h>
+#include <decaf/util/concurrent/Executors.h>
 
 #include <vector>
 
@@ -231,11 +233,17 @@ void Thread::initThreading() {
 
     // We mark the thread where Decaf's Init routine is called from as our Main Thread.
     mainThread = Thread::createForeignThreadInstance( "Main Thread" );
+
+    // Initialize the Executors static data for use in ExecutorService classes.
+    Executors::initialize();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void Thread::shutdownThreading() {
 
+    // First shutdown the Executors static data to remove dependencies on Threading.
+    Executors::shutdown();
+
     // Clear the Main Thread instance pointer, this indicates we are Shutdown.
     mainThread = NULL;
 
@@ -579,6 +587,21 @@ int Thread::getPriority() const {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+bool Thread::isDaemon() const {
+    return false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Thread::setDaemon(bool value DECAF_UNUSED) {
+
+    if(this->properties->state > Thread::NEW) {
+        throw IllegalThreadStateException(__FILE__, __LINE__, "Thread is already active.");
+    }
+
+    // TODO - Set thread to detached or joinable as indicated by the value arg.
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void Thread::setUncaughtExceptionHandler( UncaughtExceptionHandler* handler ) {
     this->properties->exHandler = handler;
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.h?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.h Mon Apr 11 20:22:39 2011
@@ -263,6 +263,25 @@ namespace lang{
         void setPriority( int value );
 
         /**
+         * Sets if the given Thread is a Daemon Thread or not.  Daemon threads cannot be
+         * joined and its resource are automatically reclaimed when it terminates.
+         *
+         * @param value
+         *      Boolean indicating if this thread should be a daemon thread or not.
+         *
+         * @throws IllegalThreadStateException if the thread is already active.
+         */
+        void setDaemon(bool value);
+
+        /**
+         * Returns whether this thread is a daemon thread or not, if true this thread cannot
+         * be joined.
+         *
+         * @return true if the thread is a daemon thread.
+         */
+        bool isDaemon() const;
+
+        /**
          * Set the handler invoked when this thread abruptly terminates due to an uncaught exception.
          *
          * @returns a pointer to the set UncaughtExceptionHandler.

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/AbstractQueue.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/AbstractQueue.h?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/AbstractQueue.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/AbstractQueue.h Mon Apr 11 20:22:39 2011
@@ -85,6 +85,8 @@ namespace util {
             return AbstractCollection<E>::addAll( collection );
         }
 
+        using AbstractCollection<E>::remove;
+
         /**
          * {@inheritDoc}
          *

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp?rev=1091195&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp Mon Apr 11 20:22:39 2011
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AbstractExecutorService.h"
+
+using namespace decaf;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::lang;
+
+////////////////////////////////////////////////////////////////////////////////
+AbstractExecutorService::AbstractExecutorService() : ExecutorService() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+AbstractExecutorService::~AbstractExecutorService() {
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h?rev=1091195&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h Mon Apr 11 20:22:39 2011
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _DECAF_UTIL_CONCURRENT_ABSTRACTEXECUTORSERVICE_H_
+#define _DECAF_UTIL_CONCURRENT_ABSTRACTEXECUTORSERVICE_H_
+
+#include <decaf/util/Config.h>
+
+#include <decaf/util/concurrent/Executor.h>
+#include <decaf/util/concurrent/ExecutorService.h>
+
+namespace decaf {
+namespace util {
+namespace concurrent {
+
+    /**
+     * Provides a default implementation for the methods of the ExecutorService
+     * interface.  Use this class as a starting point for implementations of custom
+     * executor service implementations.
+     *
+     * @since 1.0
+     */
+    class DECAF_API AbstractExecutorService : public ExecutorService {
+    public:
+
+        AbstractExecutorService();
+        virtual ~AbstractExecutorService();
+
+    };
+
+}}}
+
+#endif /* _DECAF_UTIL_CONCURRENT_ABSTRACTEXECUTORSERVICE_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h Mon Apr 11 20:22:39 2011
@@ -167,6 +167,9 @@ namespace concurrent {
         virtual ~BlockingQueue() {
         }
 
+        using Queue<E>::offer;
+        using Queue<E>::poll;
+
         /**
          * Inserts the specified element into this queue, waiting if necessary for space
          * to become available.
@@ -179,8 +182,6 @@ namespace concurrent {
          */
         virtual void put( const E& value ) = 0;
 
-        using Queue<E>::offer;
-
         /**
          * Inserts the specified element into this queue, waiting up to the specified wait
          * time if necessary for space to become available.

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h Mon Apr 11 20:22:39 2011
@@ -20,6 +20,8 @@
 
 #include <decaf/util/Config.h>
 
+#include <decaf/lang/Runnable.h>
+#include <decaf/util/ArrayList.h>
 #include <decaf/util/concurrent/Executor.h>
 #include <decaf/util/concurrent/TimeUnit.h>
 #include <decaf/lang/exceptions/InterruptedException.h>
@@ -55,20 +57,55 @@ namespace concurrent {
         virtual ~ExecutorService() {}
 
         /**
-         * Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs,
-         * or the current thread is interrupted, whichever happens first.
+         * The caller will block until the executor has completed termination meaning all tasks
+         * that where scheduled before shutdown have now completed and the executor is ready for
+         * deletion.  If the timeout period elapses before the executor reaches the terminated
+         * state then this method return false to indicate it has not terminated.
          *
          * @param timeout
-         *      The amount of time to wait before timing out the Wait operation.
+         *      The amount of time to wait before abandoning the wait for termination.
          * @param unit
-         *      The Units that comprise the timeout value.
+         *      The unit of time that the timeout value represents.
          *
-         * @returns true if the executer terminated before the given timeout value elapsed.
+         * @return true if the executor terminated or false if the timeout expired.
          *
-         * @throws InterruptedException - if interrupted while waiting.
+         * @throws InterruptedException if this call is interrupted while awaiting termination.
          */
         virtual bool awaitTermination( long long timeout, const TimeUnit& unit ) = 0;
 
+        /**
+         * Performs an orderly shutdown of this Executor.  Previously queued tasks are allowed
+         * to complete but no new tasks are accepted for execution.  Calling this method more
+         * than once has no affect on this executor.
+         */
+        virtual void shutdown() = 0;
+
+        /**
+         * Attempts to stop all currently executing tasks and returns an ArrayList containing the
+         * Runnables that did not get executed, these object become the property of the caller and
+         * are not deleted by this class, they are removed from the work queue and forgotten about.
+         *
+         * There is no guarantee that this method will halt execution of currently executing tasks.
+         *
+         * @return an ArrayList containing all Runnable instance that were still waiting to be
+         *         executed by this class, call now owns those pointers.
+         */
+        virtual ArrayList<decaf::lang::Runnable*> shutdownNow() = 0;
+
+        /**
+         * Returns whether this executor has been shutdown or not.
+         *
+         * @return true if this executor has been shutdown.
+         */
+        virtual bool isShutdown() const = 0;
+
+        /**
+         * Returns whether all tasks have completed after this executor was shut down.
+         *
+         * @return true if all tasks have completed after a request to shut down was made.
+         */
+        virtual bool isTerminated() const = 0;
+
     };
 
 }}}

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp?rev=1091195&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp Mon Apr 11 20:22:39 2011
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Executors.h"
+
+#include <decaf/lang/Exception.h>
+#include <decaf/lang/Pointer.h>
+#include <decaf/lang/Integer.h>
+#include <decaf/lang/exceptions/NullPointerException.h>
+#include <decaf/util/concurrent/atomic/AtomicInteger.h>
+#include <decaf/util/concurrent/ThreadPoolExecutor.h>
+#include <decaf/util/concurrent/ThreadFactory.h>
+#include <decaf/util/concurrent/TimeUnit.h>
+#include <decaf/util/concurrent/LinkedBlockingQueue.h>
+
+using namespace decaf;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class DefaultThreadFactory : public ThreadFactory {
+    public:
+
+        static AtomicInteger* poolNumber;
+
+    private:
+
+        //ThreadGroup group;
+        AtomicInteger threadNumber;
+        std::string namePrefix;
+
+    public:
+
+        DefaultThreadFactory() : ThreadFactory(), threadNumber(1), namePrefix() {
+
+            if(DefaultThreadFactory::poolNumber == NULL) {
+                throw NullPointerException();
+            }
+
+            namePrefix = std::string("pool-") +
+                         Integer::toString(poolNumber->getAndIncrement()) +
+                         "-thread-";
+        }
+
+        Thread* newThread(Runnable* task) {
+            Thread* thread = new Thread(task, namePrefix + Integer::toString(threadNumber.getAndIncrement()));
+
+            if (thread->isDaemon()) {
+                thread->setDaemon(false);
+            }
+
+            if (thread->getPriority() != Thread::NORM_PRIORITY) {
+                thread->setPriority(Thread::NORM_PRIORITY);
+            }
+
+            return thread;
+        }
+    };
+
+    AtomicInteger* DefaultThreadFactory::poolNumber = NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Executors::Executors() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Executors::~Executors() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Executors::initialize() {
+    DefaultThreadFactory::poolNumber = new AtomicInteger(1);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Executors::shutdown() {
+    delete DefaultThreadFactory::poolNumber;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadFactory* Executors::getDefaultThreadFactory() {
+    return new DefaultThreadFactory();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ExecutorService* Executors::newFixedThreadPool(int nThreads) {
+
+    Pointer< BlockingQueue<Runnable*> > backingQ;
+
+    try{
+
+        backingQ.reset(new LinkedBlockingQueue<Runnable*>());
+        ExecutorService* service = new ThreadPoolExecutor(
+            nThreads, nThreads, 0, TimeUnit::MILLISECONDS, backingQ.get());
+
+        backingQ.release();
+
+        return service;
+
+    } catch(NullPointerException& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw ex;
+    } catch(IllegalArgumentException& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw ex;
+    } catch(Exception& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw ex;
+    } catch(...) {
+        throw Exception();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ExecutorService* Executors::newFixedThreadPool(int nThreads, ThreadFactory* threadFactory) {
+
+    Pointer< BlockingQueue<Runnable*> > backingQ;
+
+    try{
+
+        backingQ.reset(new LinkedBlockingQueue<Runnable*>());
+        ExecutorService* service = new ThreadPoolExecutor(
+            nThreads, nThreads, 0, TimeUnit::MILLISECONDS, backingQ.get(), threadFactory);
+
+        backingQ.release();
+
+        return service;
+
+    } catch(NullPointerException& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw ex;
+    } catch(IllegalArgumentException& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw ex;
+    } catch(Exception& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw ex;
+    } catch(...) {
+        throw Exception();
+    }
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h?rev=1091195&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h Mon Apr 11 20:22:39 2011
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _DECAF_UTIL_CONCURRENT_EXECUTORS_H_
+#define _DECAF_UTIL_CONCURRENT_EXECUTORS_H_
+
+#include <decaf/util/Config.h>
+
+#include <decaf/lang/Thread.h>
+#include <decaf/util/concurrent/ExecutorService.h>
+#include <decaf/util/concurrent/ThreadFactory.h>
+
+namespace decaf {
+namespace util {
+namespace concurrent {
+
+    /**
+     * Implements a set of utilities for use with Executors, ExecutorService, ThreadFactory,
+     * and Callable types, as well as providing factory methods for instance of these
+     * types configured for the most common use cases.
+     *
+     * @since 1.0
+     */
+    class DECAF_API Executors {
+    private:
+
+        Executors();
+        Executors(const Executors&);
+        Executors& operator= (const Executors&);
+
+    public:
+
+        virtual ~Executors();
+
+        /**
+         * Creates and returns a new ThreadFactory that expresses the default behavior for
+         * ThreadFactories used in Executor classes.  The default factory create a new
+         * non-daemon thread with normal priority and a name whose value is equal to
+         * pool-N-thread-M, where N is the sequence number of this factory, and M is the
+         * sequence number of the thread created by this factory.
+         *
+         * @returns a new instance of the default thread factory used in Executors, the
+         *          caller takes ownership of the returned pointer.
+         */
+        static ThreadFactory* getDefaultThreadFactory();
+
+        /**
+         * Creates a new ThreadPoolExecutor with a fixed number of threads to process incoming
+         * tasks.  The thread pool will use an unbounded queue to store pending tasks.  At any
+         * given time the maximum threads in the pool will be equal to the number given to this
+         * factory method.  If a thread in the pool dies a new one will be spawned to take its
+         * place in the pool.  Tasks that are submitted when all pooled threads are busy will
+         * be held until a thread is freed if the pool has allocated its assigned number of
+         * threads already.
+         *
+         * @param nThreads
+         *      The number of threads to assign as the max for the new ExecutorService.
+         *
+         * @returns pointer to a new ExecutorService that is owned by the caller.
+         *
+         * @throws IllegalArgumentException if nThreads is less than or equal to zero.
+         */
+        static ExecutorService* newFixedThreadPool(int nThreads);
+
+        /**
+         * Creates a new ThreadPoolExecutor with a fixed number of threads to process incoming
+         * tasks.  The thread pool will use an unbounded queue to store pending tasks.  At any
+         * given time the maximum threads in the pool will be equal to the number given to this
+         * factory method.  If a thread in the pool dies a new one will be spawned to take its
+         * place in the pool.  Tasks that are submitted when all pooled threads are busy will
+         * be held until a thread is freed if the pool has allocated its assigned number of
+         * threads already.
+         *
+         * @param nThreads
+         *      The number of threads to assign as the max for the new ExecutorService.
+         * @param threadFactory
+         *      Instance of a ThreadFactory that will be used by the Executor to spawn new
+         *      worker threads.  This parameter cannot be NULL.
+         *
+         * @returns pointer to a new ExecutorService that is owned by the caller.
+         *
+         * @throws NullPointerException if threadFactory is NULL.
+         * @throws IllegalArgumentException if nThreads is less than or equal to zero.
+         */
+        static ExecutorService* newFixedThreadPool(int nThreads, ThreadFactory* threadFactory);
+
+    private:
+
+        static void initialize();
+        static void shutdown();
+
+        friend class decaf::lang::Thread;
+
+    };
+
+}}}
+
+#endif /* _DECAF_UTIL_CONCURRENT_EXECUTORS_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.cpp?rev=1091195&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.cpp Mon Apr 11 20:22:39 2011
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RejectedExecutionHandler.h"
+
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+RejectedExecutionHandler::RejectedExecutionHandler() {
+
+}
+
+////////////////////////////////////////////////////////////////////////////////
+RejectedExecutionHandler::~RejectedExecutionHandler() {
+
+}
+

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.h?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.h Mon Apr 11 20:22:39 2011
@@ -36,7 +36,8 @@ namespace concurrent {
     class DECAF_API RejectedExecutionHandler {
     public:
 
-        virtual ~RejectedExecutionHandler() {}
+        RejectedExecutionHandler();
+        virtual ~RejectedExecutionHandler();
 
         /**
          * Method that may be invoked by a {@link ThreadPoolExecutor} when
@@ -56,8 +57,7 @@ namespace concurrent {
          *
          * @throws RejectedExecutionException if there is no remedy.
          */
-        virtual void rejectedExecution( Runnable* r, ThreadPoolExecutor* executer )
-            throw( RejectedExecutionException ) = 0;
+        virtual void rejectedExecution( decaf::lang::Runnable* r, ThreadPoolExecutor* executer ) = 0;
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h Mon Apr 11 20:22:39 2011
@@ -20,11 +20,10 @@
 
 #include <decaf/util/Config.h>
 
+#include <decaf/lang/Thread.h>
+#include <decaf/lang/Runnable.h>
+
 namespace decaf {
-namespace lang {
-    class Thread;
-    class Runnable;
-}
 namespace util {
 namespace concurrent {
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp Mon Apr 11 20:22:39 2011
@@ -14,19 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <decaf/util/concurrent/ThreadPoolExecutor.h>
-#include <decaf/util/concurrent/Mutex.h>
-#include <decaf/util/concurrent/CountDownLatch.h>
+
+#include "ThreadPoolExecutor.h"
+
+#include <decaf/util/Config.h>
+#include <decaf/util/LinkedList.h>
 #include <decaf/util/Timer.h>
 #include <decaf/util/TimerTask.h>
+#include <decaf/util/concurrent/Mutex.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
 #include <decaf/util/concurrent/atomic/AtomicInteger.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
-#include <decaf/lang/exceptions/IllegalArgumentException.h>
-#include <decaf/lang/exceptions/NullPointerException.h>
 #include <decaf/util/concurrent/RejectedExecutionException.h>
-#include <decaf/util/Config.h>
-#include <decaf/util/LinkedList.h>
+#include <decaf/util/concurrent/RejectedExecutionHandler.h>
+#include <decaf/util/concurrent/Executors.h>
 #include <decaf/lang/Pointer.h>
+#include <decaf/lang/Math.h>
+#include <decaf/lang/exceptions/IllegalArgumentException.h>
+#include <decaf/lang/exceptions/NullPointerException.h>
 
 #include <algorithm>
 #include <iostream>
@@ -63,6 +68,7 @@ namespace concurrent{
         int maxPoolSize;
         int corePoolSize;
         long long keepAliveTime;
+        bool coreThreadsCanTimeout;
         Pointer< BlockingQueue<decaf::lang::Runnable*> > workQueue;
         Mutex mainLock;
         CountDownLatch termination;
@@ -70,11 +76,15 @@ namespace concurrent{
         long long completedTasks;
         int largestPoolSize;
 
+        Pointer<ThreadFactory> factory;
+        Pointer<RejectedExecutionHandler> rejectionHandler;
+
     public:
 
         ExecutorKernel(ThreadPoolExecutor* parent,
                        int corePoolSize, int maxPoolSize, long long keepAliveTime,
-                       BlockingQueue<decaf::lang::Runnable*>* workQueue);
+                       BlockingQueue<decaf::lang::Runnable*>* workQueue,
+                       ThreadFactory* threadFactory, RejectedExecutionHandler* handler);
 
         ~ExecutorKernel();
 
@@ -88,18 +98,24 @@ namespace concurrent{
 
         Runnable* deQueueTask();
 
-        void AllocateThread();
+        bool addWorker();
+
+        int addAllWorkers();
 
         bool isStoppedOrStopping();
 
         void shutdown();
 
+        void shutdownNow(ArrayList<Runnable*>& unexecutedTasks);
+
         bool awaitTermination(long long timeout, const TimeUnit& unit);
 
         void handleWorkerExit(Worker* worker);
 
         void tryTerminate();
 
+        void drainQueue(ArrayList<Runnable*>& unexecutedTasks);
+
     };
 
     class Worker : public lang::Thread {
@@ -137,6 +153,11 @@ namespace concurrent{
                     Runnable* task = this->kernel->deQueueTask();
 
                     if(this->done) {
+
+                        if(task != NULL) {
+                            delete task;
+                        }
+
                         break;
                     }
 
@@ -216,7 +237,122 @@ namespace concurrent{
 ThreadPoolExecutor::ThreadPoolExecutor(int corePoolSize, int maxPoolSize,
                                        long long keepAliveTime, const TimeUnit& unit,
                                        BlockingQueue<decaf::lang::Runnable*>* workQueue) :
-    kernel(new ExecutorKernel(this, corePoolSize, maxPoolSize, unit.toMillis(keepAliveTime), workQueue)) {
+    AbstractExecutorService(),
+    kernel(NULL) {
+
+    try{
+
+        if(workQueue == NULL) {
+            throw NullPointerException(__FILE__, __LINE__, "The BlockingQueue pointer cannot be NULL.");
+        }
+
+        Pointer<RejectedExecutionHandler> handler(new ThreadPoolExecutor::AbortPolicy());
+        Pointer<ThreadFactory> threadFactory(Executors::getDefaultThreadFactory());
+
+        this->kernel = new ExecutorKernel(
+            this, corePoolSize, maxPoolSize, unit.toMillis(keepAliveTime), workQueue,
+            threadFactory.get(), handler.get());
+
+        handler.release();
+        threadFactory.release();
+    }
+    DECAF_CATCH_RETHROW(NullPointerException)
+    DECAF_CATCH_RETHROW(IllegalArgumentException)
+    DECAF_CATCH_RETHROW(Exception)
+    DECAF_CATCHALL_THROW(Exception)
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPoolExecutor::ThreadPoolExecutor(int corePoolSize, int maxPoolSize,
+                                       long long keepAliveTime, const TimeUnit& unit,
+                                       BlockingQueue<decaf::lang::Runnable*>* workQueue,
+                                       RejectedExecutionHandler* handler) :
+    AbstractExecutorService(),
+    kernel(NULL) {
+
+    try{
+
+        if(workQueue == NULL) {
+            throw NullPointerException(__FILE__, __LINE__, "The BlockingQueue pointer cannot be NULL.");
+        }
+
+        if(handler == NULL) {
+            throw NullPointerException(__FILE__, __LINE__, "The RejectedExecutionHandler pointer cannot be NULL.");
+        }
+
+        Pointer<ThreadFactory> threadFactory(Executors::getDefaultThreadFactory());
+
+        this->kernel = new ExecutorKernel(
+            this, corePoolSize, maxPoolSize, unit.toMillis(keepAliveTime), workQueue,
+            threadFactory.get(), handler);
+
+        threadFactory.release();
+    }
+    DECAF_CATCH_RETHROW(NullPointerException)
+    DECAF_CATCH_RETHROW(Exception)
+    DECAF_CATCHALL_THROW(Exception)
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPoolExecutor::ThreadPoolExecutor(int corePoolSize, int maxPoolSize,
+                                       long long keepAliveTime, const TimeUnit& unit,
+                                       BlockingQueue<decaf::lang::Runnable*>* workQueue,
+                                       ThreadFactory* threadFactory) :
+    AbstractExecutorService(),
+    kernel(NULL) {
+
+    try{
+
+        if(workQueue == NULL) {
+            throw NullPointerException(__FILE__, __LINE__, "The BlockingQueue pointer cannot be NULL.");
+        }
+
+        if(threadFactory == NULL) {
+            throw NullPointerException(__FILE__, __LINE__, "The ThreadFactory pointer cannot be NULL.");
+        }
+
+        Pointer<RejectedExecutionHandler> handler(new ThreadPoolExecutor::AbortPolicy());
+
+        this->kernel = new ExecutorKernel(
+            this, corePoolSize, maxPoolSize, unit.toMillis(keepAliveTime), workQueue,
+            threadFactory, handler.get());
+
+        handler.release();
+    }
+    DECAF_CATCH_RETHROW(NullPointerException)
+    DECAF_CATCH_RETHROW(Exception)
+    DECAF_CATCHALL_THROW(Exception)
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPoolExecutor::ThreadPoolExecutor(int corePoolSize, int maxPoolSize,
+                                       long long keepAliveTime, const TimeUnit& unit,
+                                       BlockingQueue<decaf::lang::Runnable*>* workQueue,
+                                       ThreadFactory* threadFactory, RejectedExecutionHandler* handler) :
+    AbstractExecutorService(),
+    kernel(NULL) {
+
+    try{
+
+        if(workQueue == NULL) {
+            throw NullPointerException(__FILE__, __LINE__, "The BlockingQueue pointer cannot be NULL.");
+        }
+
+        if(handler == NULL) {
+            throw NullPointerException(__FILE__, __LINE__, "The RejectedExecutionHandler pointer cannot be NULL.");
+        }
+
+        if(threadFactory == NULL) {
+            throw NullPointerException(__FILE__, __LINE__, "The ThreadFactory pointer cannot be NULL.");
+        }
+
+        this->kernel = new ExecutorKernel(
+            this, corePoolSize, maxPoolSize, unit.toMillis(keepAliveTime), workQueue,
+            threadFactory, handler);
+    }
+    DECAF_CATCH_RETHROW(NullPointerException)
+    DECAF_CATCH_RETHROW(Exception)
+    DECAF_CATCHALL_THROW(Exception)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -225,7 +361,7 @@ ThreadPoolExecutor::~ThreadPoolExecutor(
     try{
         delete kernel;
     }
-    DECAF_CATCH_NOTHROW( lang::Exception )
+    DECAF_CATCH_NOTHROW(Exception)
     DECAF_CATCHALL_NOTHROW()
 }
 
@@ -257,6 +393,19 @@ void ThreadPoolExecutor::shutdown() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+ArrayList<Runnable*> ThreadPoolExecutor::shutdownNow() {
+
+    ArrayList<Runnable*> result;
+
+    try{
+        this->kernel->shutdownNow(result);
+        return result;
+    }
+    DECAF_CATCH_RETHROW( lang::Exception )
+    DECAF_CATCHALL_THROW( lang::Exception )
+}
+
+////////////////////////////////////////////////////////////////////////////////
 bool ThreadPoolExecutor::awaitTermination(long long timeout, const TimeUnit& unit) {
 
     try{
@@ -282,11 +431,53 @@ int ThreadPoolExecutor::getCorePoolSize(
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutor::setCorePoolSize(int poolSize) {
+
+    if (poolSize < 0) {
+        throw IllegalArgumentException(__FILE__, __LINE__, "Pool size given was negative.");
+    }
+
+    synchronized(&this->kernel->mainLock) {
+
+        //int delta = poolSize - this->kernel->corePoolSize;
+        this->kernel->corePoolSize = poolSize;
+
+        if (this->kernel->workers.size() > poolSize) {
+            // TODO - Once Threads are interruptible wake them up so some can terminate.
+        } else {
+
+            // TODO - Create new threads up to the new pool size, unless we are out
+            //        of work or run out while creating.
+//            int target = Math::min(delta, this->kernel->workQueue->size());
+//            while (target-- > 0 && addWorker(NULL, true)) {
+//                if (this->kernel->workQueue->isEmpty()) {
+//                    break;
+//                }
+//            }
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
 int ThreadPoolExecutor::getMaximumPoolSize() const {
     return this->kernel->maxPoolSize;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutor::setMaximumPoolSize(int maxSize) {
+
+    if (maxSize < 0 || maxSize < this->kernel->corePoolSize) {
+        throw IllegalArgumentException(__FILE__, __LINE__, "Size given was invalid.");
+    }
+
+    this->kernel->maxPoolSize = maxSize;
+
+    if (this->kernel->workers.size() > maxSize) {
+        // TODO - Wake idle worker threads when able to.
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
 long long ThreadPoolExecutor::getTaskCount() const {
     return this->kernel->workQueue->size();
 }
@@ -325,6 +516,47 @@ int ThreadPoolExecutor::getLargestPoolSi
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutor::setThreadFactory(ThreadFactory* factory) {
+
+    if (factory == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "Cannot assign a NULL ThreadFactory.");
+    }
+
+    if (factory != this->kernel->factory) {
+        Pointer<ThreadFactory> temp(factory);
+        this->kernel->factory.swap(temp);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadFactory* ThreadPoolExecutor::getThreadFactory() const {
+    return this->kernel->factory.get();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+RejectedExecutionHandler* ThreadPoolExecutor::getRejectedExecutionHandler() const {
+    return this->kernel->rejectionHandler.get();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutor::setRejectedExecutionHandler(RejectedExecutionHandler* handler) {
+
+    if (handler == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "Cannot assign a NULL ThreadFactory.");
+    }
+
+    if (handler != this->kernel->rejectionHandler) {
+        Pointer<RejectedExecutionHandler> temp(handler);
+        this->kernel->rejectionHandler.swap(temp);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+BlockingQueue<Runnable*>* ThreadPoolExecutor::getQueue() {
+    return this->kernel->workQueue.get();
+}
+
+////////////////////////////////////////////////////////////////////////////////
 bool ThreadPoolExecutor::isShutdown() const {
     return this->kernel->stopped.get();
 }
@@ -335,13 +567,94 @@ bool ThreadPoolExecutor::isTerminated() 
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+bool ThreadPoolExecutor::isTerminating() const {
+    return this->kernel->isStoppedOrStopping() && !this->kernel->terminated.get();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutor::allowCoreThreadTimeout(bool value) {
+
+    if (value == true && this->kernel->keepAliveTime == 0) {
+        throw IllegalArgumentException(__FILE__, __LINE__,
+            "Keep Alive Time must be set to a non-zero value to enable this option.");
+    }
+
+    if (value != this->kernel->coreThreadsCanTimeout) {
+        this->kernel->coreThreadsCanTimeout = value;
+        if (value == true) {
+            // TODO - When Threads are interruptible wake works so they can check timeout.
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ThreadPoolExecutor::getKeepAliveTime(const TimeUnit& unit) const {
+    return unit.convert(this->kernel->keepAliveTime, TimeUnit::MILLISECONDS);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutor::setKeepAliveTime(long long timeout, const TimeUnit& unit) {
+
+    if (timeout < 0) {
+        throw IllegalArgumentException(__FILE__, __LINE__, "Timeout value cannot be negative.");
+    }
+
+    if (this->kernel->coreThreadsCanTimeout == true && unit.toMillis(timeout) == 0) {
+        throw IllegalArgumentException(__FILE__, __LINE__,
+            "Keep Alive Time must be set to a non-zero value when allowCoreThreadsTimeout is enabled.");
+    }
+
+    long keepAliveTime = unit.toMillis(timeout);
+    long delta = keepAliveTime - this->kernel->keepAliveTime;
+    this->kernel->keepAliveTime = keepAliveTime;
+    if (delta < 0) {
+        // TODO - When Threads are interruptible wake works so they can check timeout.
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ThreadPoolExecutor::allowsCoreThreadTimeout() const {
+    return this->kernel->coreThreadsCanTimeout;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ThreadPoolExecutor::prestartCoreThread() {
+    return this->kernel->addWorker();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ThreadPoolExecutor::prestartAllCoreThreads() {
+    return this->kernel->addAllWorkers();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ThreadPoolExecutor::remove(decaf::lang::Runnable* task) {
+    bool removed = this->kernel->workQueue->remove(task);
+    this->kernel->tryTerminate();
+    return removed;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutor::purge() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutor::beforeExecute(Thread* thread DECAF_UNUSED, Runnable* task DECAF_UNUSED) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutor::afterExecute(Runnable* task DECAF_UNUSED, decaf::lang::Throwable* error DECAF_UNUSED) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ThreadPoolExecutor::terminated() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 ExecutorKernel::ExecutorKernel(ThreadPoolExecutor* parent, int corePoolSize,
                                int maxPoolSize, long long keepAliveTime,
-                               BlockingQueue<decaf::lang::Runnable*>* workQueue) :
+                               BlockingQueue<decaf::lang::Runnable*>* workQueue,
+                               ThreadFactory* threadFactory, RejectedExecutionHandler* handler) :
     parent(parent),
     workers(),
     deadWorkers(),
@@ -353,11 +666,14 @@ ExecutorKernel::ExecutorKernel(ThreadPoo
     maxPoolSize(maxPoolSize),
     corePoolSize(corePoolSize),
     keepAliveTime(keepAliveTime),
-    workQueue(workQueue),
+    coreThreadsCanTimeout(false),
+    workQueue(),
     mainLock(),
     termination(1),
     completedTasks(0),
-    largestPoolSize(0) {
+    largestPoolSize(0),
+    factory(),
+    rejectionHandler() {
 
     if(corePoolSize < 0 || maxPoolSize <= 0 ||
        maxPoolSize < corePoolSize || keepAliveTime < 0) {
@@ -365,12 +681,16 @@ ExecutorKernel::ExecutorKernel(ThreadPoo
         throw IllegalArgumentException(__FILE__, __LINE__, "Argument out of range.");
     }
 
-    if(workQueue == NULL) {
-        throw NullPointerException(__FILE__, __LINE__, "BlockingQueue pointer was null");
+    if(workQueue == NULL || threadFactory == NULL || handler == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "Required parameter was NULL");
     }
 
     this->cleanupTimer.scheduleAtFixedRate(
         new WorkerKiller(this), TimeUnit::SECONDS.toMillis(10), TimeUnit::SECONDS.toMillis(10));
+
+    this->workQueue.reset(workQueue);
+    this->factory.reset(threadFactory);
+    this->rejectionHandler.reset(handler);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -414,7 +734,7 @@ void ExecutorKernel::onTaskStarted(Worke
             // cause the number of Task to exceed the number of free threads
             // once the Threads got a chance to wake up and service the queue
             if( freeThreads.get() == 0 && !workQueue->isEmpty() ) {
-                AllocateThread();
+                addWorker();
             }
         }
     }
@@ -470,13 +790,13 @@ void ExecutorKernel::enQueueTask(Runnabl
             // If there's nobody open to do work, then create some more
             // threads to handle the work.
             if( this->freeThreads.get() == 0 ) {
-                AllocateThread();
+                addWorker();
             }
-        }
 
-        // queue the new work.
-        if(!this->workQueue->offer(task)) {
-            throw RejectedExecutionException(__FILE__, __LINE__, "Task Rejected by work Q");
+            // queue the new work.
+            if(isStoppedOrStopping() || !this->workQueue->offer(task)) {
+                this->rejectionHandler->rejectedExecution(task, this->parent);
+            }
         }
     }
     DECAF_CATCH_RETHROW( Exception )
@@ -490,24 +810,19 @@ Runnable* ExecutorKernel::deQueueTask() 
 
         Runnable* task = NULL;
 
-        // Wait for work, wait in a while loop since another thread could
-        // be waiting for a lock and get the work before we get woken up
-        // from our wait.
-        while( !isStoppedOrStopping() ) {
+        while(true) {
 
-            // TODO - Threads aren't interruptible yet.
+            // TODO - Threads aren't interruptible yet, so spin wait.
             if(workQueue->poll(task, 10, TimeUnit::MILLISECONDS)) {
                 break;
             }
-        }
-
-        // Don't give more work if we are closing down
-        if(isStoppedOrStopping()) {
 
-            if(task != NULL) {
-                delete task;
+            if(isStoppedOrStopping() && workQueue->isEmpty()) {
+                break;
             }
+        }
 
+        if(isStoppedOrStopping() && task == NULL) {
             return NULL;
         }
 
@@ -523,12 +838,12 @@ Runnable* ExecutorKernel::deQueueTask() 
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::AllocateThread() {
+bool ExecutorKernel::addWorker() {
 
     try{
 
         if( this->workers.size() >= this->maxPoolSize ) {
-            return;
+            return false;
         }
 
         synchronized( &mainLock ) {
@@ -538,6 +853,38 @@ void ExecutorKernel::AllocateThread() {
             newWorker->start();
             this->largestPoolSize++;
         }
+
+        return true;
+    }
+    DECAF_CATCH_RETHROW( lang::Exception )
+    DECAF_CATCHALL_THROW( lang::Exception )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ExecutorKernel::addAllWorkers() {
+
+    try{
+
+        if( this->workers.size() >= this->maxPoolSize ) {
+            return 0;
+        }
+
+        int delta = 0;
+
+        synchronized( &mainLock ) {
+
+            delta = this->maxPoolSize - this->workers.size();
+
+            for(int i = 0; i < delta; ++i) {
+                Worker* newWorker = new Worker(this);
+                this->workers.add(newWorker);
+                freeThreads.incrementAndGet();
+                newWorker->start();
+                this->largestPoolSize++;
+            }
+        }
+
+        return delta;
     }
     DECAF_CATCH_RETHROW( lang::Exception )
     DECAF_CATCHALL_THROW( lang::Exception )
@@ -563,6 +910,29 @@ void ExecutorKernel::shutdown() {
 
         synchronized(&mainLock) {
 
+            // TODO - When threads are interruptible, we need to interrupt the Queue.
+            //synchronized( workQueue.get() ) {
+            //    // Signal the Queue so that all waiters are notified
+            //    workQueue->notifyAll();
+            //}
+        }
+
+        this->tryTerminate();
+        this->stopped.set(true);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ExecutorKernel::shutdownNow(ArrayList<Runnable*>& unexecutedTasks) {
+
+    if(isStoppedOrStopping()) {
+        return;
+    }
+
+    if(this->stopping.compareAndSet(false, true)) {
+
+        synchronized(&mainLock) {
+
             Pointer< Iterator<Worker*> > iter(this->workers.iterator());
 
             while(iter->hasNext()) {
@@ -574,13 +944,38 @@ void ExecutorKernel::shutdown() {
             //    // Signal the Queue so that all waiters are notified
             //    workQueue->notifyAll();
             //}
+
+            this->drainQueue(unexecutedTasks);
         }
 
+        this->tryTerminate();
         this->stopped.set(true);
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ExecutorKernel::drainQueue(ArrayList<Runnable*>& unexecutedTasks) {
+
+    // Some Queue implementations can fail in poll and drainTo so we check
+    // after attempting to drain the Queue and if its not empty we remove
+    // the tasks one by one.
+
+    this->workQueue->drainTo(unexecutedTasks);
+    if (!this->workQueue->isEmpty()) {
+
+        std::vector<Runnable*> tasks = this->workQueue->toArray();
+        std::vector<Runnable*>::iterator iter = tasks.begin();
+
+        for (; iter != tasks.end(); ++iter) {
+
+            if (this->workQueue->remove(*iter)) {
+                unexecutedTasks.add(*iter);
+            }
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
 bool ExecutorKernel::awaitTermination(long long timeout, const TimeUnit& unit) {
 
     if (this->terminated.get() == true) {



Mime
View raw message