Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 7200 invoked from network); 1 Jun 2007 19:20:35 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 1 Jun 2007 19:20:35 -0000 Received: (qmail 22491 invoked by uid 500); 1 Jun 2007 19:20:39 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 22439 invoked by uid 500); 1 Jun 2007 19:20:39 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 22423 invoked by uid 99); 1 Jun 2007 19:20:39 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Jun 2007 12:20:39 -0700 X-ASF-Spam-Status: No, hits=-99.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Jun 2007 12:20:33 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 47EA91A981A; Fri, 1 Jun 2007 12:20:13 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r543581 - in /activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util: concurrent/ logging/ Date: Fri, 01 Jun 2007 19:20:12 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070601192013.47EA91A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Fri Jun 1 12:20:11 2007 New Revision: 543581 URL: http://svn.apache.org/viewvc?view=rev&rev=543581 Log: http://issues.apache.org/activemq/browse/AMQCPP-103 Building Decaf lib Added: activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.cpp activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.h activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThreadListener.h activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/TaskListener.h activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.cpp activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.h Modified: activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/logging/LoggerDefines.h Added: activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.cpp?view=auto&rev=543581 ============================================================================== --- activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.cpp (added) +++ activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.cpp Fri Jun 1 12:20:11 2007 @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include + +#include + +using namespace decaf; +using namespace decaf::lang; +using namespace decaf::lang::exceptions; +using namespace decaf::util; +using namespace decaf::util::concurrent; + +//////////////////////////////////////////////////////////////////////////////// +LOGCMS_INITIALIZE(logger, PooledThread, "com.activemq.concurrent.PooledThread") + +//////////////////////////////////////////////////////////////////////////////// +PooledThread::PooledThread(ThreadPool* pool) +{ + if(pool == NULL) + { + throw IllegalArgumentException( __FILE__, __LINE__, + "PooledThread::PooledThread"); + } + + busy = false; + done = false; + + listener = NULL; + + // Store our Pool. + this->pool = pool; +} + +//////////////////////////////////////////////////////////////////////////////// +PooledThread::~PooledThread() +{ +} + +//////////////////////////////////////////////////////////////////////////////// +void PooledThread::run(void) +{ + ThreadPool::Task task; + + try + { + while(!done) + { + //LOGCMS_DEBUG(logger, "PooledThread::run - Entering deQ"); + + // Blocks until there something to be done + task = pool->deQueueTask(); + + //LOGCMS_DEBUG(logger, "PooledThread::run - Exited deQ"); + + // Check if the Done Flag is set, in case it happened while we + // were waiting for a task + if(done) + { + break; + } + + // If we got here and the runnable was null then something + // bad must have happened. Throw an Exception and bail. + if(!task.first) + { + throw Exception( __FILE__, __LINE__, + "PooledThread::run - Retrieive NULL task from Pool."); + } + + // Got some work to do, so set flag to busy + busy = true; + + // Inform a listener that we are going to start + if(listener) + { + /*LOGCMS_DEBUG(logger, + "PooledThread::run - Inform Listener we are starting");*/ + listener->onTaskStarted(this); + } + + // Perform the work + task.first->run(); + + /*LOGCMS_DEBUG(logger, + "PooledThread::run - Inform Task Listener we are done");*/ + + // Notify the Task listener that we are done + task.second->onTaskComplete(task.first); + + // Inform a listener that we are going to stop and wait + // for a new task + if(listener) + { + /*LOGCMS_DEBUG(logger, + "PooledThread::run - Inform Listener we are done");*/ + listener->onTaskCompleted(this); + } + + // Set flag to inactive, we will wait for work + busy = false; + } + } + catch( Exception& ex ) + { + ex.setMark( __FILE__, __LINE__ ); + + // Notify the Task owner + if(task.first && task.second) + { + task.second->onTaskException(task.first, ex); + } + + busy = false; + + // Notify the PooledThreadListener + if(listener) + { + listener->onTaskException(this, ex); + } + } + catch(...) + { + Exception ex( + __FILE__, __LINE__, + "PooledThread::run - Caught Unknown Exception"); + + // Notify the Task owner + if(task.first && task.second) + { + task.second->onTaskException(task.first, ex); + } + + busy = false; + + // Notify the PooledThreadListener + if(listener) + { + listener->onTaskException(this, ex); + } + } +} + +//////////////////////////////////////////////////////////////////////////////// +void PooledThread::stop(void) throw ( Exception ) +{ + done = true; +} Added: activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.h?view=auto&rev=543581 ============================================================================== --- activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.h (added) +++ activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.h Fri Jun 1 12:20:11 2007 @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _DECAF_UTIL_CONCURRENT_POOLEDTHREAD_H_ +#define _DECAF_UTIL_CONCURRENT_POOLEDTHREAD_H_ + +#include +#include +#include + +#include + +namespace decaf{ +namespace util{ +namespace concurrent{ + + class ThreadPool; + + class PooledThread : public lang::Thread + { + private: + + // Is this thread currently processing something + bool busy; + + // Boolean flag indicating thread should stop + bool done; + + // Listener for Task related events + PooledThreadListener* listener; + + // The thread pool this Pooled Thread is Servicing + ThreadPool* pool; + + // Logger Init + LOGCMS_DECLARE(logger) + + public: + + /** + * Constructor + * @param pool the parant ThreadPool object + */ + PooledThread( ThreadPool* pool ); + + virtual ~PooledThread(); + + /** + * Run Method for this object waits for something to be + * enqueued on the ThreadPool and then grabs it and calls + * its run method. + */ + virtual void run(); + + /** + * Stops the Thread, thread will complete its task if currently + * running one, and then die. Does not block. + * @throws Exception + */ + virtual void stop() throw ( lang::Exception ); + + /** + * Checks to see if the thread is busy, if busy it means + * that this thread has taken a task from the ThreadPool's + * queue and is processing it. + * @returns true if the Thread is busy + */ + virtual bool isBusy() { return busy; } + + /** + * Adds a listener to this PooledThread to be + * notified when this thread starts and completes a task. + * @param listener the listener to send notifications to. + */ + virtual void setPooledThreadListener( PooledThreadListener* listener ) + { + this->listener = listener; + } + + /** + * Removes a listener for this PooledThread to be + * notified when this thread starts and completes a task. + * @return a pointer to this thread's listener or NULL + */ + virtual PooledThreadListener* getPooledThreadListener() + { + return this->listener; + } + }; + +}}} + +#endif /*_DECAF_UTIL_CONCURRENT_POOLEDTHREAD_H_*/ Added: activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThreadListener.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThreadListener.h?view=auto&rev=543581 ============================================================================== --- activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThreadListener.h (added) +++ activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThreadListener.h Fri Jun 1 12:20:11 2007 @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _DECAF_UTIL_CONCURRENT_POOLEDTHREADLISTENER_H_ +#define _DECAF_UTIL_CONCURRENT_POOLEDTHREADLISTENER_H_ + +#include + +namespace decaf{ +namespace util{ +namespace concurrent{ + + class PooledThread; + + class PooledThreadListener + { + public: + + /** + * Destructor + */ + virtual ~PooledThreadListener() {} + + /** + * Called by a pooled thread when it is about to begin + * executing a new task. + * @param Pointer to the Pooled Thread that is making this call + */ + virtual void onTaskStarted( PooledThread* thread ) = 0; + + /** + * Called by a pooled thread when it has completed a task + * and is going back to waiting for another task to run + * @param Pointer the the Pooled Thread that is making this call. + */ + virtual void onTaskCompleted( PooledThread* thread ) = 0; + + /** + * Called by a pooled thread when it has encountered an exception + * while running a user task, after receiving this notification + * the callee should assume that the PooledThread is now no longer + * running. + * @param Pointer to the Pooled Thread that is making this call + * @param The Exception that occured. + */ + virtual void onTaskException( PooledThread* thread, + lang::Exception& ex) = 0; + + }; + +}}} + +#endif /*_DECAF_UTIL_CONCURRENT_POOLEDTHREADLISTENER_H_*/ Added: activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/TaskListener.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/TaskListener.h?view=auto&rev=543581 ============================================================================== --- activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/TaskListener.h (added) +++ activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/TaskListener.h Fri Jun 1 12:20:11 2007 @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _DECAF_UTIL_CONCURRENT_TASKLISTENER_H_ +#define _DECAF_UTIL_CONCURRENT_TASKLISTENER_H_ + +#include +#include + +namespace decaf{ +namespace util{ +namespace concurrent{ + + class TaskListener + { + public: + + virtual ~TaskListener() {} + + /** + * Called when a queued task has completed, the task that + * finished is passed along for user consumption + * @param task Runnable Pointer to the task that finished + */ + virtual void onTaskComplete( lang::Runnable* task ) = 0; + + /** + * Called when a queued task has thrown an exception while + * being run. The Callee should assume that this was an + * unrecoverable exeption and that this task is now defunct. + * @param task Runnable Pointer to the task + * @param ex The ActiveMQException that was thrown. + */ + virtual void onTaskException( lang::Runnable* task, + lang::Exception& ex ) = 0; + + }; + +}}} + +#endif /*_DECAF_UTIL_CONCURRENT_TASKLISTENER_H_*/ Added: activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.cpp?view=auto&rev=543581 ============================================================================== --- activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.cpp (added) +++ activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.cpp Fri Jun 1 12:20:11 2007 @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include + +#ifdef min +#undef min +#endif + +#include +#include + +using namespace std; +using namespace decaf; +using namespace decaf::lang; +using namespace decaf::lang::exceptions; +using namespace decaf::util; +using namespace decaf::util::concurrent; + +//////////////////////////////////////////////////////////////////////////////// +LOGCMS_INITIALIZE(logger, ThreadPool, "com.activemq.concurrent.ThreadPool") +LOGCMS_INITIALIZE(marker, ThreadPool, "com.activemq.concurrent.ThreadPool.Marker") + +//////////////////////////////////////////////////////////////////////////////// +ThreadPool ThreadPool::instance; + +//////////////////////////////////////////////////////////////////////////////// +ThreadPool::ThreadPool() +{ + maxThreads = DEFAULT_MAX_POOL_SIZE; + blockSize = DEFAULT_MAX_BLOCK_SIZE; + freeThreads = 0; + + shutdown = false; +} + +//////////////////////////////////////////////////////////////////////////////// +ThreadPool::~ThreadPool() +{ + try + { + std::vector::iterator itr = pool.begin(); + + // Stop all the threads + for(; itr != pool.end(); ++itr) + { + (*itr)->stop(); + } + + // Set the shutdown flag so that the DeQueue methods all quit + // when we interrupt them. + shutdown = true; + + synchronized(&queue) + { + // Signal the Queue so that all waiters are notified + queue.notifyAll(); + } + + // Wait for everyone to die + for(itr = pool.begin(); itr != pool.end(); ++itr) + { + (*itr)->join(); + + // Destroy the threads + delete *itr; + } + + pool.clear(); + } + DECAF_CATCH_RETHROW( lang::Exception ) + DECAF_CATCHALL_THROW( lang::Exception ) +} + +//////////////////////////////////////////////////////////////////////////////// +void ThreadPool::queueTask( ThreadPool::Task task ) + throw ( lang::Exception ) +{ + try + { + if(!task.first || !task.second) + { + throw exceptions::IllegalArgumentException( __FILE__, __LINE__, + "ThreadPool::QueueTask - Invalid args for Task"); + } + + //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - syncing on queue"); + + synchronized(&queue) + { + //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - sync'd, synching pool"); + + // If there's nobody open to do work, then create some more + // threads to handle the work. + if(freeThreads == 0) + { + AllocateThreads(blockSize); + } + + //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - pushing task"); + + // queue the new work. + queue.push(task); + + //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - calling notify"); + + // Inform waiters that we put some work on the queue. + queue.notify(); + } + } + DECAF_CATCH_RETHROW( lang::Exception ) + DECAF_CATCHALL_THROW( lang::Exception ) +} + +//////////////////////////////////////////////////////////////////////////////// +ThreadPool::Task ThreadPool::deQueueTask() + throw ( lang::Exception ) +{ + try + { + //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - syncing on queue"); + + synchronized(&queue) + { + /*LOGCMS_DEBUG(logger, + "ThreadPool::DeQueueTask - sync'd checking queue empty");*/ + + // Wait for work, wait in a while loop since another thread could + // be waiting for a lock and get the work before we get woken up + // from our wait. + while(queue.empty() && !shutdown) + { + //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - Q empty, waiting"); + + queue.wait(); + + //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - done waiting"); + } + + // Don't give more work if we are closing down + if(shutdown) + { + return Task(); + } + + // check size again. + if(queue.empty()) + { + throw lang::Exception( __FILE__, __LINE__, + "ThreadPool::DeQueueUserWorkItem - Empty Taskn, not in shutdown."); + } + + //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - popping task"); + + // not empty so get the new work to do + return queue.pop(); + } + + return Task(); + } + DECAF_CATCH_RETHROW( lang::Exception ) + DECAF_CATCHALL_THROW( lang::Exception ) +} + +//////////////////////////////////////////////////////////////////////////////// +void ThreadPool::reserve( std::size_t size ) +{ + try + { + synchronized(&poolLock) + { + if(size < pool.size() || pool.size() == maxThreads) + { + return; + } + + // How many do we reserve + std::size_t allocCount = size - pool.size(); + + // Allocate the new Threads + AllocateThreads(allocCount); + } + } + DECAF_CATCH_RETHROW( lang::Exception ) + DECAF_CATCHALL_THROW( lang::Exception ) +} + +//////////////////////////////////////////////////////////////////////////////// +void ThreadPool::setMaxThreads( std::size_t maxThreads ) +{ + try + { + synchronized(&poolLock) + { + if(maxThreads == 0) + { + // Caller tried to do something stupid, ignore them. + return; + } + + this->maxThreads = maxThreads; + } + } + DECAF_CATCH_RETHROW( lang::Exception ) + DECAF_CATCHALL_THROW( lang::Exception ) +} + +//////////////////////////////////////////////////////////////////////////////// +void ThreadPool::setBlockSize( std::size_t blockSize ) +{ + try + { + if(blockSize <= 0) + { + // User tried something dumb, protect them from themselves + return; + } + + synchronized(&poolLock) + { + this->blockSize = blockSize; + } + } + DECAF_CATCH_RETHROW( lang::Exception ) + DECAF_CATCHALL_THROW( lang::Exception ) +} + +//////////////////////////////////////////////////////////////////////////////// +void ThreadPool::AllocateThreads( std::size_t count ) +{ + try + { + if(pool.size() >= maxThreads) + { + return; + } + + synchronized(&poolLock) + { + // Take the min of alloc size of maxThreads since we don't + // want anybody sneaking eaxtra threads in, greedy bastards. + count = std::min(count, maxThreads - pool.size()); + + // Each time we create a thread we increment the free Threads + // counter, but before we call start so that the Thread doesn't + // get ahead of us. + for(std::size_t i = 0; i < count; ++i) + { + pool.push_back(new PooledThread(this)); + pool.back()->setPooledThreadListener(this); + freeThreads++; + pool.back()->start(); + } + } + } + DECAF_CATCH_RETHROW( lang::Exception ) + DECAF_CATCHALL_THROW( lang::Exception ) +} + +//////////////////////////////////////////////////////////////////////////////// +void ThreadPool::onTaskStarted( PooledThread* thread DECAF_UNUSED ) +{ + try + { + synchronized(&poolLock) + { + freeThreads--; + + // Now that this callback has decremented the free threads coutner + // let check if there is any outstanding work to be done and no + // threads to handle it. This could happen if the QueueTask + // method was called successively without any of the PooledThreads + // having a chance to wake up and service the queue. This would + // cause the number of Task to exceed the number of free threads + // once the Threads got a chance to wake up and service the queue + if( freeThreads == 0 && !queue.empty() ) + { + // Allocate a new block of threads + AllocateThreads( blockSize ); + } + } + + //LOGCMS_DEBUG(logger, "ThreadPool::onTaskStarted:"); + } + DECAF_CATCH_RETHROW( lang::Exception ) + DECAF_CATCHALL_THROW( lang::Exception ) +} + +//////////////////////////////////////////////////////////////////////////////// +void ThreadPool::onTaskCompleted( PooledThread* thread DECAF_UNUSED) +{ + try + { + synchronized(&poolLock) + { + freeThreads++; + } + + //LOGCMS_DEBUG(logger, "ThreadPool::onTaskCompleted: "); + } + DECAF_CATCH_RETHROW( lang::Exception ) + DECAF_CATCHALL_THROW( lang::Exception ) +} + +//////////////////////////////////////////////////////////////////////////////// +void ThreadPool::onTaskException( + PooledThread* thread, + lang::Exception& ex DECAF_UNUSED ) +{ + //LOGCMS_DEBUG(logger, "ThreadPool::onTaskException: "); + + try + { + synchronized(&poolLock) + { + // Delete the thread that had the exception and start a new + // one to take its place. + freeThreads--; + + std::vector::iterator itr = + std::find(pool.begin(), pool.end(), thread); + + if(itr != pool.end()) + { + pool.erase(itr); + } + + // Bye-Bye Thread Object + delete thread; + + // Now allocate a replacement + AllocateThreads(1); + } + } + DECAF_CATCH_RETHROW( lang::Exception ) + DECAF_CATCHALL_THROW( lang::Exception ) +} + Added: activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.h?view=auto&rev=543581 ============================================================================== --- activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.h (added) +++ activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.h Fri Jun 1 12:20:11 2007 @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _DECAF_UTIL_CONCURRENT_THREADPOOL_H_ +#define _DECAF_UTIL_CONCURRENT_THREADPOOL_H_ + +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace decaf{ +namespace util{ +namespace concurrent{ + + /** + * Defines a Thread Pool object that implements the functionality + * of pooling threads to perform user tasks. The Thread Poll has + * max size that it will grow to. The thread pool allocates threads + * in blocks. When there are no waiting worker threads and a task + * is queued then a new batch is allocated. The user can specify + * the size of the blocks, otherwise a default value is used. + *

