activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r543581 - in /activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util: concurrent/ logging/
Date Fri, 01 Jun 2007 19:20:12 GMT
Author: tabish
Date: Fri Jun  1 12:20:11 2007
New Revision: 543581

URL: http://svn.apache.org/viewvc?view=rev&rev=543581
Log:
http://issues.apache.org/activemq/browse/AMQCPP-103

Building Decaf lib

Added:
    activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.cpp
    activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.h
    activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThreadListener.h
    activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/TaskListener.h
    activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.cpp
    activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.h
Modified:
    activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/logging/LoggerDefines.h

Added: activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.cpp?view=auto&rev=543581
==============================================================================
--- activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.cpp
(added)
+++ activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.cpp
Fri Jun  1 12:20:11 2007
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <decaf/util/concurrent/PooledThread.h>
+#include <decaf/util/concurrent/ThreadPool.h>
+#include <decaf/util/concurrent/TaskListener.h>
+#include <decaf/lang/exceptions/IllegalArgumentException.h>
+
+#include <iostream>
+
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+LOGCMS_INITIALIZE(logger, PooledThread, "com.activemq.concurrent.PooledThread")
+
+////////////////////////////////////////////////////////////////////////////////
+PooledThread::PooledThread(ThreadPool* pool)
+{
+    if(pool == NULL)
+    {
+        throw IllegalArgumentException( __FILE__, __LINE__,
+            "PooledThread::PooledThread");
+    }
+
+    busy = false;
+    done = false;
+
+    listener = NULL;
+
+    // Store our Pool.
+    this->pool = pool;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+PooledThread::~PooledThread()
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PooledThread::run(void)
+{
+    ThreadPool::Task task;
+
+    try
+    {
+        while(!done)
+        {
+            //LOGCMS_DEBUG(logger, "PooledThread::run - Entering deQ");
+
+            // Blocks until there something to be done
+            task = pool->deQueueTask();
+
+            //LOGCMS_DEBUG(logger, "PooledThread::run - Exited deQ");
+
+            // Check if the Done Flag is set, in case it happened while we
+            // were waiting for a task
+            if(done)
+            {
+                break;
+            }
+
+            // If we got here and the runnable was null then something
+            // bad must have happened.  Throw an Exception and bail.
+            if(!task.first)
+            {
+                throw Exception( __FILE__, __LINE__,
+                    "PooledThread::run - Retrieive NULL task from Pool.");
+            }
+
+            // Got some work to do, so set flag to busy
+            busy = true;
+
+            // Inform a listener that we are going to start
+            if(listener)
+            {
+                /*LOGCMS_DEBUG(logger,
+                   "PooledThread::run - Inform Listener we are starting");*/
+                listener->onTaskStarted(this);
+            }
+
+            // Perform the work
+            task.first->run();
+
+            /*LOGCMS_DEBUG(logger,
+                "PooledThread::run - Inform Task Listener we are done");*/
+
+            // Notify the Task listener that we are done
+            task.second->onTaskComplete(task.first);
+
+            // Inform a listener that we are going to stop and wait
+            // for a new task
+            if(listener)
+            {
+                /*LOGCMS_DEBUG(logger,
+                    "PooledThread::run - Inform Listener we are done");*/
+                listener->onTaskCompleted(this);
+            }
+
+            // Set flag to inactive, we will wait for work
+            busy = false;
+        }
+    }
+    catch( Exception& ex )
+    {
+        ex.setMark( __FILE__, __LINE__ );
+
+        // Notify the Task owner
+        if(task.first && task.second)
+        {
+            task.second->onTaskException(task.first, ex);
+        }
+
+        busy = false;
+
+        // Notify the PooledThreadListener
+        if(listener)
+        {
+            listener->onTaskException(this, ex);
+        }
+    }
+    catch(...)
+    {
+        Exception ex(
+            __FILE__, __LINE__,
+            "PooledThread::run - Caught Unknown Exception");
+
+        // Notify the Task owner
+        if(task.first && task.second)
+        {
+            task.second->onTaskException(task.first, ex);
+        }
+
+        busy = false;
+
+        // Notify the PooledThreadListener
+        if(listener)
+        {
+            listener->onTaskException(this, ex);
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PooledThread::stop(void) throw ( Exception )
+{
+    done = true;
+}

Added: activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.h?view=auto&rev=543581
==============================================================================
--- activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.h (added)
+++ activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.h Fri
Jun  1 12:20:11 2007
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _DECAF_UTIL_CONCURRENT_POOLEDTHREAD_H_
+#define _DECAF_UTIL_CONCURRENT_POOLEDTHREAD_H_
+
+#include <decaf/lang/Thread.h>
+#include <decaf/util/concurrent/PooledThreadListener.h>
+#include <decaf/util/logging/LoggerDefines.h>
+
+#include <decaf/lang/Exception.h>
+
+namespace decaf{
+namespace util{
+namespace concurrent{
+
+    class ThreadPool;
+
+    class PooledThread : public lang::Thread
+    {
+    private:
+
+        // Is this thread currently processing something
+        bool busy;
+
+        // Boolean flag indicating thread should stop
+        bool done;
+
+        // Listener for Task related events
+        PooledThreadListener* listener;
+
+        // The thread pool this Pooled Thread is Servicing
+        ThreadPool* pool;
+
+        // Logger Init
+        LOGCMS_DECLARE(logger)
+
+     public:
+
+        /**
+         * Constructor
+         * @param pool the parant ThreadPool object
+         */
+        PooledThread( ThreadPool* pool );
+
+        virtual ~PooledThread();
+
+        /**
+         * Run Method for this object waits for something to be
+         * enqueued on the ThreadPool and then grabs it and calls
+         * its run method.
+         */
+        virtual void run();
+
+        /**
+         * Stops the Thread, thread will complete its task if currently
+         * running one, and then die.  Does not block.
+         * @throws Exception
+         */
+        virtual void stop() throw ( lang::Exception );
+
+        /**
+         * Checks to see if the thread is busy, if busy it means
+         * that this thread has taken a task from the ThreadPool's
+         * queue and is processing it.
+         * @returns true if the Thread is busy
+         */
+        virtual bool isBusy() { return busy; }
+
+        /**
+         * Adds a listener to this <code>PooledThread</code> to be
+         * notified when this thread starts and completes a task.
+         * @param listener the listener to send notifications to.
+         */
+        virtual void setPooledThreadListener( PooledThreadListener* listener )
+        {
+            this->listener = listener;
+        }
+
+        /**
+         * Removes a listener for this <code>PooledThread</code> to be
+         * notified when this thread starts and completes a task.
+         * @return a pointer to this thread's listener or NULL
+         */
+        virtual PooledThreadListener* getPooledThreadListener()
+        {
+            return this->listener;
+        }
+    };
+
+}}}
+
+#endif /*_DECAF_UTIL_CONCURRENT_POOLEDTHREAD_H_*/

Added: activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThreadListener.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThreadListener.h?view=auto&rev=543581
==============================================================================
--- activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThreadListener.h
(added)
+++ activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThreadListener.h
Fri Jun  1 12:20:11 2007
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _DECAF_UTIL_CONCURRENT_POOLEDTHREADLISTENER_H_
+#define _DECAF_UTIL_CONCURRENT_POOLEDTHREADLISTENER_H_
+
+#include <decaf/lang/Exception.h>
+
+namespace decaf{
+namespace util{
+namespace concurrent{
+
+    class PooledThread;
+
+    class PooledThreadListener
+    {
+    public:
+
+        /**
+         * Destructor
+         */
+        virtual ~PooledThreadListener() {}
+
+        /**
+         * Called by a pooled thread when it is about to begin
+         * executing a new task.
+         * @param Pointer to the Pooled Thread that is making this call
+         */
+        virtual void onTaskStarted( PooledThread* thread ) = 0;
+
+        /**
+         * Called by a pooled thread when it has completed a task
+         * and is going back to waiting for another task to run
+         * @param Pointer the the Pooled Thread that is making this call.
+         */
+        virtual void onTaskCompleted( PooledThread* thread ) = 0;
+
+        /**
+         * Called by a pooled thread when it has encountered an exception
+         * while running a user task, after receiving this notification
+         * the callee should assume that the PooledThread is now no longer
+         * running.
+         * @param Pointer to the Pooled Thread that is making this call
+         * @param The Exception that occured.
+         */
+        virtual void onTaskException( PooledThread* thread,
+                                      lang::Exception& ex) = 0;
+
+    };
+
+}}}
+
+#endif /*_DECAF_UTIL_CONCURRENT_POOLEDTHREADLISTENER_H_*/

Added: activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/TaskListener.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/TaskListener.h?view=auto&rev=543581
==============================================================================
--- activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/TaskListener.h (added)
+++ activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/TaskListener.h Fri
Jun  1 12:20:11 2007
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _DECAF_UTIL_CONCURRENT_TASKLISTENER_H_
+#define _DECAF_UTIL_CONCURRENT_TASKLISTENER_H_
+
+#include <decaf/lang/Runnable.h>
+#include <decaf/lang/Exception.h>
+
+namespace decaf{
+namespace util{
+namespace concurrent{
+
+    class TaskListener
+    {
+    public:
+
+        virtual ~TaskListener() {}
+
+        /**
+         * Called when a queued task has completed, the task that
+         * finished is passed along for user consumption
+         * @param task Runnable Pointer to the task that finished
+         */
+        virtual void onTaskComplete( lang::Runnable* task ) = 0;
+
+        /**
+         * Called when a queued task has thrown an exception while
+         * being run.  The Callee should assume that this was an
+         * unrecoverable exeption and that this task is now defunct.
+         * @param task Runnable Pointer to the task
+         * @param ex The ActiveMQException that was thrown.
+         */
+        virtual void onTaskException( lang::Runnable* task,
+                                      lang::Exception& ex ) = 0;
+
+    };
+
+}}}
+
+#endif /*_DECAF_UTIL_CONCURRENT_TASKLISTENER_H_*/

Added: activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.cpp?view=auto&rev=543581
==============================================================================
--- activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.cpp (added)
+++ activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.cpp Fri
Jun  1 12:20:11 2007
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <decaf/util/concurrent/ThreadPool.h>
+#include <decaf/util/concurrent/Concurrent.h>
+#include <decaf/lang/exceptions/IllegalArgumentException.h>
+#include <decaf/util/Config.h>
+
+#ifdef min
+#undef min
+#endif
+
+#include <algorithm>
+#include <iostream>
+
+using namespace std;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+LOGCMS_INITIALIZE(logger, ThreadPool, "com.activemq.concurrent.ThreadPool")
+LOGCMS_INITIALIZE(marker, ThreadPool, "com.activemq.concurrent.ThreadPool.Marker")
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPool ThreadPool::instance;
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPool::ThreadPool()
+{
+    maxThreads  = DEFAULT_MAX_POOL_SIZE;
+    blockSize   = DEFAULT_MAX_BLOCK_SIZE;
+    freeThreads = 0;
+
+    shutdown = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPool::~ThreadPool()
+{
+    try
+    {
+        std::vector<PooledThread*>::iterator itr = pool.begin();
+
+        // Stop all the threads
+        for(; itr != pool.end(); ++itr)
+        {
+            (*itr)->stop();
+        }
+
+        // Set the shutdown flag so that the DeQueue methods all quit
+        // when we interrupt them.
+        shutdown = true;
+
+        synchronized(&queue)
+        {
+            // Signal the Queue so that all waiters are notified
+            queue.notifyAll();
+        }
+
+        // Wait for everyone to die
+        for(itr = pool.begin(); itr != pool.end(); ++itr)
+        {
+            (*itr)->join();
+
+            // Destroy the threads
+            delete *itr;
+        }
+
+        pool.clear();
+    }
+    DECAF_CATCH_RETHROW( lang::Exception )
+    DECAF_CATCHALL_THROW( lang::Exception )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::queueTask( ThreadPool::Task task )
+   throw ( lang::Exception )
+{
+    try
+    {
+        if(!task.first || !task.second)
+        {
+            throw exceptions::IllegalArgumentException( __FILE__, __LINE__,
+                "ThreadPool::QueueTask - Invalid args for Task");
+        }
+
+        //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - syncing on queue");
+
+        synchronized(&queue)
+        {
+            //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - sync'd, synching pool");
+
+            // If there's nobody open to do work, then create some more
+            // threads to handle the work.
+            if(freeThreads == 0)
+            {
+                AllocateThreads(blockSize);
+            }
+
+            //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - pushing task");
+
+            // queue the new work.
+            queue.push(task);
+
+            //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - calling notify");
+
+            // Inform waiters that we put some work on the queue.
+            queue.notify();
+        }
+    }
+    DECAF_CATCH_RETHROW( lang::Exception )
+    DECAF_CATCHALL_THROW( lang::Exception )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPool::Task ThreadPool::deQueueTask()
+   throw ( lang::Exception )
+{
+    try
+    {
+        //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - syncing on queue");
+
+        synchronized(&queue)
+        {
+            /*LOGCMS_DEBUG(logger,
+                "ThreadPool::DeQueueTask - sync'd checking queue empty");*/
+
+           // Wait for work, wait in a while loop since another thread could
+           // be waiting for a lock and get the work before we get woken up
+           // from our wait.
+           while(queue.empty() && !shutdown)
+           {
+               //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - Q empty, waiting");
+
+               queue.wait();
+
+               //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - done waiting");
+           }
+
+           // Don't give more work if we are closing down
+           if(shutdown)
+           {
+               return Task();
+           }
+
+           // check size again.
+           if(queue.empty())
+           {
+               throw lang::Exception( __FILE__, __LINE__,
+                   "ThreadPool::DeQueueUserWorkItem - Empty Taskn, not in shutdown.");
+           }
+
+           //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - popping task");
+
+           // not empty so get the new work to do
+           return queue.pop();
+        }
+
+        return Task();
+    }
+    DECAF_CATCH_RETHROW( lang::Exception )
+    DECAF_CATCHALL_THROW( lang::Exception )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::reserve( std::size_t size )
+{
+    try
+    {
+        synchronized(&poolLock)
+        {
+            if(size < pool.size() || pool.size() == maxThreads)
+            {
+                return;
+            }
+
+            // How many do we reserve
+            std::size_t allocCount = size - pool.size();
+
+            // Allocate the new Threads
+            AllocateThreads(allocCount);
+        }
+    }
+    DECAF_CATCH_RETHROW( lang::Exception )
+    DECAF_CATCHALL_THROW( lang::Exception )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::setMaxThreads( std::size_t maxThreads )
+{
+    try
+    {
+        synchronized(&poolLock)
+        {
+            if(maxThreads == 0)
+            {
+                // Caller tried to do something stupid, ignore them.
+                return;
+            }
+
+            this->maxThreads = maxThreads;
+        }
+    }
+    DECAF_CATCH_RETHROW( lang::Exception )
+    DECAF_CATCHALL_THROW( lang::Exception )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::setBlockSize( std::size_t blockSize )
+{
+    try
+    {
+        if(blockSize <= 0)
+        {
+            // User tried something dumb, protect them from themselves
+            return;
+        }
+
+        synchronized(&poolLock)
+        {
+            this->blockSize = blockSize;
+        }
+    }
+    DECAF_CATCH_RETHROW( lang::Exception )
+    DECAF_CATCHALL_THROW( lang::Exception )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::AllocateThreads( std::size_t count )
+{
+    try
+    {
+        if(pool.size() >= maxThreads)
+        {
+            return;
+        }
+
+        synchronized(&poolLock)
+        {
+            // Take the min of alloc size of maxThreads since we don't
+            // want anybody sneaking eaxtra threads in, greedy bastards.
+            count = std::min(count, maxThreads - pool.size());
+
+            // Each time we create a thread we increment the free Threads
+            // counter, but before we call start so that the Thread doesn't
+            // get ahead of us.
+            for(std::size_t i = 0; i < count; ++i)
+            {
+                pool.push_back(new PooledThread(this));
+                pool.back()->setPooledThreadListener(this);
+                freeThreads++;
+                pool.back()->start();
+            }
+        }
+    }
+    DECAF_CATCH_RETHROW( lang::Exception )
+    DECAF_CATCHALL_THROW( lang::Exception )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::onTaskStarted( PooledThread* thread DECAF_UNUSED )
+{
+    try
+    {
+        synchronized(&poolLock)
+        {
+            freeThreads--;
+
+            // Now that this callback has decremented the free threads coutner
+            // let check if there is any outstanding work to be done and no
+            // threads to handle it.  This could happen if the QueueTask
+            // method was called successively without any of the PooledThreads
+            // having a chance to wake up and service the queue.  This would
+            // cause the number of Task to exceed the number of free threads
+            // once the Threads got a chance to wake up and service the queue
+            if( freeThreads == 0 && !queue.empty() )
+            {
+                // Allocate a new block of threads
+                AllocateThreads( blockSize );
+            }
+        }
+
+        //LOGCMS_DEBUG(logger, "ThreadPool::onTaskStarted:");
+    }
+    DECAF_CATCH_RETHROW( lang::Exception )
+    DECAF_CATCHALL_THROW( lang::Exception )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::onTaskCompleted( PooledThread* thread DECAF_UNUSED)
+{
+    try
+    {
+        synchronized(&poolLock)
+        {
+            freeThreads++;
+        }
+
+        //LOGCMS_DEBUG(logger, "ThreadPool::onTaskCompleted: ");
+    }
+    DECAF_CATCH_RETHROW( lang::Exception )
+    DECAF_CATCHALL_THROW( lang::Exception )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::onTaskException(
+   PooledThread* thread,
+   lang::Exception& ex DECAF_UNUSED )
+{
+    //LOGCMS_DEBUG(logger, "ThreadPool::onTaskException: ");
+
+    try
+    {
+        synchronized(&poolLock)
+        {
+            // Delete the thread that had the exception and start a new
+            // one to take its place.
+            freeThreads--;
+
+            std::vector<PooledThread*>::iterator itr =
+                std::find(pool.begin(), pool.end(), thread);
+
+            if(itr != pool.end())
+            {
+                pool.erase(itr);
+            }
+
+            // Bye-Bye Thread Object
+            delete thread;
+
+            // Now allocate a replacement
+            AllocateThreads(1);
+        }
+    }
+    DECAF_CATCH_RETHROW( lang::Exception )
+    DECAF_CATCHALL_THROW( lang::Exception )
+}
+

Added: activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.h?view=auto&rev=543581
==============================================================================
--- activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.h (added)
+++ activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.h Fri
Jun  1 12:20:11 2007
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _DECAF_UTIL_CONCURRENT_THREADPOOL_H_
+#define _DECAF_UTIL_CONCURRENT_THREADPOOL_H_
+
+#include <decaf/lang/Runnable.h>
+#include <decaf/util/concurrent/PooledThread.h>
+#include <decaf/util/concurrent/PooledThreadListener.h>
+#include <decaf/util/concurrent/TaskListener.h>
+#include <decaf/util/concurrent/Mutex.h>
+#include <decaf/util/Queue.h>
+#include <deacf/util/logging/LoggerDefines.h>
+
+#include <vector>
+
+namespace decaf{
+namespace util{
+namespace concurrent{
+
+    /**
+     * Defines a Thread Pool object that implements the functionality
+     * of pooling threads to perform user tasks.  The Thread Poll has
+     * max size that it will grow to.  The thread pool allocates threads
+     * in blocks.  When there are no waiting worker threads and a task
+     * is queued then a new batch is allocated.  The user can specify
+     * the size of the blocks, otherwise a default value is used.
+     * <P>
+     * When the user queues a task they must also queue a listner to
+     * be notified when the task has completed, this provides the user
+     * with a mechanism to know when a task object can be freed.
+     * <P>
+     * To have the Thread Pool perform a task, the user enqueue's an
+     * object that implements the <code>Runnable</code> insterface and
+     * one of the worker threads will executing it in its thread context.
+     */
+    class ThreadPool : public PooledThreadListener
+    {
+    public:
+
+        // Constants
+        static const size_t DEFAULT_MAX_POOL_SIZE  = 10;
+        static const size_t DEFAULT_MAX_BLOCK_SIZE = 3;
+
+        // Types
+        typedef std::pair<Runnable*, TaskListener*> Task;
+
+    private:
+
+        // Vector of threads that this object has created for its pool.
+        std::vector< PooledThread* > pool;
+
+        // Queue of Task that are in need of completion
+        util::Queue<Task> queue;
+
+        // Max number of Threads this Pool can contian
+        std::size_t maxThreads;
+
+        // Max number of tasks that can be allocated at a time
+        std::size_t blockSize;
+
+        // boolean flag use to indocate that this object is shutting down.
+        bool shutdown;
+
+        // Count of threads that are currently free to perfom some work.
+        std::size_t freeThreads;
+
+        // Mutex for locking operations that affect the pool.
+        Mutex poolLock;
+
+        // Logger Init
+        LOGCMS_DECLARE(logger)
+        LOGCMS_DECLARE(marker)
+
+    private:   // Statics
+
+        // The singleton instance of this class
+        static ThreadPool instance;
+
+    public:
+
+        ThreadPool();
+        virtual ~ThreadPool();
+
+        /**
+         * Queue a task to be completed by one of the Pooled Threads.
+         * tasks are serviced as soon as a <code>PooledThread</code>
+         * is available to run it.
+         * @param task object that derives from Runnable
+         * @throws ActiveMQException
+         */
+        virtual void queueTask( Task task )
+            throw ( lang::Exception );
+
+        /**
+         * DeQueue a task to be completed by one of the Pooled Threads.
+         * A caller of this method will block until there is something
+         * in the tasks queue, therefore care must be taken when calling
+         * this function.  Normally clients of ThreadPool don't use
+         * this, only the <code>PooledThread</code> objects owned by
+         * this ThreadPool.
+         * @return object that derives from Runnable
+         * @throws ActiveMQException
+         */
+        virtual Task deQueueTask()
+            throw ( lang::Exception );
+
+        /**
+         * Returns the current number of Threads in the Pool, this is
+         * how many there are now, not how many are active or the max
+         * number that might exist.
+         * @return integer number of threads in existance.
+         */
+        virtual std::size_t getPoolSize() const { return pool.size(); }
+
+        /**
+         * Returns the current backlog of items in the tasks queue, this
+         * is how much work is still waiting to get done.
+         * @return number of outstanding tasks.
+         */
+        virtual std::size_t getBacklog() const { return queue.size(); }
+
+        /**
+         * Ensures that there is at least the specified number of Threads
+         * allocated to the pool.  If the size is greater than the MAX
+         * number of threads in the pool, then only MAX threads are
+         * reservved.  If the size is smaller than the number of threads
+         * currently in the pool, than nothing is done.
+         * @param size the number of threads to reserve.
+         */
+        virtual void reserve( std::size_t size );
+
+        /**
+         * Get the Max Number of Threads this Pool can contain
+         * @return max size
+         */
+        virtual std::size_t getMaxThreads() const { return maxThreads; }
+
+        /**
+         * Sets the Max number of threads this pool can contian.
+         * if this value is smaller than the current size of the
+         * pool nothing is done.
+         * @param maxThreads total number of threads that can be pooled
+         */
+        virtual void setMaxThreads( std::size_t maxThreads );
+
+        /**
+         * Gets the Max number of threads that can be allocated at a time
+         * when new threads are needed.
+         * @return max Thread Block Size
+         */
+        virtual std::size_t getBlockSize() const { return blockSize; }
+
+        /**
+         * Sets the Max number of Threads that can be allocated at a time
+         * when the Thread Pool determines that more Threads are needed.
+         * @param blockSize Max Thread Block Size
+         */
+        virtual void setBlockSize( std::size_t blockSize );
+
+        /**
+         * Returns the current number of available threads in the pool, threads
+         * that are performing a user task are considered unavailable.  This value
+         * could change immeadiately after calling as Threads could finish right
+         * after and be available again.  This is informational only.
+         * @return totoal free threads
+         */
+        virtual std::size_t getFreeThreadCount() const {
+            return freeThreads;
+        }
+
+    public: // PooledThreadListener Callbacks
+
+        /**
+         * Called by a pooled thread when it is about to begin
+         * executing a new task.  This will decrement the available
+         * threads counter so that this object knows when there are
+         * no more free threads and must create new ones.
+         * @param thread Pointer to the Pooled Thread that is making this call
+         */
+        virtual void onTaskStarted( PooledThread* thread );
+
+        /**
+         * Called by a pooled thread when it has completed a task
+         * and is going back to waiting for another task to run,
+         * this will increment the free threads counter.
+         * @param thread Pointer the the Pooled Thread that is making this call.
+         */
+        virtual void onTaskCompleted( PooledThread* thread );
+
+        /**
+         * Called by a pooled thread when it has encountered an exception
+         * while running a user task, after receiving this notification
+         * the callee should assume that the PooledThread is now no longer
+         * running.
+         * @param thread Pointer to the Pooled Thread that is making this call
+         * @param ex The Exception that occured.
+         */
+        virtual void onTaskException( PooledThread* thread,
+                                      lang::Exception& ex );
+
+    public:   // Statics
+
+        /**
+         * Return the one and only Thread Pool instance.
+         * @return The Thread Pool Pointer
+         */
+        static ThreadPool* getInstance() {
+            return &instance;
+        }
+
+    private:
+
+        /**
+         * Allocates the requested ammount of Threads, won't exceed
+         * <code>maxThreads</code>.
+         * @param count the number of threads to create
+         */
+        void AllocateThreads( std::size_t count );
+
+    };
+
+}}}
+
+#endif /*_DECAF_UTIL_CONCURRENT_THREADPOOL_H_*/

Modified: activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/logging/LoggerDefines.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/logging/LoggerDefines.h?view=diff&rev=543581&r1=543580&r2=543581
==============================================================================
--- activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/logging/LoggerDefines.h (original)
+++ activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/logging/LoggerDefines.h Fri
Jun  1 12:20:11 2007
@@ -21,13 +21,13 @@
 #include <sstream>
 
 #define LOGDECAF_DECLARE(loggerName)                                  \
-   static decaf::logger::SimpleLogger loggerName;
+   static decaf::util::logging::SimpleLogger loggerName;
 
 #define LOGDECAF_INITIALIZE(loggerName, className, loggerFamily)      \
-   decaf::logger::SimpleLogger className::loggerName(loggerFamily);
+   decaf::util::logging::SimpleLogger className::loggerName(loggerFamily);
 
 #define LOGDECAF_DECLARE_LOCAL(loggerName)                            \
-   decaf::logger::Logger loggerName;
+   decaf::util::logging::Logger loggerName;
 
 #define LOGDECAF_DEBUG(logger, message)                               \
    logger.debug(__FILE__, __LINE__, message);



Mime
View raw message