activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1086005 [1/2] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/decaf/util/concurrent/ test/ test/decaf/util/concurrent/
Date Sun, 27 Mar 2011 18:58:55 GMT
Author: tabish
Date: Sun Mar 27 18:58:54 2011
New Revision: 1086005

URL: http://svn.apache.org/viewvc?rev=1086005&view=rev
Log:
Removes the old ThreadPool implementation and adds a new ThreadPoolExecutor implementation along with a LinkedBlockingQueue implementation.

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/LinkedBlockingQueue.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/LinkedBlockingQueue.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
      - copied, changed from r1084577, activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h
      - copied, changed from r1084577, activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/LinkedBlockingQueueTest.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/LinkedBlockingQueueTest.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp
      - copied, changed from r1084577, activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h
      - copied, changed from r1084577, activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolTest.h
Removed:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/PooledThread.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/PooledThread.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/PooledThreadListener.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/TaskListener.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/TaskListener.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolTest.h
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=1086005&r1=1086004&r2=1086005&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Sun Mar 27 18:58:54 2011
@@ -444,14 +444,13 @@ cc_sources = \
     decaf/util/concurrent/Executor.cpp \
     decaf/util/concurrent/ExecutorService.cpp \
     decaf/util/concurrent/Future.cpp \
+    decaf/util/concurrent/LinkedBlockingQueue.cpp \
     decaf/util/concurrent/Lock.cpp \
     decaf/util/concurrent/Mutex.cpp \
-    decaf/util/concurrent/PooledThread.cpp \
     decaf/util/concurrent/Semaphore.cpp \
     decaf/util/concurrent/SynchronousQueue.cpp \
-    decaf/util/concurrent/TaskListener.cpp \
     decaf/util/concurrent/ThreadFactory.cpp \
-    decaf/util/concurrent/ThreadPool.cpp \
+    decaf/util/concurrent/ThreadPoolExecutor.cpp \
     decaf/util/concurrent/TimeUnit.cpp \
     decaf/util/concurrent/atomic/AtomicBoolean.cpp \
     decaf/util/concurrent/atomic/AtomicInteger.cpp \
@@ -1035,18 +1034,16 @@ h_sources = \
     decaf/util/concurrent/Executor.h \
     decaf/util/concurrent/ExecutorService.h \
     decaf/util/concurrent/Future.h \
+    decaf/util/concurrent/LinkedBlockingQueue.h \
     decaf/util/concurrent/Lock.h \
     decaf/util/concurrent/Mutex.h \
-    decaf/util/concurrent/PooledThread.h \
-    decaf/util/concurrent/PooledThreadListener.h \
     decaf/util/concurrent/RejectedExecutionException.h \
     decaf/util/concurrent/RejectedExecutionHandler.h \
     decaf/util/concurrent/Semaphore.h \
     decaf/util/concurrent/Synchronizable.h \
     decaf/util/concurrent/SynchronousQueue.h \
-    decaf/util/concurrent/TaskListener.h \
     decaf/util/concurrent/ThreadFactory.h \
-    decaf/util/concurrent/ThreadPool.h \
+    decaf/util/concurrent/ThreadPoolExecutor.h \
     decaf/util/concurrent/TimeUnit.h \
     decaf/util/concurrent/TimeoutException.h \
     decaf/util/concurrent/atomic/AtomicBoolean.h \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h?rev=1086005&r1=1086004&r2=1086005&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h Sun Mar 27 18:58:54 2011