+ * When the user queues a task they must also queue a listner to + * be notified when the task has completed, this provides the user + * with a mechanism to know when a task object can be freed. + *

+ * To have the Thread Pool perform a task, the user enqueue's an + * object that implements the Runnable insterface and + * one of the worker threads will executing it in its thread context. + */ + class ThreadPool : public PooledThreadListener + { + public: + + // Constants + static const size_t DEFAULT_MAX_POOL_SIZE = 10; + static const size_t DEFAULT_MAX_BLOCK_SIZE = 3; + + // Types + typedef std::pair Task; + + private: + + // Vector of threads that this object has created for its pool. + std::vector< PooledThread* > pool; + + // Queue of Task that are in need of completion + util::Queue queue; + + // Max number of Threads this Pool can contian + std::size_t maxThreads; + + // Max number of tasks that can be allocated at a time + std::size_t blockSize; + + // boolean flag use to indocate that this object is shutting down. + bool shutdown; + + // Count of threads that are currently free to perfom some work. + std::size_t freeThreads; + + // Mutex for locking operations that affect the pool. + Mutex poolLock; + + // Logger Init + LOGCMS_DECLARE(logger) + LOGCMS_DECLARE(marker) + + private: // Statics + + // The singleton instance of this class + static ThreadPool instance; + + public: + + ThreadPool(); + virtual ~ThreadPool(); + + /** + * Queue a task to be completed by one of the Pooled Threads. + * tasks are serviced as soon as a PooledThread + * is available to run it. + * @param task object that derives from Runnable + * @throws ActiveMQException + */ + virtual void queueTask( Task task ) + throw ( lang::Exception ); + + /** + * DeQueue a task to be completed by one of the Pooled Threads. + * A caller of this method will block until there is something + * in the tasks queue, therefore care must be taken when calling + * this function. Normally clients of ThreadPool don't use + * this, only the PooledThread objects owned by + * this ThreadPool. + * @return object that derives from Runnable + * @throws ActiveMQException + */ + virtual Task deQueueTask() + throw ( lang::Exception ); + + /** + * Returns the current number of Threads in the Pool, this is + * how many there are now, not how many are active or the max + * number that might exist. + * @return integer number of threads in existance. + */ + virtual std::size_t getPoolSize() const { return pool.size(); } + + /** + * Returns the current backlog of items in the tasks queue, this + * is how much work is still waiting to get done. + * @return number of outstanding tasks. + */ + virtual std::size_t getBacklog() const { return queue.size(); } + + /** + * Ensures that there is at least the specified number of Threads + * allocated to the pool. If the size is greater than the MAX + * number of threads in the pool, then only MAX threads are + * reservved. If the size is smaller than the number of threads + * currently in the pool, than nothing is done. + * @param size the number of threads to reserve. + */ + virtual void reserve( std::size_t size ); + + /** + * Get the Max Number of Threads this Pool can contain + * @return max size + */ + virtual std::size_t getMaxThreads() const { return maxThreads; } + + /** + * Sets the Max number of threads this pool can contian. + * if this value is smaller than the current size of the + * pool nothing is done. + * @param maxThreads total number of threads that can be pooled + */ + virtual void setMaxThreads( std::size_t maxThreads ); + + /** + * Gets the Max number of threads that can be allocated at a time + * when new threads are needed. + * @return max Thread Block Size + */ + virtual std::size_t getBlockSize() const { return blockSize; } + + /** + * Sets the Max number of Threads that can be allocated at a time + * when the Thread Pool determines that more Threads are needed. + * @param blockSize Max Thread Block Size + */ + virtual void setBlockSize( std::size_t blockSize ); + + /** + * Returns the current number of available threads in the pool, threads + * that are performing a user task are considered unavailable. This value + * could change immeadiately after calling as Threads could finish right + * after and be available again. This is informational only. + * @return totoal free threads + */ + virtual std::size_t getFreeThreadCount() const { + return freeThreads; + } + + public: // PooledThreadListener Callbacks + + /** + * Called by a pooled thread when it is about to begin + * executing a new task. This will decrement the available + * threads counter so that this object knows when there are + * no more free threads and must create new ones. + * @param thread Pointer to the Pooled Thread that is making this call + */ + virtual void onTaskStarted( PooledThread* thread ); + + /** + * Called by a pooled thread when it has completed a task + * and is going back to waiting for another task to run, + * this will increment the free threads counter. + * @param thread Pointer the the Pooled Thread that is making this call. + */ + virtual void onTaskCompleted( PooledThread* thread ); + + /** + * Called by a pooled thread when it has encountered an exception + * while running a user task, after receiving this notification + * the callee should assume that the PooledThread is now no longer + * running. + * @param thread Pointer to the Pooled Thread that is making this call + * @param ex The Exception that occured. + */ + virtual void onTaskException( PooledThread* thread, + lang::Exception& ex ); + + public: // Statics + + /** + * Return the one and only Thread Pool instance. + * @return The Thread Pool Pointer + */ + static ThreadPool* getInstance() { + return &instance; + } + + private: + + /** + * Allocates the requested ammount of Threads, won't exceed + * maxThreads. + * @param count the number of threads to create + */ + void AllocateThreads( std::size_t count ); + + }; + +}}} + +#endif /*_DECAF_UTIL_CONCURRENT_THREADPOOL_H_*/ Modified: activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/logging/LoggerDefines.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/logging/LoggerDefines.h?view=diff&rev=543581&r1=543580&r2=543581 ============================================================================== --- activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/logging/LoggerDefines.h (original) +++ activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/logging/LoggerDefines.h Fri Jun 1 12:20:11 2007 @@ -21,13 +21,13 @@ #include #define LOGDECAF_DECLARE(loggerName) \ - static decaf::logger::SimpleLogger loggerName; + static decaf::util::logging::SimpleLogger loggerName; #define LOGDECAF_INITIALIZE(loggerName, className, loggerFamily) \ - decaf::logger::SimpleLogger className::loggerName(loggerFamily); + decaf::util::logging::SimpleLogger className::loggerName(loggerFamily); #define LOGDECAF_DECLARE_LOCAL(loggerName) \ - decaf::logger::Logger loggerName; + decaf::util::logging::Logger loggerName; #define LOGDECAF_DEBUG(logger, message) \ logger.debug(__FILE__, __LINE__, message);