Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 20858 invoked from network); 11 Apr 2011 20:23:13 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 11 Apr 2011 20:23:13 -0000 Received: (qmail 93515 invoked by uid 500); 11 Apr 2011 20:23:13 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 93451 invoked by uid 500); 11 Apr 2011 20:23:13 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 93444 invoked by uid 99); 11 Apr 2011 20:23:13 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 Apr 2011 20:23:13 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 Apr 2011 20:23:03 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 084E82388994; Mon, 11 Apr 2011 20:22:41 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1091195 [1/2] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/decaf/lang/ main/decaf/util/ main/decaf/util/concurrent/ main/decaf/util/concurrent/locks/ test/ test/decaf/util/concurrent/ Date: Mon, 11 Apr 2011 20:22:40 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110411202241.084E82388994@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Mon Apr 11 20:22:39 2011 New Revision: 1091195 URL: http://svn.apache.org/viewvc?rev=1091195&view=rev Log: Flush out some more of the Threading API and test what's working now. Cleans up some other API code for the next release. Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp (with props) activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h (with props) activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp (with props) activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h (with props) activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.cpp (with props) activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.cpp (with props) activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.h (with props) activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.cpp (with props) activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.h (with props) Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.h activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.h activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/AbstractQueue.h activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.h activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=1091195&r1=1091194&r2=1091195&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Mon Apr 11 20:22:39 2011 @@ -432,6 +432,7 @@ cc_sources = \ decaf/util/Timer.cpp \ decaf/util/TimerTask.cpp \ decaf/util/UUID.cpp \ + decaf/util/concurrent/AbstractExecutorService.cpp \ decaf/util/concurrent/BlockingQueue.cpp \ decaf/util/concurrent/BrokenBarrierException.cpp \ decaf/util/concurrent/Callable.cpp \ @@ -443,10 +444,12 @@ cc_sources = \ decaf/util/concurrent/Delayed.cpp \ decaf/util/concurrent/Executor.cpp \ decaf/util/concurrent/ExecutorService.cpp \ + decaf/util/concurrent/Executors.cpp \ decaf/util/concurrent/Future.cpp \ decaf/util/concurrent/LinkedBlockingQueue.cpp \ decaf/util/concurrent/Lock.cpp \ decaf/util/concurrent/Mutex.cpp \ + decaf/util/concurrent/RejectedExecutionHandler.cpp \ decaf/util/concurrent/Semaphore.cpp \ decaf/util/concurrent/SynchronousQueue.cpp \ decaf/util/concurrent/ThreadFactory.cpp \ @@ -456,6 +459,7 @@ cc_sources = \ decaf/util/concurrent/atomic/AtomicInteger.cpp \ decaf/util/concurrent/atomic/AtomicRefCounter.cpp \ decaf/util/concurrent/atomic/AtomicReference.cpp \ + decaf/util/concurrent/locks/AbstractOwnableSynchronizer.cpp \ decaf/util/concurrent/locks/LockSupport.cpp \ decaf/util/concurrent/locks/ReentrantLock.cpp \ decaf/util/logging/ConsoleHandler.cpp \ @@ -1019,6 +1023,7 @@ h_sources = \ decaf/util/TimerTask.h \ decaf/util/UUID.h \ decaf/util/comparators/Less.h \ + decaf/util/concurrent/AbstractExecutorService.h \ decaf/util/concurrent/BlockingQueue.h \ decaf/util/concurrent/BrokenBarrierException.h \ decaf/util/concurrent/Callable.h \ @@ -1033,6 +1038,7 @@ h_sources = \ decaf/util/concurrent/ExecutionException.h \ decaf/util/concurrent/Executor.h \ decaf/util/concurrent/ExecutorService.h \ + decaf/util/concurrent/Executors.h \ decaf/util/concurrent/Future.h \ decaf/util/concurrent/LinkedBlockingQueue.h \ decaf/util/concurrent/Lock.h \ @@ -1050,6 +1056,7 @@ h_sources = \ decaf/util/concurrent/atomic/AtomicInteger.h \ decaf/util/concurrent/atomic/AtomicRefCounter.h \ decaf/util/concurrent/atomic/AtomicReference.h \ + decaf/util/concurrent/locks/AbstractOwnableSynchronizer.h \ decaf/util/concurrent/locks/Condition.h \ decaf/util/concurrent/locks/Lock.h \ decaf/util/concurrent/locks/LockSupport.h \ Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.cpp?rev=1091195&r1=1091194&r2=1091195&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.cpp Mon Apr 11 20:22:39 2011 @@ -20,6 +20,12 @@ #include #include #include +#include +#include +#include +#include +#include +#include using namespace std; using namespace decaf; @@ -69,6 +75,60 @@ String::String( const std::string& sourc } //////////////////////////////////////////////////////////////////////////////// +String::String(const char* array, int size) : contents(new Contents) { + + if( size < 0 ) { + throw IndexOutOfBoundsException( + __FILE__, __LINE__, "size parameter out of Bounds: %d.", size ); + } + + if( array == NULL ) { + throw NullPointerException( + __FILE__, __LINE__, "Buffer pointer passed was NULL." ); + } + + if(size > 0) { + + this->contents->value = ArrayPointer(size); + this->contents->length = size; + + System::arraycopy( (unsigned char*)array, 0, contents->value.get(), 0, size ); + } +} + +//////////////////////////////////////////////////////////////////////////////// +String::String(const char* array, int size, int offset, int length) : contents(new Contents) { + + if( size < 0 ) { + throw IndexOutOfBoundsException( + __FILE__, __LINE__, "size parameter out of Bounds: %d.", size ); + } + + if( offset > size || offset < 0 ) { + throw IndexOutOfBoundsException( + __FILE__, __LINE__, "offset parameter out of Bounds: %d.", offset ); + } + + if( length < 0 || length > size - offset ) { + throw IndexOutOfBoundsException( + __FILE__, __LINE__, "length parameter out of Bounds: %d.", length ); + } + + if( array == NULL ) { + throw NullPointerException( + __FILE__, __LINE__, "Buffer pointer passed was NULL." ); + } + + if(size > 0) { + + this->contents->value = ArrayPointer(length); + this->contents->length = length; + + System::arraycopy( (unsigned char*)array, offset, contents->value.get(), 0, length ); + } +} + +//////////////////////////////////////////////////////////////////////////////// String::~String() { try{ delete this->contents; @@ -133,3 +193,43 @@ std::string String::toString() const { return std::string( (const char*)this->contents->value.get(), this->length() ); } + +//////////////////////////////////////////////////////////////////////////////// +String String::valueOf(bool value) { + + if(value) { + return String("true"); + } + + return String("false"); +} + +//////////////////////////////////////////////////////////////////////////////// +String String::valueOf(char value) { + return String( &value, 1 ); +} + +//////////////////////////////////////////////////////////////////////////////// +String String::valueOf(float value) { + return String( Float::toString(value) ); +} + +//////////////////////////////////////////////////////////////////////////////// +String String::valueOf(double value) { + return String( Double::toString(value) ); +} + +//////////////////////////////////////////////////////////////////////////////// +String String::valueOf(short value) { + return String( Short::toString(value) ); +} + +//////////////////////////////////////////////////////////////////////////////// +String String::valueOf(int value) { + return String( Integer::toString(value) ); +} + +//////////////////////////////////////////////////////////////////////////////// +String String::valueOf(long long value) { + return String( Long::toString(value) ); +} Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.h?rev=1091195&r1=1091194&r2=1091195&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.h Mon Apr 11 20:22:39 2011 @@ -63,6 +63,43 @@ namespace lang { */ String( const std::string& source ); + /** + * Create a new String object that represents the given array of characters. The method + * takes the size of the array as a parameter to allow for strings that are not NULL + * terminated, the caller can pass strlen(array) in the case where the array is properly + * NULL terminated. + * + * @param array + * The character buffer to copy into this new String object. + * @param size + * The size of the string buffer given, in case the string is not NULL terminated. + * + * @throws NullPointerException if the character array parameter is NULL. + * @throws IndexOutOfBoundsException if the size parameter is negative. + */ + String( const char* array, int size ); + + /** + * Create a new String object that represents the given array of characters. The method + * takes the size of the array as a parameter to allow for strings that are not NULL + * terminated, the caller can pass strlen(array) in the case where the array is properly + * NULL terminated. + * + * @param array + * The character buffer to copy into this new String object. + * @param size + * The size of the string buffer given, in case the string is not NULL terminated. + * @param offset + * The position to start copying from in the given buffer. + * @param length + * The number of bytes to copy from the given buffer starting from the offset. + * + * @throws NullPointerException if the character array parameter is NULL. + * @throws IndexOutOfBoundsException if the size, offset or length parameter is negative + * or if the length to copy is greater than the span of size - offset. + */ + String( const char* array, int size, int offset, int length ); + virtual ~String(); public: @@ -100,6 +137,78 @@ namespace lang { */ virtual std::string toString() const; + public: // Static methods. + + /** + * Returns a String that represents the value of the given boolean value. + * + * @param value + * The value whose string representation is to be returned. + * + * @returns "true" if the boolean is true, "false" otherwise. + */ + static String valueOf(bool value); + + /** + * Returns a String that represents the value of the given char value. + * + * @param value + * The value whose string representation is to be returned. + * + * @returns a String that contains the single character value given. + */ + static String valueOf(char value); + + /** + * Returns a String that represents the value of the given float value. + * + * @param value + * The value whose string representation is to be returned. + * + * @returns a String that contains the string representation of the float value given. + */ + static String valueOf(float value); + + /** + * Returns a String that represents the value of the given double value. + * + * @param value + * The value whose string representation is to be returned. + * + * @returns a String that contains the string representation of the double value given. + */ + static String valueOf(double value); + + /** + * Returns a String that represents the value of the given short value. + * + * @param value + * The value whose string representation is to be returned. + * + * @returns a String that contains the string representation of the short value given. + */ + static String valueOf(short value); + + /** + * Returns a String that represents the value of the given integer value. + * + * @param value + * The value whose string representation is to be returned. + * + * @returns a String that contains the string representation of the integer value given. + */ + static String valueOf(int value); + + /** + * Returns a String that represents the value of the given 64bit long value. + * + * @param value + * The value whose string representation is to be returned. + * + * @returns a String that contains the string representation of the 64 bit long value given. + */ + static String valueOf(long long value); + }; }} Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp?rev=1091195&r1=1091194&r2=1091195&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp Mon Apr 11 20:22:39 2011 @@ -28,8 +28,10 @@ #include #include #include +#include #include #include +#include #include @@ -231,11 +233,17 @@ void Thread::initThreading() { // We mark the thread where Decaf's Init routine is called from as our Main Thread. mainThread = Thread::createForeignThreadInstance( "Main Thread" ); + + // Initialize the Executors static data for use in ExecutorService classes. + Executors::initialize(); } //////////////////////////////////////////////////////////////////////////////// void Thread::shutdownThreading() { + // First shutdown the Executors static data to remove dependencies on Threading. + Executors::shutdown(); + // Clear the Main Thread instance pointer, this indicates we are Shutdown. mainThread = NULL; @@ -579,6 +587,21 @@ int Thread::getPriority() const { } //////////////////////////////////////////////////////////////////////////////// +bool Thread::isDaemon() const { + return false; +} + +//////////////////////////////////////////////////////////////////////////////// +void Thread::setDaemon(bool value DECAF_UNUSED) { + + if(this->properties->state > Thread::NEW) { + throw IllegalThreadStateException(__FILE__, __LINE__, "Thread is already active."); + } + + // TODO - Set thread to detached or joinable as indicated by the value arg. +} + +//////////////////////////////////////////////////////////////////////////////// void Thread::setUncaughtExceptionHandler( UncaughtExceptionHandler* handler ) { this->properties->exHandler = handler; } Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.h?rev=1091195&r1=1091194&r2=1091195&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.h Mon Apr 11 20:22:39 2011 @@ -263,6 +263,25 @@ namespace lang{ void setPriority( int value ); /** + * Sets if the given Thread is a Daemon Thread or not. Daemon threads cannot be + * joined and its resource are automatically reclaimed when it terminates. + * + * @param value + * Boolean indicating if this thread should be a daemon thread or not. + * + * @throws IllegalThreadStateException if the thread is already active. + */ + void setDaemon(bool value); + + /** + * Returns whether this thread is a daemon thread or not, if true this thread cannot + * be joined. + * + * @return true if the thread is a daemon thread. + */ + bool isDaemon() const; + + /** * Set the handler invoked when this thread abruptly terminates due to an uncaught exception. * * @returns a pointer to the set UncaughtExceptionHandler. Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/AbstractQueue.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/AbstractQueue.h?rev=1091195&r1=1091194&r2=1091195&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/AbstractQueue.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/AbstractQueue.h Mon Apr 11 20:22:39 2011 @@ -85,6 +85,8 @@ namespace util { return AbstractCollection::addAll( collection ); } + using AbstractCollection::remove; + /** * {@inheritDoc} * Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp?rev=1091195&view=auto ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp Mon Apr 11 20:22:39 2011 @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "AbstractExecutorService.h" + +using namespace decaf; +using namespace decaf::util; +using namespace decaf::util::concurrent; +using namespace decaf::lang; + +//////////////////////////////////////////////////////////////////////////////// +AbstractExecutorService::AbstractExecutorService() : ExecutorService() { +} + +//////////////////////////////////////////////////////////////////////////////// +AbstractExecutorService::~AbstractExecutorService() { +} Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h?rev=1091195&view=auto ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h Mon Apr 11 20:22:39 2011 @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _DECAF_UTIL_CONCURRENT_ABSTRACTEXECUTORSERVICE_H_ +#define _DECAF_UTIL_CONCURRENT_ABSTRACTEXECUTORSERVICE_H_ + +#include + +#include +#include + +namespace decaf { +namespace util { +namespace concurrent { + + /** + * Provides a default implementation for the methods of the ExecutorService + * interface. Use this class as a starting point for implementations of custom + * executor service implementations. + * + * @since 1.0 + */ + class DECAF_API AbstractExecutorService : public ExecutorService { + public: + + AbstractExecutorService(); + virtual ~AbstractExecutorService(); + + }; + +}}} + +#endif /* _DECAF_UTIL_CONCURRENT_ABSTRACTEXECUTORSERVICE_H_ */ Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h ------------------------------------------------------------------------------ svn:eol-style = native Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h?rev=1091195&r1=1091194&r2=1091195&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h Mon Apr 11 20:22:39 2011 @@ -167,6 +167,9 @@ namespace concurrent { virtual ~BlockingQueue() { } + using Queue::offer; + using Queue::poll; + /** * Inserts the specified element into this queue, waiting if necessary for space * to become available. @@ -179,8 +182,6 @@ namespace concurrent { */ virtual void put( const E& value ) = 0; - using Queue::offer; - /** * Inserts the specified element into this queue, waiting up to the specified wait * time if necessary for space to become available. Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h?rev=1091195&r1=1091194&r2=1091195&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h Mon Apr 11 20:22:39 2011 @@ -20,6 +20,8 @@ #include +#include +#include #include #include #include @@ -55,20 +57,55 @@ namespace concurrent { virtual ~ExecutorService() {} /** - * Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, - * or the current thread is interrupted, whichever happens first. + * The caller will block until the executor has completed termination meaning all tasks + * that where scheduled before shutdown have now completed and the executor is ready for + * deletion. If the timeout period elapses before the executor reaches the terminated + * state then this method return false to indicate it has not terminated. * * @param timeout - * The amount of time to wait before timing out the Wait operation. + * The amount of time to wait before abandoning the wait for termination. * @param unit - * The Units that comprise the timeout value. + * The unit of time that the timeout value represents. * - * @returns true if the executer terminated before the given timeout value elapsed. + * @return true if the executor terminated or false if the timeout expired. * - * @throws InterruptedException - if interrupted while waiting. + * @throws InterruptedException if this call is interrupted while awaiting termination. */ virtual bool awaitTermination( long long timeout, const TimeUnit& unit ) = 0; + /** + * Performs an orderly shutdown of this Executor. Previously queued tasks are allowed + * to complete but no new tasks are accepted for execution. Calling this method more + * than once has no affect on this executor. + */ + virtual void shutdown() = 0; + + /** + * Attempts to stop all currently executing tasks and returns an ArrayList containing the + * Runnables that did not get executed, these object become the property of the caller and + * are not deleted by this class, they are removed from the work queue and forgotten about. + * + * There is no guarantee that this method will halt execution of currently executing tasks. + * + * @return an ArrayList containing all Runnable instance that were still waiting to be + * executed by this class, call now owns those pointers. + */ + virtual ArrayList shutdownNow() = 0; + + /** + * Returns whether this executor has been shutdown or not. + * + * @return true if this executor has been shutdown. + */ + virtual bool isShutdown() const = 0; + + /** + * Returns whether all tasks have completed after this executor was shut down. + * + * @return true if all tasks have completed after a request to shut down was made. + */ + virtual bool isTerminated() const = 0; + }; }}} Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp?rev=1091195&view=auto ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp Mon Apr 11 20:22:39 2011 @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Executors.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace decaf; +using namespace decaf::util; +using namespace decaf::util::concurrent; +using namespace decaf::util::concurrent::atomic; +using namespace decaf::lang; +using namespace decaf::lang::exceptions; + +//////////////////////////////////////////////////////////////////////////////// +namespace { + + class DefaultThreadFactory : public ThreadFactory { + public: + + static AtomicInteger* poolNumber; + + private: + + //ThreadGroup group; + AtomicInteger threadNumber; + std::string namePrefix; + + public: + + DefaultThreadFactory() : ThreadFactory(), threadNumber(1), namePrefix() { + + if(DefaultThreadFactory::poolNumber == NULL) { + throw NullPointerException(); + } + + namePrefix = std::string("pool-") + + Integer::toString(poolNumber->getAndIncrement()) + + "-thread-"; + } + + Thread* newThread(Runnable* task) { + Thread* thread = new Thread(task, namePrefix + Integer::toString(threadNumber.getAndIncrement())); + + if (thread->isDaemon()) { + thread->setDaemon(false); + } + + if (thread->getPriority() != Thread::NORM_PRIORITY) { + thread->setPriority(Thread::NORM_PRIORITY); + } + + return thread; + } + }; + + AtomicInteger* DefaultThreadFactory::poolNumber = NULL; +} + +//////////////////////////////////////////////////////////////////////////////// +Executors::Executors() { +} + +//////////////////////////////////////////////////////////////////////////////// +Executors::~Executors() { +} + +//////////////////////////////////////////////////////////////////////////////// +void Executors::initialize() { + DefaultThreadFactory::poolNumber = new AtomicInteger(1); +} + +//////////////////////////////////////////////////////////////////////////////// +void Executors::shutdown() { + delete DefaultThreadFactory::poolNumber; +} + +//////////////////////////////////////////////////////////////////////////////// +ThreadFactory* Executors::getDefaultThreadFactory() { + return new DefaultThreadFactory(); +} + +//////////////////////////////////////////////////////////////////////////////// +ExecutorService* Executors::newFixedThreadPool(int nThreads) { + + Pointer< BlockingQueue > backingQ; + + try{ + + backingQ.reset(new LinkedBlockingQueue()); + ExecutorService* service = new ThreadPoolExecutor( + nThreads, nThreads, 0, TimeUnit::MILLISECONDS, backingQ.get()); + + backingQ.release(); + + return service; + + } catch(NullPointerException& ex) { + ex.setMark(__FILE__, __LINE__); + throw ex; + } catch(IllegalArgumentException& ex) { + ex.setMark(__FILE__, __LINE__); + throw ex; + } catch(Exception& ex) { + ex.setMark(__FILE__, __LINE__); + throw ex; + } catch(...) { + throw Exception(); + } +} + +//////////////////////////////////////////////////////////////////////////////// +ExecutorService* Executors::newFixedThreadPool(int nThreads, ThreadFactory* threadFactory) { + + Pointer< BlockingQueue > backingQ; + + try{ + + backingQ.reset(new LinkedBlockingQueue()); + ExecutorService* service = new ThreadPoolExecutor( + nThreads, nThreads, 0, TimeUnit::MILLISECONDS, backingQ.get(), threadFactory); + + backingQ.release(); + + return service; + + } catch(NullPointerException& ex) { + ex.setMark(__FILE__, __LINE__); + throw ex; + } catch(IllegalArgumentException& ex) { + ex.setMark(__FILE__, __LINE__); + throw ex; + } catch(Exception& ex) { + ex.setMark(__FILE__, __LINE__); + throw ex; + } catch(...) { + throw Exception(); + } +} Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h?rev=1091195&view=auto ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h Mon Apr 11 20:22:39 2011 @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _DECAF_UTIL_CONCURRENT_EXECUTORS_H_ +#define _DECAF_UTIL_CONCURRENT_EXECUTORS_H_ + +#include + +#include +#include +#include + +namespace decaf { +namespace util { +namespace concurrent { + + /** + * Implements a set of utilities for use with Executors, ExecutorService, ThreadFactory, + * and Callable types, as well as providing factory methods for instance of these + * types configured for the most common use cases. + * + * @since 1.0 + */ + class DECAF_API Executors { + private: + + Executors(); + Executors(const Executors&); + Executors& operator= (const Executors&); + + public: + + virtual ~Executors(); + + /** + * Creates and returns a new ThreadFactory that expresses the default behavior for + * ThreadFactories used in Executor classes. The default factory create a new + * non-daemon thread with normal priority and a name whose value is equal to + * pool-N-thread-M, where N is the sequence number of this factory, and M is the + * sequence number of the thread created by this factory. + * + * @returns a new instance of the default thread factory used in Executors, the + * caller takes ownership of the returned pointer. + */ + static ThreadFactory* getDefaultThreadFactory(); + + /** + * Creates a new ThreadPoolExecutor with a fixed number of threads to process incoming + * tasks. The thread pool will use an unbounded queue to store pending tasks. At any + * given time the maximum threads in the pool will be equal to the number given to this + * factory method. If a thread in the pool dies a new one will be spawned to take its + * place in the pool. Tasks that are submitted when all pooled threads are busy will + * be held until a thread is freed if the pool has allocated its assigned number of + * threads already. + * + * @param nThreads + * The number of threads to assign as the max for the new ExecutorService. + * + * @returns pointer to a new ExecutorService that is owned by the caller. + * + * @throws IllegalArgumentException if nThreads is less than or equal to zero. + */ + static ExecutorService* newFixedThreadPool(int nThreads); + + /** + * Creates a new ThreadPoolExecutor with a fixed number of threads to process incoming + * tasks. The thread pool will use an unbounded queue to store pending tasks. At any + * given time the maximum threads in the pool will be equal to the number given to this + * factory method. If a thread in the pool dies a new one will be spawned to take its + * place in the pool. Tasks that are submitted when all pooled threads are busy will + * be held until a thread is freed if the pool has allocated its assigned number of + * threads already. + * + * @param nThreads + * The number of threads to assign as the max for the new ExecutorService. + * @param threadFactory + * Instance of a ThreadFactory that will be used by the Executor to spawn new + * worker threads. This parameter cannot be NULL. + * + * @returns pointer to a new ExecutorService that is owned by the caller. + * + * @throws NullPointerException if threadFactory is NULL. + * @throws IllegalArgumentException if nThreads is less than or equal to zero. + */ + static ExecutorService* newFixedThreadPool(int nThreads, ThreadFactory* threadFactory); + + private: + + static void initialize(); + static void shutdown(); + + friend class decaf::lang::Thread; + + }; + +}}} + +#endif /* _DECAF_UTIL_CONCURRENT_EXECUTORS_H_ */ Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.cpp?rev=1091195&view=auto ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.cpp (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.cpp Mon Apr 11 20:22:39 2011 @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "RejectedExecutionHandler.h" + +using namespace decaf; +using namespace decaf::lang; +using namespace decaf::util; +using namespace decaf::util::concurrent; + +//////////////////////////////////////////////////////////////////////////////// +RejectedExecutionHandler::RejectedExecutionHandler() { + +} + +//////////////////////////////////////////////////////////////////////////////// +RejectedExecutionHandler::~RejectedExecutionHandler() { + +} + Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.cpp ------------------------------------------------------------------------------ svn:eol-style = native Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.h?rev=1091195&r1=1091194&r2=1091195&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.h Mon Apr 11 20:22:39 2011 @@ -36,7 +36,8 @@ namespace concurrent { class DECAF_API RejectedExecutionHandler { public: - virtual ~RejectedExecutionHandler() {} + RejectedExecutionHandler(); + virtual ~RejectedExecutionHandler(); /** * Method that may be invoked by a {@link ThreadPoolExecutor} when @@ -56,8 +57,7 @@ namespace concurrent { * * @throws RejectedExecutionException if there is no remedy. */ - virtual void rejectedExecution( Runnable* r, ThreadPoolExecutor* executer ) - throw( RejectedExecutionException ) = 0; + virtual void rejectedExecution( decaf::lang::Runnable* r, ThreadPoolExecutor* executer ) = 0; }; Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h?rev=1091195&r1=1091194&r2=1091195&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h Mon Apr 11 20:22:39 2011 @@ -20,11 +20,10 @@ #include +#include +#include + namespace decaf { -namespace lang { - class Thread; - class Runnable; -} namespace util { namespace concurrent { Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp?rev=1091195&r1=1091194&r2=1091195&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp Mon Apr 11 20:22:39 2011 @@ -14,19 +14,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include -#include -#include + +#include "ThreadPoolExecutor.h" + +#include +#include #include #include +#include +#include #include #include -#include -#include #include -#include -#include +#include +#include #include +#include +#include +#include #include #include @@ -63,6 +68,7 @@ namespace concurrent{ int maxPoolSize; int corePoolSize; long long keepAliveTime; + bool coreThreadsCanTimeout; Pointer< BlockingQueue > workQueue; Mutex mainLock; CountDownLatch termination; @@ -70,11 +76,15 @@ namespace concurrent{ long long completedTasks; int largestPoolSize; + Pointer factory; + Pointer rejectionHandler; + public: ExecutorKernel(ThreadPoolExecutor* parent, int corePoolSize, int maxPoolSize, long long keepAliveTime, - BlockingQueue* workQueue); + BlockingQueue* workQueue, + ThreadFactory* threadFactory, RejectedExecutionHandler* handler); ~ExecutorKernel(); @@ -88,18 +98,24 @@ namespace concurrent{ Runnable* deQueueTask(); - void AllocateThread(); + bool addWorker(); + + int addAllWorkers(); bool isStoppedOrStopping(); void shutdown(); + void shutdownNow(ArrayList& unexecutedTasks); + bool awaitTermination(long long timeout, const TimeUnit& unit); void handleWorkerExit(Worker* worker); void tryTerminate(); + void drainQueue(ArrayList& unexecutedTasks); + }; class Worker : public lang::Thread { @@ -137,6 +153,11 @@ namespace concurrent{ Runnable* task = this->kernel->deQueueTask(); if(this->done) { + + if(task != NULL) { + delete task; + } + break; } @@ -216,7 +237,122 @@ namespace concurrent{ ThreadPoolExecutor::ThreadPoolExecutor(int corePoolSize, int maxPoolSize, long long keepAliveTime, const TimeUnit& unit, BlockingQueue* workQueue) : - kernel(new ExecutorKernel(this, corePoolSize, maxPoolSize, unit.toMillis(keepAliveTime), workQueue)) { + AbstractExecutorService(), + kernel(NULL) { + + try{ + + if(workQueue == NULL) { + throw NullPointerException(__FILE__, __LINE__, "The BlockingQueue pointer cannot be NULL."); + } + + Pointer handler(new ThreadPoolExecutor::AbortPolicy()); + Pointer threadFactory(Executors::getDefaultThreadFactory()); + + this->kernel = new ExecutorKernel( + this, corePoolSize, maxPoolSize, unit.toMillis(keepAliveTime), workQueue, + threadFactory.get(), handler.get()); + + handler.release(); + threadFactory.release(); + } + DECAF_CATCH_RETHROW(NullPointerException) + DECAF_CATCH_RETHROW(IllegalArgumentException) + DECAF_CATCH_RETHROW(Exception) + DECAF_CATCHALL_THROW(Exception) +} + +//////////////////////////////////////////////////////////////////////////////// +ThreadPoolExecutor::ThreadPoolExecutor(int corePoolSize, int maxPoolSize, + long long keepAliveTime, const TimeUnit& unit, + BlockingQueue* workQueue, + RejectedExecutionHandler* handler) : + AbstractExecutorService(), + kernel(NULL) { + + try{ + + if(workQueue == NULL) { + throw NullPointerException(__FILE__, __LINE__, "The BlockingQueue pointer cannot be NULL."); + } + + if(handler == NULL) { + throw NullPointerException(__FILE__, __LINE__, "The RejectedExecutionHandler pointer cannot be NULL."); + } + + Pointer threadFactory(Executors::getDefaultThreadFactory()); + + this->kernel = new ExecutorKernel( + this, corePoolSize, maxPoolSize, unit.toMillis(keepAliveTime), workQueue, + threadFactory.get(), handler); + + threadFactory.release(); + } + DECAF_CATCH_RETHROW(NullPointerException) + DECAF_CATCH_RETHROW(Exception) + DECAF_CATCHALL_THROW(Exception) +} + +//////////////////////////////////////////////////////////////////////////////// +ThreadPoolExecutor::ThreadPoolExecutor(int corePoolSize, int maxPoolSize, + long long keepAliveTime, const TimeUnit& unit, + BlockingQueue* workQueue, + ThreadFactory* threadFactory) : + AbstractExecutorService(), + kernel(NULL) { + + try{ + + if(workQueue == NULL) { + throw NullPointerException(__FILE__, __LINE__, "The BlockingQueue pointer cannot be NULL."); + } + + if(threadFactory == NULL) { + throw NullPointerException(__FILE__, __LINE__, "The ThreadFactory pointer cannot be NULL."); + } + + Pointer handler(new ThreadPoolExecutor::AbortPolicy()); + + this->kernel = new ExecutorKernel( + this, corePoolSize, maxPoolSize, unit.toMillis(keepAliveTime), workQueue, + threadFactory, handler.get()); + + handler.release(); + } + DECAF_CATCH_RETHROW(NullPointerException) + DECAF_CATCH_RETHROW(Exception) + DECAF_CATCHALL_THROW(Exception) +} + +//////////////////////////////////////////////////////////////////////////////// +ThreadPoolExecutor::ThreadPoolExecutor(int corePoolSize, int maxPoolSize, + long long keepAliveTime, const TimeUnit& unit, + BlockingQueue* workQueue, + ThreadFactory* threadFactory, RejectedExecutionHandler* handler) : + AbstractExecutorService(), + kernel(NULL) { + + try{ + + if(workQueue == NULL) { + throw NullPointerException(__FILE__, __LINE__, "The BlockingQueue pointer cannot be NULL."); + } + + if(handler == NULL) { + throw NullPointerException(__FILE__, __LINE__, "The RejectedExecutionHandler pointer cannot be NULL."); + } + + if(threadFactory == NULL) { + throw NullPointerException(__FILE__, __LINE__, "The ThreadFactory pointer cannot be NULL."); + } + + this->kernel = new ExecutorKernel( + this, corePoolSize, maxPoolSize, unit.toMillis(keepAliveTime), workQueue, + threadFactory, handler); + } + DECAF_CATCH_RETHROW(NullPointerException) + DECAF_CATCH_RETHROW(Exception) + DECAF_CATCHALL_THROW(Exception) } //////////////////////////////////////////////////////////////////////////////// @@ -225,7 +361,7 @@ ThreadPoolExecutor::~ThreadPoolExecutor( try{ delete kernel; } - DECAF_CATCH_NOTHROW( lang::Exception ) + DECAF_CATCH_NOTHROW(Exception) DECAF_CATCHALL_NOTHROW() } @@ -257,6 +393,19 @@ void ThreadPoolExecutor::shutdown() { } //////////////////////////////////////////////////////////////////////////////// +ArrayList ThreadPoolExecutor::shutdownNow() { + + ArrayList result; + + try{ + this->kernel->shutdownNow(result); + return result; + } + DECAF_CATCH_RETHROW( lang::Exception ) + DECAF_CATCHALL_THROW( lang::Exception ) +} + +//////////////////////////////////////////////////////////////////////////////// bool ThreadPoolExecutor::awaitTermination(long long timeout, const TimeUnit& unit) { try{ @@ -282,11 +431,53 @@ int ThreadPoolExecutor::getCorePoolSize( } //////////////////////////////////////////////////////////////////////////////// +void ThreadPoolExecutor::setCorePoolSize(int poolSize) { + + if (poolSize < 0) { + throw IllegalArgumentException(__FILE__, __LINE__, "Pool size given was negative."); + } + + synchronized(&this->kernel->mainLock) { + + //int delta = poolSize - this->kernel->corePoolSize; + this->kernel->corePoolSize = poolSize; + + if (this->kernel->workers.size() > poolSize) { + // TODO - Once Threads are interruptible wake them up so some can terminate. + } else { + + // TODO - Create new threads up to the new pool size, unless we are out + // of work or run out while creating. +// int target = Math::min(delta, this->kernel->workQueue->size()); +// while (target-- > 0 && addWorker(NULL, true)) { +// if (this->kernel->workQueue->isEmpty()) { +// break; +// } +// } + } + } +} + +//////////////////////////////////////////////////////////////////////////////// int ThreadPoolExecutor::getMaximumPoolSize() const { return this->kernel->maxPoolSize; } //////////////////////////////////////////////////////////////////////////////// +void ThreadPoolExecutor::setMaximumPoolSize(int maxSize) { + + if (maxSize < 0 || maxSize < this->kernel->corePoolSize) { + throw IllegalArgumentException(__FILE__, __LINE__, "Size given was invalid."); + } + + this->kernel->maxPoolSize = maxSize; + + if (this->kernel->workers.size() > maxSize) { + // TODO - Wake idle worker threads when able to. + } +} + +//////////////////////////////////////////////////////////////////////////////// long long ThreadPoolExecutor::getTaskCount() const { return this->kernel->workQueue->size(); } @@ -325,6 +516,47 @@ int ThreadPoolExecutor::getLargestPoolSi } //////////////////////////////////////////////////////////////////////////////// +void ThreadPoolExecutor::setThreadFactory(ThreadFactory* factory) { + + if (factory == NULL) { + throw NullPointerException(__FILE__, __LINE__, "Cannot assign a NULL ThreadFactory."); + } + + if (factory != this->kernel->factory) { + Pointer temp(factory); + this->kernel->factory.swap(temp); + } +} + +//////////////////////////////////////////////////////////////////////////////// +ThreadFactory* ThreadPoolExecutor::getThreadFactory() const { + return this->kernel->factory.get(); +} + +//////////////////////////////////////////////////////////////////////////////// +RejectedExecutionHandler* ThreadPoolExecutor::getRejectedExecutionHandler() const { + return this->kernel->rejectionHandler.get(); +} + +//////////////////////////////////////////////////////////////////////////////// +void ThreadPoolExecutor::setRejectedExecutionHandler(RejectedExecutionHandler* handler) { + + if (handler == NULL) { + throw NullPointerException(__FILE__, __LINE__, "Cannot assign a NULL ThreadFactory."); + } + + if (handler != this->kernel->rejectionHandler) { + Pointer temp(handler); + this->kernel->rejectionHandler.swap(temp); + } +} + +//////////////////////////////////////////////////////////////////////////////// +BlockingQueue* ThreadPoolExecutor::getQueue() { + return this->kernel->workQueue.get(); +} + +//////////////////////////////////////////////////////////////////////////////// bool ThreadPoolExecutor::isShutdown() const { return this->kernel->stopped.get(); } @@ -335,13 +567,94 @@ bool ThreadPoolExecutor::isTerminated() } //////////////////////////////////////////////////////////////////////////////// +bool ThreadPoolExecutor::isTerminating() const { + return this->kernel->isStoppedOrStopping() && !this->kernel->terminated.get(); +} + +//////////////////////////////////////////////////////////////////////////////// +void ThreadPoolExecutor::allowCoreThreadTimeout(bool value) { + + if (value == true && this->kernel->keepAliveTime == 0) { + throw IllegalArgumentException(__FILE__, __LINE__, + "Keep Alive Time must be set to a non-zero value to enable this option."); + } + + if (value != this->kernel->coreThreadsCanTimeout) { + this->kernel->coreThreadsCanTimeout = value; + if (value == true) { + // TODO - When Threads are interruptible wake works so they can check timeout. + } + } +} + +//////////////////////////////////////////////////////////////////////////////// +long long ThreadPoolExecutor::getKeepAliveTime(const TimeUnit& unit) const { + return unit.convert(this->kernel->keepAliveTime, TimeUnit::MILLISECONDS); +} + +//////////////////////////////////////////////////////////////////////////////// +void ThreadPoolExecutor::setKeepAliveTime(long long timeout, const TimeUnit& unit) { + + if (timeout < 0) { + throw IllegalArgumentException(__FILE__, __LINE__, "Timeout value cannot be negative."); + } + + if (this->kernel->coreThreadsCanTimeout == true && unit.toMillis(timeout) == 0) { + throw IllegalArgumentException(__FILE__, __LINE__, + "Keep Alive Time must be set to a non-zero value when allowCoreThreadsTimeout is enabled."); + } + + long keepAliveTime = unit.toMillis(timeout); + long delta = keepAliveTime - this->kernel->keepAliveTime; + this->kernel->keepAliveTime = keepAliveTime; + if (delta < 0) { + // TODO - When Threads are interruptible wake works so they can check timeout. + } +} + +//////////////////////////////////////////////////////////////////////////////// +bool ThreadPoolExecutor::allowsCoreThreadTimeout() const { + return this->kernel->coreThreadsCanTimeout; +} + +//////////////////////////////////////////////////////////////////////////////// +bool ThreadPoolExecutor::prestartCoreThread() { + return this->kernel->addWorker(); +} + +//////////////////////////////////////////////////////////////////////////////// +int ThreadPoolExecutor::prestartAllCoreThreads() { + return this->kernel->addAllWorkers(); +} + +//////////////////////////////////////////////////////////////////////////////// +bool ThreadPoolExecutor::remove(decaf::lang::Runnable* task) { + bool removed = this->kernel->workQueue->remove(task); + this->kernel->tryTerminate(); + return removed; +} + +//////////////////////////////////////////////////////////////////////////////// +void ThreadPoolExecutor::purge() { +} + +//////////////////////////////////////////////////////////////////////////////// +void ThreadPoolExecutor::beforeExecute(Thread* thread DECAF_UNUSED, Runnable* task DECAF_UNUSED) { +} + +//////////////////////////////////////////////////////////////////////////////// +void ThreadPoolExecutor::afterExecute(Runnable* task DECAF_UNUSED, decaf::lang::Throwable* error DECAF_UNUSED) { +} + +//////////////////////////////////////////////////////////////////////////////// void ThreadPoolExecutor::terminated() { } //////////////////////////////////////////////////////////////////////////////// ExecutorKernel::ExecutorKernel(ThreadPoolExecutor* parent, int corePoolSize, int maxPoolSize, long long keepAliveTime, - BlockingQueue* workQueue) : + BlockingQueue* workQueue, + ThreadFactory* threadFactory, RejectedExecutionHandler* handler) : parent(parent), workers(), deadWorkers(), @@ -353,11 +666,14 @@ ExecutorKernel::ExecutorKernel(ThreadPoo maxPoolSize(maxPoolSize), corePoolSize(corePoolSize), keepAliveTime(keepAliveTime), - workQueue(workQueue), + coreThreadsCanTimeout(false), + workQueue(), mainLock(), termination(1), completedTasks(0), - largestPoolSize(0) { + largestPoolSize(0), + factory(), + rejectionHandler() { if(corePoolSize < 0 || maxPoolSize <= 0 || maxPoolSize < corePoolSize || keepAliveTime < 0) { @@ -365,12 +681,16 @@ ExecutorKernel::ExecutorKernel(ThreadPoo throw IllegalArgumentException(__FILE__, __LINE__, "Argument out of range."); } - if(workQueue == NULL) { - throw NullPointerException(__FILE__, __LINE__, "BlockingQueue pointer was null"); + if(workQueue == NULL || threadFactory == NULL || handler == NULL) { + throw NullPointerException(__FILE__, __LINE__, "Required parameter was NULL"); } this->cleanupTimer.scheduleAtFixedRate( new WorkerKiller(this), TimeUnit::SECONDS.toMillis(10), TimeUnit::SECONDS.toMillis(10)); + + this->workQueue.reset(workQueue); + this->factory.reset(threadFactory); + this->rejectionHandler.reset(handler); } //////////////////////////////////////////////////////////////////////////////// @@ -414,7 +734,7 @@ void ExecutorKernel::onTaskStarted(Worke // cause the number of Task to exceed the number of free threads // once the Threads got a chance to wake up and service the queue if( freeThreads.get() == 0 && !workQueue->isEmpty() ) { - AllocateThread(); + addWorker(); } } } @@ -470,13 +790,13 @@ void ExecutorKernel::enQueueTask(Runnabl // If there's nobody open to do work, then create some more // threads to handle the work. if( this->freeThreads.get() == 0 ) { - AllocateThread(); + addWorker(); } - } - // queue the new work. - if(!this->workQueue->offer(task)) { - throw RejectedExecutionException(__FILE__, __LINE__, "Task Rejected by work Q"); + // queue the new work. + if(isStoppedOrStopping() || !this->workQueue->offer(task)) { + this->rejectionHandler->rejectedExecution(task, this->parent); + } } } DECAF_CATCH_RETHROW( Exception ) @@ -490,24 +810,19 @@ Runnable* ExecutorKernel::deQueueTask() Runnable* task = NULL; - // Wait for work, wait in a while loop since another thread could - // be waiting for a lock and get the work before we get woken up - // from our wait. - while( !isStoppedOrStopping() ) { + while(true) { - // TODO - Threads aren't interruptible yet. + // TODO - Threads aren't interruptible yet, so spin wait. if(workQueue->poll(task, 10, TimeUnit::MILLISECONDS)) { break; } - } - - // Don't give more work if we are closing down - if(isStoppedOrStopping()) { - if(task != NULL) { - delete task; + if(isStoppedOrStopping() && workQueue->isEmpty()) { + break; } + } + if(isStoppedOrStopping() && task == NULL) { return NULL; } @@ -523,12 +838,12 @@ Runnable* ExecutorKernel::deQueueTask() } //////////////////////////////////////////////////////////////////////////////// -void ExecutorKernel::AllocateThread() { +bool ExecutorKernel::addWorker() { try{ if( this->workers.size() >= this->maxPoolSize ) { - return; + return false; } synchronized( &mainLock ) { @@ -538,6 +853,38 @@ void ExecutorKernel::AllocateThread() { newWorker->start(); this->largestPoolSize++; } + + return true; + } + DECAF_CATCH_RETHROW( lang::Exception ) + DECAF_CATCHALL_THROW( lang::Exception ) +} + +//////////////////////////////////////////////////////////////////////////////// +int ExecutorKernel::addAllWorkers() { + + try{ + + if( this->workers.size() >= this->maxPoolSize ) { + return 0; + } + + int delta = 0; + + synchronized( &mainLock ) { + + delta = this->maxPoolSize - this->workers.size(); + + for(int i = 0; i < delta; ++i) { + Worker* newWorker = new Worker(this); + this->workers.add(newWorker); + freeThreads.incrementAndGet(); + newWorker->start(); + this->largestPoolSize++; + } + } + + return delta; } DECAF_CATCH_RETHROW( lang::Exception ) DECAF_CATCHALL_THROW( lang::Exception ) @@ -563,6 +910,29 @@ void ExecutorKernel::shutdown() { synchronized(&mainLock) { + // TODO - When threads are interruptible, we need to interrupt the Queue. + //synchronized( workQueue.get() ) { + // // Signal the Queue so that all waiters are notified + // workQueue->notifyAll(); + //} + } + + this->tryTerminate(); + this->stopped.set(true); + } +} + +//////////////////////////////////////////////////////////////////////////////// +void ExecutorKernel::shutdownNow(ArrayList& unexecutedTasks) { + + if(isStoppedOrStopping()) { + return; + } + + if(this->stopping.compareAndSet(false, true)) { + + synchronized(&mainLock) { + Pointer< Iterator > iter(this->workers.iterator()); while(iter->hasNext()) { @@ -574,13 +944,38 @@ void ExecutorKernel::shutdown() { // // Signal the Queue so that all waiters are notified // workQueue->notifyAll(); //} + + this->drainQueue(unexecutedTasks); } + this->tryTerminate(); this->stopped.set(true); } } //////////////////////////////////////////////////////////////////////////////// +void ExecutorKernel::drainQueue(ArrayList& unexecutedTasks) { + + // Some Queue implementations can fail in poll and drainTo so we check + // after attempting to drain the Queue and if its not empty we remove + // the tasks one by one. + + this->workQueue->drainTo(unexecutedTasks); + if (!this->workQueue->isEmpty()) { + + std::vector tasks = this->workQueue->toArray(); + std::vector::iterator iter = tasks.begin(); + + for (; iter != tasks.end(); ++iter) { + + if (this->workQueue->remove(*iter)) { + unexecutedTasks.add(*iter); + } + } + } +} + +//////////////////////////////////////////////////////////////////////////////// bool ExecutorKernel::awaitTermination(long long timeout, const TimeUnit& unit) { if (this->terminated.get() == true) {