@@ -179,6 +179,8 @@ namespace concurrent {
          */
         virtual void put( const E& value ) = 0;
 
+        using Queue<E>::offer;
+
         /**
          * Inserts the specified element into this queue, waiting up to the specified wait
          * time if necessary for space to become available.

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/LinkedBlockingQueue.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/LinkedBlockingQueue.cpp?rev=1086005&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/LinkedBlockingQueue.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/LinkedBlockingQueue.cpp Sun Mar 27 18:58:54 2011
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "LinkedBlockingQueue.h"

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/LinkedBlockingQueue.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/LinkedBlockingQueue.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/LinkedBlockingQueue.h?rev=1086005&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/LinkedBlockingQueue.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/LinkedBlockingQueue.h Sun Mar 27 18:58:54 2011
@@ -0,0 +1,769 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _DECAF_UTIL_CONCURRENT_LINKEDBLOCKINGQUEUE_H_
+#define _DECAF_UTIL_CONCURRENT_LINKEDBLOCKINGQUEUE_H_
+
+#include <decaf/util/Config.h>
+
+#include <decaf/util/concurrent/atomic/AtomicInteger.h>
+#include <decaf/util/concurrent/Mutex.h>
+#include <decaf/util/concurrent/Lock.h>
+#include <decaf/util/concurrent/BlockingQueue.h>
+#include <decaf/util/AbstractQueue.h>
+#include <decaf/util/Iterator.h>
+#include <decaf/lang/Integer.h>
+#include <decaf/lang/Math.h>
+#include <decaf/lang/Pointer.h>
+#include <decaf/util/NoSuchElementException.h>
+#include <decaf/lang/exceptions/IllegalArgumentException.h>
+#include <decaf/lang/exceptions/IllegalStateException.h>
+
+namespace decaf {
+namespace util {
+namespace concurrent {
+
+    using decaf::lang::Pointer;
+
+    /**
+     * A BlockingQueue derivative that allows for a bound to be placed on the number of elements
+     * that can be enqueued at any one time.  Elements are inserted and removed in FIFO order.
+     * The internal structure of the queue is based on linked nodes, which provides better
+     * performance than the array based versions, but the performance is less predictable.
+     *
+     * The capacity bound of this class defaults to Integer::MAX_VALUE.
+     *
+     * @since 1.0
+     */
+    template<typename E>
+    class LinkedBlockingQueue : public BlockingQueue<E> {
+    private:
+
+        template< typename U >
+        class QueueNode {
+        private:
+
+            U value;
+            bool unlinked;
+            bool dequeued;
+
+        public:
+
+            Pointer< QueueNode<E> > next;
+
+        private:
+
+            QueueNode( const QueueNode& );
+            QueueNode& operator= ( const QueueNode& );
+
+        public:
+
+            QueueNode() : value(), unlinked(false), dequeued(false), next() {}
+            QueueNode(const U& value) : value(value), unlinked(false), dequeued(false), next() {}
+
+            void set(Pointer< QueueNode<U> > next, const U& value) {
+                this->next = next;
+                this->value = value;
+                this->unlinked = false;
+                this->dequeued = false;
+            }
+
+            E get() const {
+                return this->value;
+            }
+
+            E getAndDequeue() {
+                E result = this->value;
+                this->value = E();
+                this->dequeued = true;
+
+                return result;
+            }
+
+            void unlink() {
+                this->value = E();
+                this->unlinked = true;
+            }
+
+            bool isUnlinked() const {
+                return this->unlinked;
+            }
+
+            bool isDequeued() const {
+                return this->dequeued;
+            }
+        };
+
+        class TotalLock {
+        private:
+
+            TotalLock( const TotalLock& src );
+            TotalLock& operator= ( const TotalLock& src );
+
+        private:
+
+            const LinkedBlockingQueue<E>* parent;
+
+        public:
+
+            TotalLock( const LinkedBlockingQueue<E>* parent ) : parent(parent) {
+                parent->putLock.lock();
+                parent->takeLock.lock();
+            }
+
+            ~TotalLock() {
+                parent->putLock.unlock();
+                parent->takeLock.unlock();
+            }
+
+        };
+
+    private:
+
+        int capacity;
+        decaf::util::concurrent::atomic::AtomicInteger count;
+
+        mutable decaf::util::concurrent::Mutex putLock;
+        mutable decaf::util::concurrent::Mutex takeLock;
+
+        Pointer< QueueNode<E> > head;
+        Pointer< QueueNode<E> > tail;
+
+    public:
+
+        /**
+         * Create a new instance with a Capacity of Integer::MAX_VALUE
+         */
+        LinkedBlockingQueue() : BlockingQueue<E>(), capacity(lang::Integer::MAX_VALUE), count(),
+                                putLock(), takeLock(), head(new QueueNode<E>()), tail() {
+
+            this->tail = this->head;
+        }
+
+        /**
+         * Create a new instance with the given initial capacity value.
+         *
+         * @param capacity
+         *      The initial capacity value to assign to this Queue.
+         *
+         * @throws IllegalArgumentException if the specified capacity is not greater than zero.
+         */
+        LinkedBlockingQueue(int capacity) : BlockingQueue<E>(), capacity(capacity), count(),
+                                            putLock(), takeLock(), head(new QueueNode<E>()), tail() {
+            if(capacity <= 0) {
+                throw decaf::lang::exceptions::IllegalArgumentException(
+                    __FILE__, __LINE__, "Capacity value must be greater than zero.");
+            }
+
+            this->tail = this->head;
+        }
+
+        /**
+         * Create a new instance with a Capacity of Integer::MAX_VALUE and adds all the
+         * values contained in the specified collection to this Queue.
+         *
+         * @param collection
+         *      The Collection whose elements are to be copied to this Queue.
+         *
+         * @throws IllegalStateException if the number of elements in the collection exceeds
+         *         this Queue's capacity.
+         */
+        LinkedBlockingQueue(const Collection<E>& collection) : BlockingQueue<E>(),
+                                                               capacity(lang::Integer::MAX_VALUE),
+                                                               count(), putLock(), takeLock(),
+                                                               head(new QueueNode<E>()), tail() {
+
+            this->tail = this->head;
+
+            Pointer< Iterator<E> > iter(collection.iterator());
+
+            try {
+
+                int count = 0;
+
+                while(iter->hasNext()) {
+                    if(count == this->capacity) {
+                        throw decaf::lang::exceptions::IllegalStateException( __FILE__, __LINE__,
+                            "Number of elements in the Collection exceeds this Queue's Capacity.");
+                    }
+
+                    this->enqueue(iter->next());
+                    ++count;
+                }
+
+                this->count.set(count);
+            }
+            DECAF_CATCH_RETHROW(decaf::lang::exceptions::IllegalStateException)
+            DECAF_CATCH_RETHROW(decaf::lang::Exception)
+            DECAF_CATCHALL_THROW(decaf::lang::Exception)
+        }
+
+        virtual ~LinkedBlockingQueue() {
+            try{
+                this->purgeList();
+            } catch(...) {}
+        }
+
+    public:
+
+        LinkedBlockingQueue<E>& operator= ( const LinkedBlockingQueue<E>& queue ) {
+            this->clear();
+            this->addAll(queue);
+            return *this;
+        }
+
+        LinkedBlockingQueue<E>& operator= ( const Collection<E>& collection ) {
+            this->clear();
+            this->addAll(collection);
+            return *this;
+        }
+
+    public:
+
+        virtual int size() const {
+            return this->count.get();
+        }
+
+        virtual void clear() {
+
+            TotalLock lock(this);
+
+            this->purgeList();
+            this->tail = this->head;
+            this->count.set(0);
+
+            if(this->count.getAndSet(0) == this->capacity) {
+                this->putLock.notify();
+            }
+        }
+
+        virtual int remainingCapacity() const {
+            return this->capacity - this->count.get();
+        }
+
+        virtual void put( const E& value ) {
+
+            int c = -1;
+
+            synchronized(&this->putLock) {
+
+                while(this->count.get() == this->capacity) {
+                    this->putLock.wait();
+                }
+
+                // This method now owns the putLock so we know we have at least
+                // enough capacity for one put, if we enqueue an item and there's
+                // still more room we should signal a waiting put to ensure that
+                // threads don't wait forever.
+                enqueue(value);
+                c = this->count.getAndIncrement();
+
+                if(c + 1 < this->capacity) {
+                    this->putLock.notify();
+                }
+            }
+
+            // When c is zero the Queue was empty before this insert, so a taker
+            // may be blocked waiting for an element; another take could have
+            // already happened but we don't know, so wake up a waiting taker.
+            if(c == 0) {
+                this->signalNotEmpty();
+            }
+        }
+
+        virtual bool offer( const E& value, long long timeout, const TimeUnit& unit ) {
+
+            int c = -1;
+
+            synchronized(&this->putLock) {
+
+                while(this->count.get() == this->capacity) {
+                    this->putLock.wait(unit.toMillis(timeout));
+                }
+
+                enqueue(value);
+                c = this->count.getAndIncrement();
+
+                if(c + 1 < this->capacity) {
+                    this->putLock.notify();
+                }
+            }
+
+            if(c == 0) {
+                this->signalNotEmpty();
+            }
+
+            return true;
+        }
+
+        virtual bool offer( const E& value ) {
+
+            if(this->count.get() == this->capacity) {
+                return false;
+            }
+
+            int c = -1;
+            synchronized(&this->putLock) {
+                if(this->count.get() < this->capacity) {
+
+                    enqueue(value);
+                    c = this->count.getAndIncrement();
+
+                    if(c + 1 < this->capacity) {
+                        this->putLock.notify();
+                    }
+                }
+            }
+
+            if(c == 0) {
+                this->signalNotEmpty();
+            }
+
+            return c >= 0;
+        }
+
+        virtual E take() {
+            E value;
+            int c = -1;
+            synchronized(&this->takeLock) {
+
+                while(this->count.get() == 0) {
+                     this->takeLock.wait();
+                }
+
+                // Since this method owns the takeLock and count != 0 we know that
+                // it's safe to take one element.  If c is greater than one then there
+                // is at least one more, so we try to wake up another taker if any.
+                value = dequeue();
+                c = this->count.getAndDecrement();
+
+                if(c > 1) {
+                    this->takeLock.notify();
+                }
+            }
+
+            // When c equals capacity the Queue was full before this take, so a
+            // put operation may be blocked waiting for space; wake one if there
+            // is one to prevent a stall.
+            if(c == this->capacity) {
+                this->signalNotFull();
+            }
+
+            return value;
+        }
+
+        virtual bool poll( E& result, long long timeout, const TimeUnit& unit ) {
+            int c = -1;
+            synchronized(&this->takeLock) {
+                if(this->count.get() == 0) {
+                    if(timeout <= 0) {
+                        return false;
+                    }
+                    this->takeLock.wait(unit.toMillis(timeout));
+                    if(this->count.get() == 0) {
+                        return false;
+                    }
+                }
+
+                result = dequeue();
+                c = this->count.getAndDecrement();
+
+                if(c > 1) {
+                    this->takeLock.notify();
+                }
+            }
+
+            if(c == this->capacity) {
+                this->signalNotFull();
+            }
+
+            return true;
+        }
+
+        virtual bool poll(E& result) {
+
+            if(this->count.get() == 0) {
+                return false;
+            }
+
+            int c = -1;
+            synchronized(&this->takeLock) {
+
+                if(this->count.get() > 0) {
+                    result = dequeue();
+                    c = this->count.getAndDecrement();
+
+                    if(c > 1) {
+                        this->takeLock.notify();
+                    }
+                }
+            }
+
+            if(c == this->capacity) {
+                this->signalNotFull();
+            }
+
+            return true;
+        }
+
+        virtual bool peek(E& result) const {
+
+            if(this->count.get() == 0) {
+                return false;
+            }
+
+            synchronized(&this->takeLock) {
+                Pointer< QueueNode<E> > front = this->head->next;
+                if(front == NULL) {
+                    return false;
+                } else {
+                    result = front->get();
+                }
+            }
+
+            return true;
+        }
+
+        using AbstractQueue<E>::remove;
+
+        virtual bool remove(const E& value) {
+
+            TotalLock lock(this);
+
+            for(Pointer< QueueNode<E> > predicessor = this->head, p = predicessor->next; p != NULL;
+                predicessor = p, p = p->next) {
+
+                if(value == p->get()) {
+                    unlink(p, predicessor);
+                    return true;
+                }
+            }
+
+            return false;
+        }
+
+        virtual std::vector<E> toArray() const {
+
+            TotalLock lock(this);
+
+            int size = this->count.get();
+            std::vector<E> array;
+            array.reserve(size);
+
+            for(Pointer< QueueNode<E> > p = this->head->next; p != NULL; p = p->next) {
+                array.push_back(p->get());
+            }
+
+            return array;
+        }
+
+        virtual std::string toString() const {
+            return std::string("LinkedBlockingQueue [ current size = ") +
+                   decaf::lang::Integer::toString(this->count.get()) + "]";
+        }
+
+        virtual int drainTo( Collection<E>& c ) {
+            return this->drainTo(c, decaf::lang::Integer::MAX_VALUE);
+        }
+
+        virtual int drainTo( Collection<E>& sink, int maxElements ) {
+
+            if(&sink == this) {
+                throw decaf::lang::exceptions::IllegalArgumentException(__FILE__, __LINE__,
+                    "Cannot drain this Collection to itself.");
+            }
+
+            bool signalNotFull = false;
+            bool shouldThrow = false;
+            decaf::lang::Exception delayed;
+            int result = 0;
+
+            synchronized(&this->takeLock) {
+
+                // We get the count of Nodes that exist now, any puts that are done
+                // after this are not drained and since we hold the lock nothing can
+                // get taken so state should remain consistent.
+                result = decaf::lang::Math::min(maxElements, this->count.get());
+                Pointer< QueueNode<E> > node = this->head;
+                int i = 0;
+                try {
+
+                    while(i < result) {
+                        Pointer< QueueNode<E> > p = node->next;
+                        sink.add( p->getAndDequeue() );
+                        node = p;
+                        ++i;
+                    }
+
+                } catch(decaf::lang::Exception& e) {
+                    delayed = e;
+                    shouldThrow = true;
+                }
+
+                if(i > 0) {
+                    this->head = node;
+                    signalNotFull = (this->count.getAndAdd(-i) == this->capacity);
+                }
+            }
+
+            if(signalNotFull) {
+                this->signalNotFull();
+            }
+
+            if(shouldThrow) {
+                throw delayed;
+            }
+
+            return result;
+        }
+
+    private:
+
+        class LinkedIterator : public Iterator<E> {
+        private:
+
+            Pointer< QueueNode<E> > current;
+            Pointer< QueueNode<E> > last;
+            E currentElement;
+            LinkedBlockingQueue<E>* parent;
+
+        public:
+
+            LinkedIterator(LinkedBlockingQueue<E>* parent) : current(), last(),
+                                                             currentElement(), parent(parent) {
+                TotalLock lock(parent);
+
+                this->current = parent->head->next;
+                if(this->current != NULL) {
+                    this->currentElement = current->get();
+                }
+            }
+
+            virtual bool hasNext() const {
+                return this->current != NULL;
+            }
+
+            virtual E next() {
+
+                TotalLock lock(this->parent);
+
+                if(this->current == NULL) {
+                    throw decaf::util::NoSuchElementException(__FILE__, __LINE__,
+                        "Iterator next called with no matching next element.");
+                }
+
+                E result = this->currentElement;
+                this->last = this->current;
+                this->current = this->nextNode(this->current);
+                this->currentElement = (this->current == NULL) ? E() : this->current->get();
+
+                return result;
+            }
+
+            virtual void remove() {
+
+                if(this->last == NULL) {
+                    throw decaf::lang::exceptions::IllegalStateException(__FILE__, __LINE__,
+                        "Iterator remove called without having called next().");
+                }
+
+                TotalLock lock(this->parent);
+
+                Pointer< QueueNode<E> > node;
+                node.swap(this->last);
+
+                for(Pointer< QueueNode<E> > trail = this->parent->head, p = trail->next; p != NULL;
+                    trail = p, p = p->next) {
+
+                    if(p == node) {
+                        this->parent->unlink(p, trail);
+                        break;
+                    }
+                }
+            }
+
+        private:
+
+            Pointer< QueueNode<E> > nextNode(Pointer< QueueNode<E> >& p) {
+
+                // Handle the case of a dequeued Node, the new head of Queue
+                // will be parent->head->next() even if the Queue is empty.
+                if(p->isDequeued()) {
+                    return this->parent->head->next;
+                }
+
+                Pointer< QueueNode<E> > s = p->next;
+
+                // Handle Nodes that have been removed from the interior of the
+                // Queue, these are tagged but still retain their next() value
+                // in order to account for multiple removes.  If all nodes were
+                // removed from the last call then eventually we reach next() == NULL
+                // which is the old tail.
+                while(s != NULL && s->isUnlinked()) {
+                    s = s->next;
+                }
+
+                return s;
+            }
+
+        };
+
+        class ConstLinkedIterator : public Iterator<E> {
+        private:
+
+            Pointer< QueueNode<E> > current;
+            Pointer< QueueNode<E> > last;
+            E currentElement;
+            const LinkedBlockingQueue<E>* parent;
+
+        public:
+
+            ConstLinkedIterator(const LinkedBlockingQueue<E>* parent) : current(), last(),
+                                                                        currentElement(),
+                                                                        parent(parent) {
+                TotalLock lock(parent);
+
+                this->current = parent->head->next;
+                if(this->current != NULL) {
+                    this->currentElement = current->get();
+                }
+            }
+
+            virtual bool hasNext() const {
+                return this->current != NULL;
+            }
+
+            virtual E next() {
+
+                TotalLock lock(this->parent);
+
+                if(this->current == NULL) {
+                    throw decaf::util::NoSuchElementException(__FILE__, __LINE__,
+                        "Iterator next called with no matching next element.");
+                }
+
+                E result = this->currentElement;
+                this->last = this->current;
+                this->current = this->nextNode(this->current);
+                this->currentElement = (this->current == NULL) ? E() : this->current->get();
+
+                return result;
+            }
+
+            virtual void remove() {
+                throw lang::exceptions::UnsupportedOperationException(
+                    __FILE__, __LINE__, "Cannot write to a const ListIterator." );
+            }
+
+        private:
+
+            Pointer< QueueNode<E> > nextNode(Pointer< QueueNode<E> >& p) {
+
+                // Handle the case of a dequeued Node, the new head of Queue
+                // will be parent->head->next() even if the Queue is empty.
+                if(p->isDequeued()) {
+                    return this->parent->head->next;
+                }
+
+                Pointer< QueueNode<E> > s = p->next;
+
+                // Handle Nodes that have been removed from the interior of the
+                // Queue, these are tagged but still retain their next() value
+                // in order to account for multiple removes.  If all nodes were
+                // removed from the last call then eventually we reach next() == NULL
+                // which is the old tail.
+                while(s != NULL && s->isUnlinked()) {
+                    s = s->next;
+                }
+
+                return s;
+            }
+
+        };
+
+    public:
+
+        virtual decaf::util::Iterator<E>* iterator() {
+            return new LinkedIterator(this);
+        }
+
+        virtual decaf::util::Iterator<E>* iterator() const {
+            return new ConstLinkedIterator(this);
+        }
+
+    private:
+
+        void unlink(Pointer< QueueNode<E> >& p, Pointer< QueueNode<E> >& predicessor) {
+
+            // In order to prevent Iterators from losing their ability to provide
+            // weakly consistent iteration the next value of p is left intact but
+            // the node is marked as unlinked and its value is reset to default.
+            p->unlink();
+
+            predicessor->next = p->next;
+
+            if(this->tail == p) {
+                this->tail = predicessor;
+            }
+
+            if(this->count.getAndDecrement() == capacity) {
+                this->signalNotFull();
+            }
+        }
+
+        void signalNotEmpty() {
+            synchronized(&this->takeLock) {
+                this->takeLock.notify();
+            }
+        }
+
+        void signalNotFull() {
+            synchronized(&this->putLock) {
+                this->putLock.notify();
+            }
+        }
+
+        // Must be called with the putLock locked.
+        void enqueue(E value) {
+            Pointer< QueueNode<E> > newTail( new QueueNode<E>(value) );
+            this->tail->next = newTail;
+            this->tail = newTail;
+        }
+
+        // Must be called with the takeLock locked.
+        E dequeue() {
+            Pointer< QueueNode<E> > temp = this->head;
+            Pointer< QueueNode<E> > newHead = temp->next;
+            this->head = newHead;
+
+            return newHead->getAndDequeue();
+        }
+
+        void purgeList() {
+            Pointer< QueueNode<E> > current = this->head->next;
+            Pointer< QueueNode<E> > temp;
+            while(current != NULL) {
+                temp = current;
+                current = current->next;
+                temp.reset(NULL);
+            }
+        }
+    };
+
+}}}
+
+#endif /* _DECAF_UTIL_CONCURRENT_LINKEDBLOCKINGQUEUE_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/LinkedBlockingQueue.h
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp (from r1084577, activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.cpp)
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp?p2=activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp&p1=activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.cpp&r1=1084577&r2=1086005&rev=1086005&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp Sun Mar 27 18:58:54 2011
@@ -14,14 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <decaf/util/concurrent/ThreadPool.h>
+#include <decaf/util/concurrent/ThreadPoolExecutor.h>
 #include <decaf/util/concurrent/Concurrent.h>
 #include <decaf/lang/exceptions/IllegalArgumentException.h>
+#include <decaf/lang/exceptions/NullPointerException.h>
+#include <decaf/util/concurrent/RejectedExecutionException.h>
 #include <decaf/util/Config.h>
-
-#ifdef min
-#undef min
-#endif
+#include <decaf/util/LinkedList.h>
+#include <decaf/lang/Pointer.h>
 
 #include <algorithm>
 #include <iostream>
@@ -34,161 +34,245 @@ using namespace decaf::util;
 using namespace decaf::util::concurrent;
 
 ////////////////////////////////////////////////////////////////////////////////
-LOGDECAF_INITIALIZE(logger, ThreadPool, "com.activemq.concurrent.ThreadPool")
-LOGDECAF_INITIALIZE(marker, ThreadPool, "com.activemq.concurrent.ThreadPool.Marker")
+namespace decaf{
+namespace util{
+namespace concurrent{
 
-////////////////////////////////////////////////////////////////////////////////
-ThreadPool::ThreadPool() : pool(),
-                           queue(),
-                           maxThreads(DEFAULT_MAX_POOL_SIZE),
-                           blockSize(DEFAULT_MAX_BLOCK_SIZE),
-                           shutdown(false),
-                           freeThreads(0),
-                           poolLock() {
-}
+    class Worker;
 
-////////////////////////////////////////////////////////////////////////////////
-ThreadPool::~ThreadPool() {
+    class ExecutorKernel {
+    public:
 
-    try{
+        LinkedList<Worker*> workers;
+        AtomicBoolean stopping;
+        AtomicBoolean stopped;
+        std::size_t freeThreads;
 
-        std::vector<PooledThread*>::iterator itr = pool.begin();
+        int maxPoolSize;
+        int corePoolSize;
+        long long keepAliveTime;
+        Pointer< BlockingQueue<decaf::lang::Runnable*> > workQueue;
+        Mutex mainLock;
 
-        // Stop all the threads
-        for( ; itr != pool.end(); ++itr ) {
-            (*itr)->stop();
-        }
+    public:
 
-        // Set the shutdown flag so that the DeQueue methods all quit
-        // when we interrupt them.
-        shutdown = true;
+        ExecutorKernel(int corePoolSize, int maxPoolSize, long long keepAliveTime,
+                       BlockingQueue<decaf::lang::Runnable*>* workQueue);
 
-        synchronized( &queue ) {
-            // Signal the Queue so that all waiters are notified
-            queue.notifyAll();
-        }
+        ~ExecutorKernel();
 
-        // Wait for everyone to die
-        for( itr = pool.begin(); itr != pool.end(); ++itr ) {
-            (*itr)->join();
+        void onTaskStarted(Worker* thread);
 
-            // Destroy the threads
-            delete *itr;
-        }
+        void onTaskCompleted(Worker* thread);
 
-        pool.clear();
-    }
-    DECAF_CATCH_NOTHROW( lang::Exception )
-    DECAF_CATCHALL_NOTHROW()
-}
+        void onTaskException(Worker* thread, lang::Exception& ex);
 
-////////////////////////////////////////////////////////////////////////////////
-ThreadPool* ThreadPool::getInstance() {
+        void enQueueTask(Runnable* task);
 
-    static ThreadPool instance;
+        Runnable* deQueueTask();
 
-    return &instance;
-}
+        void AllocateThread();
 
-////////////////////////////////////////////////////////////////////////////////
-void ThreadPool::queueTask( ThreadPool::Task task ) {
+        bool isStoppedOrStopping();
 
-    try{
+        void shutdown();
 
-        if( !task.first || !task.second ) {
-            throw exceptions::IllegalArgumentException(
-                __FILE__, __LINE__,
-                "ThreadPool::QueueTask - Invalid args for Task");
+    };
+
+    class Worker : public lang::Thread {
+    private:
+
+        bool busy;
+        bool done;
+        decaf::util::concurrent::ExecutorKernel* kernel;
+
+    private:
+
+        Worker( const Worker& );
+        Worker& operator= ( const Worker& );
+
+     public:
+
+        Worker(decaf::util::concurrent::ExecutorKernel* kernel) :
+            Thread(), busy(false), done(false), kernel(kernel) {
+
+            if( kernel == NULL ) {
+                throw IllegalArgumentException( __FILE__, __LINE__,
+                    "ThreadPoolExecutor Worker requires non-NULL pointer to parent ExecutorKernel");
+            }
         }
 
-        //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - syncing on queue");
+        ~Worker() {}
 
-        synchronized( &queue ) {
-            //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - sync'd, synching pool");
+        void run() {
 
-            // If there's nobody open to do work, then create some more
-            // threads to handle the work.
-            if( freeThreads == 0 ) {
-                AllocateThreads(blockSize);
+            try {
+
+                while(!this->done) {
+
+                    // Blocks until there is something to be done
+                    Runnable* task = this->kernel->deQueueTask();
+
+                    if(this->done) {
+                        break;
+                    }
+
+                    if(!task) {
+                        throw Exception( __FILE__, __LINE__,
+                            "Worker - Retrieved NULL task from Kernel.");
+                    }
+
+                    this->busy = true;
+                    this->kernel->onTaskStarted(this);
+                    try{
+                        task->run();
+                    } catch(...) {}
+                    this->kernel->onTaskCompleted(this);
+                    this->busy = false;
+                }
+            }
+            catch( Exception& ex )
+            {
+                ex.setMark( __FILE__, __LINE__ );
+                this->busy = false;
+                this->kernel->onTaskException(this, ex);
+            }
+            catch(...)
+            {
+                Exception ex(__FILE__, __LINE__, "Worker - Caught Unknown Exception");
+                this->busy = false;
+                this->kernel->onTaskException(this, ex);
             }
+        }
+
+        void stop() {
+            this->done = true;
+        }
 
-            //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - pushing task");
+        bool isBusy() {
+            return this->busy;
+        }
 
-            // queue the new work.
-            queue.offer(task);
+    };
 
-            //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - calling notify");
+}}}
 
-            // Inform waiters that we put some work on the queue.
-            queue.notify();
+////////////////////////////////////////////////////////////////////////////////
+ThreadPoolExecutor::ThreadPoolExecutor(int corePoolSize, int maxPoolSize,
+                                       long long keepAliveTime, const TimeUnit& unit,
+                                       BlockingQueue<decaf::lang::Runnable*>* workQueue) :
+    kernel(new ExecutorKernel(corePoolSize, maxPoolSize, unit.toMillis(keepAliveTime), workQueue)) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPoolExecutor::~ThreadPoolExecutor() {
+
+    try{
+        delete kernel;
+    }
+    DECAF_CATCH_NOTHROW( lang::Exception )
+    DECAF_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutor::execute(Runnable* task ) {
+
+    try{
+
+        if( task == NULL ) {
+            throw NullPointerException(
+                __FILE__, __LINE__,
+                "ThreadPoolExecutor::execute - Supplied Runnable pointer was NULL.");
         }
+
+        this->kernel->enQueueTask(task);
     }
     DECAF_CATCH_RETHROW( lang::Exception )
     DECAF_CATCHALL_THROW( lang::Exception )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-ThreadPool::Task ThreadPool::deQueueTask() {
+void ThreadPoolExecutor::shutdown() {
 
     try{
-        //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - syncing on queue");
-
-        synchronized( &queue ) {
-
-           /*LOGCMS_DEBUG(logger,
-               "ThreadPool::DeQueueTask - sync'd checking queue empty");*/
+        this->kernel->shutdown();
+    }
+    DECAF_CATCH_RETHROW( lang::Exception )
+    DECAF_CATCHALL_THROW( lang::Exception )
+}
 
-            // Wait for work, wait in a while loop since another thread could
-            // be waiting for a lock and get the work before we get woken up
-            // from our wait.
-            while( queue.isEmpty() && !shutdown ) {
-               //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - Q empty, waiting");
+////////////////////////////////////////////////////////////////////////////////
+int ThreadPoolExecutor::getPoolSize() const {
+    return (int)this->kernel->workers.size();
+}
 
-               queue.wait();
+////////////////////////////////////////////////////////////////////////////////
+int ThreadPoolExecutor::getCorePoolSize() const {
+    return this->kernel->corePoolSize;
+}
 
-               //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - done waiting");
-            }
+////////////////////////////////////////////////////////////////////////////////
+int ThreadPoolExecutor::getMaximumPoolSize() const {
+    return this->kernel->maxPoolSize;
+}
 
-            // Don't give more work if we are closing down
-            if( shutdown ) {
-               return Task();
-            }
+////////////////////////////////////////////////////////////////////////////////
+long long ThreadPoolExecutor::getTaskCount() const {
+    return this->kernel->workQueue->size();
+}
 
-            // check size again.
-            if( queue.isEmpty() ) {
-               throw lang::Exception(
-                   __FILE__, __LINE__,
-                   "ThreadPool::DeQueueUserWorkItem - Empty Taskn, not in shutdown.");
-            }
+////////////////////////////////////////////////////////////////////////////////
+ExecutorKernel::ExecutorKernel(int corePoolSize, int maxPoolSize, long long keepAliveTime,
+                               BlockingQueue<decaf::lang::Runnable*>* workQueue) :
+    workers(),
+    stopping(false),
+    stopped(false),
+    freeThreads(0),
+    maxPoolSize(maxPoolSize),
+    corePoolSize(corePoolSize),
+    keepAliveTime(keepAliveTime),
+    workQueue(workQueue),
+    mainLock() {
 
-            //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - popping task");
+    if(corePoolSize < 0 || maxPoolSize <= 0 ||
+       maxPoolSize < corePoolSize || keepAliveTime < 0) {
 
-            // not empty so get the new work to do
-            return queue.remove();
-        }
+        throw IllegalArgumentException(__FILE__, __LINE__, "Argument out of range.");
+    }
 
-        return Task();
+    if(workQueue == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "BlockingQueue pointer was null");
     }
-    DECAF_CATCH_RETHROW( lang::Exception )
-    DECAF_CATCHALL_THROW( lang::Exception )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ThreadPool::reserve( std::size_t size ) {
+ExecutorKernel::~ExecutorKernel() {
+    try{
+        this->shutdown();
+    }
+    DECAF_CATCH_NOTHROW(Exception)
+    DECAF_CATCHALL_NOTHROW()
+}
 
-    try {
+////////////////////////////////////////////////////////////////////////////////
+void ExecutorKernel::onTaskStarted(Worker* thread DECAF_UNUSED) {
 
-        synchronized( &poolLock ) {
+    try{
 
-            if( size < pool.size() || pool.size() == maxThreads ) {
-                return;
-            }
+        synchronized( &mainLock ) {
 
-            // How many do we reserve
-            std::size_t allocCount = size - pool.size();
+            freeThreads--;
 
-            // Allocate the new Threads
-            AllocateThreads(allocCount);
+            // Now that this callback has decremented the free threads counter
+            // let's check if there is any outstanding work to be done and no
+            // threads to handle it.  This could happen if the enQueueTask
+            // method was called successively without any of the Worker threads
+            // having a chance to wake up and service the queue.  This would
+            // cause the number of Tasks to exceed the number of free threads
+            // once the Threads got a chance to wake up and service the queue
+            if( freeThreads == 0 && !workQueue->isEmpty() ) {
+                AllocateThread();
+            }
         }
     }
     DECAF_CATCH_RETHROW( lang::Exception )
@@ -196,18 +280,11 @@ void ThreadPool::reserve( std::size_t si
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ThreadPool::setMaxThreads( std::size_t maxThreads ) {
-
-    try{
-
-        synchronized( &poolLock ) {
+void ExecutorKernel::onTaskCompleted(Worker* thread DECAF_UNUSED) {
 
-            if( maxThreads == 0 ) {
-                // Caller tried to do something stupid, ignore them.
-                return;
-            }
-
-            this->maxThreads = maxThreads;
+    try {
+        synchronized( &mainLock ) {
+            freeThreads++;
         }
     }
     DECAF_CATCH_RETHROW( lang::Exception )
@@ -215,122 +292,141 @@ void ThreadPool::setMaxThreads( std::siz
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ThreadPool::setBlockSize( std::size_t blockSize ) {
+void ExecutorKernel::onTaskException(Worker* thread, lang::Exception& ex DECAF_UNUSED) {
 
     try{
-        if( blockSize <= 0 ) {
-            // User tried something dumb, protect them from themselves
-            return;
-        }
 
-        synchronized( &poolLock ) {
-            this->blockSize = blockSize;
+        synchronized( &mainLock ) {
+
         }
     }
-    DECAF_CATCH_RETHROW( lang::Exception )
-    DECAF_CATCHALL_THROW( lang::Exception )
+    DECAF_CATCH_RETHROW( Exception )
+    DECAF_CATCHALL_THROW( Exception )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ThreadPool::AllocateThreads( std::size_t count ) {
+void ExecutorKernel::enQueueTask(Runnable* task) {
 
     try{
 
-        if( pool.size() >= maxThreads ) {
-            return;
-        }
+        synchronized( &this->mainLock ) {
 
-        synchronized( &poolLock ) {
-            // Take the min of alloc size of maxThreads since we don't
-            // want anybody sneaking eaxtra threads in, greedy bastards.
-            count = std::min(count, maxThreads - pool.size());
-
-            // Each time we create a thread we increment the free Threads
-            // counter, but before we call start so that the Thread doesn't
-            // get ahead of us.
-            for( std::size_t i = 0; i < count; ++i ) {
-                pool.push_back(new PooledThread(this));
-                pool.back()->setPooledThreadListener(this);
-                freeThreads++;
-                pool.back()->start();
+            // If there's nobody open to do work, then create some more
+            // threads to handle the work.
+            if( this->freeThreads == 0 ) {
+                AllocateThread();
             }
         }
+
+        // queue the new work.
+        if(!this->workQueue->offer(task)) {
+            throw RejectedExecutionException(__FILE__, __LINE__, "Task Rejected by work Q");
+        }
     }
-    DECAF_CATCH_RETHROW( lang::Exception )
-    DECAF_CATCHALL_THROW( lang::Exception )
+    DECAF_CATCH_RETHROW( Exception )
+    DECAF_CATCHALL_THROW( Exception )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ThreadPool::onTaskStarted( PooledThread* thread DECAF_UNUSED ) {
+Runnable* ExecutorKernel::deQueueTask() {
 
     try{
 
-        synchronized( &poolLock ) {
+        Runnable* task = NULL;
 
-            freeThreads--;
+        // Wait for work, wait in a while loop since another thread could
+        // be waiting for a lock and get the work before we get woken up
+        // from our wait.
+        while( !isStoppedOrStopping() ) {
+
+            // TODO - Threads aren't interruptible yet.
+            if(workQueue->poll(task, 10, TimeUnit::MILLISECONDS)) {
+                break;
+            }
+        }
 
-            // Now that this callback has decremented the free threads coutner
-            // let check if there is any outstanding work to be done and no
-            // threads to handle it.  This could happen if the QueueTask
-            // method was called successively without any of the PooledThreads
-            // having a chance to wake up and service the queue.  This would
-            // cause the number of Task to exceed the number of free threads
-            // once the Threads got a chance to wake up and service the queue
-            if( freeThreads == 0 && !queue.isEmpty() ) {
-                // Allocate a new block of threads
-                AllocateThreads( blockSize );
+        // Don't give more work if we are closing down
+        if(isStoppedOrStopping()) {
+
+            if(task != NULL) {
+                delete task;
             }
+
+            return NULL;
+        }
+
+        if( task == NULL ) {
+           throw lang::Exception(__FILE__, __LINE__,
+               "deQueueTask: Got empty Runnable while not in shutdown.");
         }
 
-        //LOGCMS_DEBUG(logger, "ThreadPool::onTaskStarted:");
+        return task;
     }
     DECAF_CATCH_RETHROW( lang::Exception )
     DECAF_CATCHALL_THROW( lang::Exception )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ThreadPool::onTaskCompleted( PooledThread* thread DECAF_UNUSED) {
+void ExecutorKernel::AllocateThread() {
 
-    try {
-        synchronized( &poolLock ) {
-            freeThreads++;
+    try{
+
+        if( this->workers.size() >= this->maxPoolSize ) {
+            return;
         }
 
-        //LOGCMS_DEBUG(logger, "ThreadPool::onTaskCompleted: ");
+        synchronized( &mainLock ) {
+            Worker* newWorker = new Worker(this);
+            this->workers.add(newWorker);
+            freeThreads++;
+            newWorker->start();
+        }
     }
     DECAF_CATCH_RETHROW( lang::Exception )
     DECAF_CATCHALL_THROW( lang::Exception )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ThreadPool::onTaskException(
-   PooledThread* thread,
-   lang::Exception& ex DECAF_UNUSED ) {
+bool ExecutorKernel::isStoppedOrStopping() {
+    if(this->stopped.get() || this->stopping.get()) {
+        return true;
+    }
 
-    //LOGCMS_DEBUG(logger, "ThreadPool::onTaskException: ");
+    return false;
+}
 
-    try{
+////////////////////////////////////////////////////////////////////////////////
+void ExecutorKernel::shutdown() {
 
-        synchronized( &poolLock ) {
+    if(isStoppedOrStopping()) {
+        return;
+    }
 
-            // Delete the thread that had the exception and start a new
-            // one to take its place.
-            freeThreads--;
+    if(this->stopping.compareAndSet(false, true)) {
 
-            std::vector<PooledThread*>::iterator itr =
-                std::find(pool.begin(), pool.end(), thread);
+        synchronized(&mainLock) {
 
-            if( itr != pool.end() ) {
-                pool.erase(itr);
+            Pointer< Iterator<Worker*> > iter(this->workers.iterator());
+
+            while(iter->hasNext()) {
+                iter->next()->stop();
             }
 
-            // Bye-Bye Thread Object
-            delete thread;
+            // TODO - When threads are interruptible, we need to interrupt the Queue.
+            //synchronized( workQueue.get() ) {
+            //    // Signal the Queue so that all waiters are notified
+            //    workQueue->notifyAll();
+            //}
+
+            iter.reset(this->workers.iterator());
+            while(iter->hasNext()) {
+                Worker* worker = iter->next();
+                worker->join();
+                delete worker;
+            }
 
-            // Now allocate a replacement
-            AllocateThreads(1);
+            this->workers.clear();
+            this->stopped.set(true);
         }
     }
-    DECAF_CATCH_RETHROW( lang::Exception )
-    DECAF_CATCHALL_THROW( lang::Exception )
 }

Copied: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h (from r1084577, activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.h)
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h?p2=activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h&p1=activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.h&r1=1084577&r2=1086005&rev=1086005&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h Sun Mar 27 18:58:54 2011
@@ -14,16 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef _DECAF_UTIL_CONCURRENT_THREADPOOL_H_
-#define _DECAF_UTIL_CONCURRENT_THREADPOOL_H_
+#ifndef _DECAF_UTIL_CONCURRENT_THREADPOOLEXECUTOR_H_
+#define _DECAF_UTIL_CONCURRENT_THREADPOOLEXECUTOR_H_
 
 #include <decaf/lang/Runnable.h>
-#include <decaf/util/concurrent/PooledThread.h>
-#include <decaf/util/concurrent/PooledThreadListener.h>
-#include <decaf/util/concurrent/TaskListener.h>
+#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
+#include <decaf/util/concurrent/ThreadFactory.h>
 #include <decaf/util/concurrent/Mutex.h>
+#include <decaf/util/concurrent/BlockingQueue.h>
 #include <decaf/util/LinkedList.h>
-#include <decaf/util/logging/LoggerDefines.h>
 #include <decaf/util/Config.h>
 
 #include <vector>
@@ -32,6 +31,11 @@ namespace decaf{
 namespace util{
 namespace concurrent{
 
+    using decaf::lang::Pointer;
+    using decaf::util::concurrent::atomic::AtomicBoolean;
+
+    class ExecutorKernel;
+
     /**
      * Defines a Thread Pool object that implements the functionality
      * of pooling threads to perform user tasks.  The Thread Poll has
@@ -48,187 +52,105 @@ namespace concurrent{
      * object that implements the <code>Runnable</code> interface and
      * one of the worker threads will executing it in its thread context.
      */
-    class DECAF_API ThreadPool : public PooledThreadListener {
-    public:
-
-        // Constants
-        static const size_t DEFAULT_MAX_POOL_SIZE  = 10;
-        static const size_t DEFAULT_MAX_BLOCK_SIZE = 3;
-
-        // Types
-        typedef std::pair<lang::Runnable*, TaskListener*> Task;
-
+    class DECAF_API ThreadPoolExecutor {
     private:
 
-        ThreadPool( const ThreadPool& );
-        ThreadPool& operator= ( const ThreadPool& );
+        ThreadPoolExecutor( const ThreadPoolExecutor& );
+        ThreadPoolExecutor& operator= ( const ThreadPoolExecutor& );
 
     private:
 
-        // Vector of threads that this object has created for its pool.
-        std::vector< PooledThread* > pool;
-
-        // Queue of Task that are in need of completion
-        util::LinkedList<Task> queue;
-
-        // Max number of Threads this Pool can contain
-        std::size_t maxThreads;
-
-        // Max number of tasks that can be allocated at a time
-        std::size_t blockSize;
-
-        // boolean flag use to indicate that this object is shutting down.
-        bool shutdown;
-
-        // Count of threads that are currently free to perform some work.
-        std::size_t freeThreads;
-
-        // Mutex for locking operations that affect the pool.
-        Mutex poolLock;
-
-        // Logger Init
-        LOGDECAF_DECLARE(logger)
-        LOGDECAF_DECLARE(marker)
+        ExecutorKernel* kernel;
 
     public:
 
-        ThreadPool();
-        virtual ~ThreadPool();
-
-        /**
-         * Queue a task to be completed by one of the Pooled Threads.
-         * tasks are serviced as soon as a <code>PooledThread</code>
-         * is available to run it.
-         * @param task object that derives from Runnable
-         * @throws ActiveMQException
-         */
-        virtual void queueTask( Task task );
-
-        /**
-         * DeQueue a task to be completed by one of the Pooled Threads.
-         * A caller of this method will block until there is something
-         * in the tasks queue, therefore care must be taken when calling
-         * this function.  Normally clients of ThreadPool don't use
-         * this, only the <code>PooledThread</code> objects owned by
-         * this ThreadPool.
-         * @return object that derives from Runnable
-         * @throws ActiveMQException
-         */
-        virtual Task deQueueTask();
-
-        /**
-         * Returns the current number of Threads in the Pool, this is
-         * how many there are now, not how many are active or the max
-         * number that might exist.
-         * @return integer number of threads in existence.
-         */
-        virtual std::size_t getPoolSize() const { return pool.size(); }
-
-        /**
-         * Returns the current backlog of items in the tasks queue, this
-         * is how much work is still waiting to get done.
-         * @return number of outstanding tasks.
-         */
-        virtual std::size_t getBacklog() const { return queue.size(); }
-
-        /**
-         * Ensures that there is at least the specified number of Threads
-         * allocated to the pool.  If the size is greater than the MAX
-         * number of threads in the pool, then only MAX threads are
-         * reservved.  If the size is smaller than the number of threads
-         * currently in the pool, than nothing is done.
-         * @param size the number of threads to reserve.
-         */
-        virtual void reserve( std::size_t size );
-
-        /**
-         * Get the Max Number of Threads this Pool can contain
-         * @return max size
-         */
-        virtual std::size_t getMaxThreads() const { return maxThreads; }
-
-        /**
-         * Sets the Max number of threads this pool can contian.
-         * if this value is smaller than the current size of the
-         * pool nothing is done.
-         * @param maxThreads total number of threads that can be pooled
-         */
-        virtual void setMaxThreads( std::size_t maxThreads );
-
-        /**
-         * Gets the Max number of threads that can be allocated at a time
-         * when new threads are needed.
-         * @return max Thread Block Size
-         */
-        virtual std::size_t getBlockSize() const { return blockSize; }
-
-        /**
-         * Sets the Max number of Threads that can be allocated at a time
-         * when the Thread Pool determines that more Threads are needed.
-         * @param blockSize Max Thread Block Size
-         */
-        virtual void setBlockSize( std::size_t blockSize );
-
-        /**
-         * Returns the current number of available threads in the pool, threads
-         * that are performing a user task are considered unavailable.  This value
-         * could change immediately after calling as Threads could finish right
-         * after and be available again.  This is informational only.
-         * @return total free threads
-         */
-        virtual std::size_t getFreeThreadCount() const {
-            return freeThreads;
-        }
-
-    public: // PooledThreadListener Callbacks
-
-        /**
-         * Called by a pooled thread when it is about to begin
-         * executing a new task.  This will decrement the available
-         * threads counter so that this object knows when there are
-         * no more free threads and must create new ones.
-         * @param thread Pointer to the Pooled Thread that is making this call
-         */
-        virtual void onTaskStarted( PooledThread* thread );
-
-        /**
-         * Called by a pooled thread when it has completed a task
-         * and is going back to waiting for another task to run,
-         * this will increment the free threads counter.
-         * @param thread Pointer the the Pooled Thread that is making this call.
-         */
-        virtual void onTaskCompleted( PooledThread* thread );
-
-        /**
-         * Called by a pooled thread when it has encountered an exception
-         * while running a user task, after receiving this notification
-         * the callee should assume that the PooledThread is now no longer
-         * running.
-         * @param thread Pointer to the Pooled Thread that is making this call
-         * @param ex The Exception that occurred.
-         */
-        virtual void onTaskException( PooledThread* thread,
-                                      lang::Exception& ex );
-
-    public:   // Statics
-
-        /**
-         * Return the one and only Thread Pool instance.
-         * @return The Thread Pool Pointer
-         */
-        static ThreadPool* getInstance();
-
-    private:
-
         /**
-         * Allocates the requested amount of Threads, won't exceed
-         * <code>maxThreads</code>.
-         * @param count the number of threads to create
+         * Creates a new instance of a ThreadPoolExecutor.
+         *
+         * The executor instance is configured with the passed in parameters and a
+         * default thread Factory is used along with a default rejected execution
+         * handler.
+         *
+         * @param corePoolSize
+         *      The number of threads to pool regardless of their idle state.
+         * @param maxPoolSize
+         *      The maximum number of threads that will ever exist at one time in the pool.
+         * @param keepAliveTime
+         *      The maximum time to keep a thread in the pool for if the number of current
+         *      threads exceeds the core pool size.
+         * @param unit
+         *      The units that the keepAliveTime is specified in.
+         * @param workQueue
+         *      A BlockingQueue implementation that will be used to hold Runnable tasks
+         *      that are awaiting execution within this executor.  The Executor takes
+         *      ownership of the BlockingQueue instance passed once this method returns.
+         *
+         * @throws IllegalArgumentException if the corePoolSize or keepAliveTime are negative,
+         *         or if maximumPoolSize is less than or equal to zero, or if corePoolSize
+         *         is greater than maximumPoolSize.
+         * @throws NullPointerException if the workQueue pointer is NULL.
+         */
+        ThreadPoolExecutor(int corePoolSize, int maxPoolSize,
+                           long long keepAliveTime, const TimeUnit& unit,
+                           BlockingQueue<decaf::lang::Runnable*>* workQueue);
+
+        virtual ~ThreadPoolExecutor();
+
+        /**
+         * Queue a task to be completed by one of the Pooled Threads at some point in the
+         * future.  The task can be rejected by this executor if it has been shut down or
+         * if the workQueue is full, rejected Runnables are not deleted by this executor.
+         * Upon successful return from this method the given Runnable pointer is considered
+         * to be owned by this Executor and will be deleted upon completion or shut down.
+         *
+         * @param task
+         *      The Runnable object that is to be executed.
+         *
+         * @throws RejectedExecutionException based on instruction from RejectedExecutionHandler
+         *         if the given task cannot be accepted for execution at this time.
+         * @throws NullPointerException if the given task pointer is NULL.
+         */
+        virtual void execute(decaf::lang::Runnable* task);
+
+        /**
+         * Performs an orderly shutdown of this Executor.  Previously queued tasks are allowed
+         * to complete but no new tasks are accepted for execution.  Calling this method more
+         * than once has no effect on this executor.
+         */
+        virtual void shutdown();
+
+        /**
+         * Returns the number of threads that currently exists for this Executor.
+         *
+         * @return the configured number of Threads in the Pool.
+         */
+        virtual int getPoolSize() const;
+
+        /**
+         * Returns the configured number of core threads for this Executor.
+         *
+         * @return the configured number of core Threads.
+         */
+        virtual int getCorePoolSize() const;
+
+        /**
+         * Returns the configured maximum number of threads for this Executor.
+         *
+         * @return the configured maximum number of Threads.
+         */
+        virtual int getMaximumPoolSize() const;
+
+        /**
+         * Returns the current number of pending tasks in the work queue.  This is
+         * an approximation as the number of pending tasks can change quickly as
+         * tasks complete and new tasks are started.
+         *
+         * @return number of outstanding tasks, approximate.
          */
-        void AllocateThreads( std::size_t count );
+        virtual long long getTaskCount() const;
 
     };
 
 }}}
 
-#endif /*_DECAF_UTIL_CONCURRENT_THREADPOOL_H_*/
+#endif /*_DECAF_UTIL_CONCURRENT_THREADPOOLEXECUTOR_H_*/

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am?rev=1086005&r1=1086004&r2=1086005&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am Sun Mar 27 18:58:54 2011
@@ -207,9 +207,10 @@ cc_sources = \
     decaf/util/concurrent/CopyOnWriteArrayListTest.cpp \
     decaf/util/concurrent/CopyOnWriteArraySetTest.cpp \
     decaf/util/concurrent/CountDownLatchTest.cpp \
+    decaf/util/concurrent/LinkedBlockingQueueTest.cpp \
     decaf/util/concurrent/MutexTest.cpp \
     decaf/util/concurrent/SynchronousQueueTest.cpp \
-    decaf/util/concurrent/ThreadPoolTest.cpp \
+    decaf/util/concurrent/ThreadPoolExecutorTest.cpp \
     decaf/util/concurrent/TimeUnitTest.cpp \
     decaf/util/concurrent/atomic/AtomicBooleanTest.cpp \
     decaf/util/concurrent/atomic/AtomicIntegerTest.cpp \
@@ -428,9 +429,10 @@ h_sources = \
     decaf/util/concurrent/CopyOnWriteArrayListTest.h \
     decaf/util/concurrent/CopyOnWriteArraySetTest.h \
     decaf/util/concurrent/CountDownLatchTest.h \
+    decaf/util/concurrent/LinkedBlockingQueueTest.h \
     decaf/util/concurrent/MutexTest.h \
     decaf/util/concurrent/SynchronousQueueTest.h \
-    decaf/util/concurrent/ThreadPoolTest.h \
+    decaf/util/concurrent/ThreadPoolExecutorTest.h \
     decaf/util/concurrent/TimeUnitTest.h \
     decaf/util/concurrent/atomic/AtomicBooleanTest.h \
     decaf/util/concurrent/atomic/AtomicIntegerTest.h \



Mime
View raw message