activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1086005 [2/2] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/decaf/util/concurrent/ test/ test/decaf/util/concurrent/
Date Sun, 27 Mar 2011 18:58:55 GMT
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/LinkedBlockingQueueTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/LinkedBlockingQueueTest.cpp?rev=1086005&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/LinkedBlockingQueueTest.cpp
(added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/LinkedBlockingQueueTest.cpp
Sun Mar 27 18:58:54 2011
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "LinkedBlockingQueueTest.h"
+
+#include <decaf/util/LinkedList.h>
+#include <decaf/util/concurrent/LinkedBlockingQueue.h>
+#include <decaf/lang/Integer.h>
+#include <decaf/lang/exceptions/IllegalArgumentException.h>
+
+using namespace std;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+const int LinkedBlockingQueueTest::SIZE = 256;  // element count / queue capacity shared by the tests in this file
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    // Loads the values 0 .. n-1, in order, into an initially empty int queue.
+    // Asserts that the queue starts empty and finishes holding n elements.
+    void populate( LinkedBlockingQueue<int>& queue, int n ) {
+
+        CPPUNIT_ASSERT( queue.isEmpty() );
+
+        int value = 0;
+        while( value < n ) {
+            queue.add( value );
+            ++value;
+        }
+
+        CPPUNIT_ASSERT( !queue.isEmpty());
+        CPPUNIT_ASSERT_EQUAL( n, queue.size() );
+    }
+
+    // Loads the values 0 .. n-1, in order, into an initially empty LinkedList.
+    // Asserts that the list starts empty and finishes holding n elements.
+    void populate( LinkedList<int>& list, int n ) {
+
+        CPPUNIT_ASSERT( list.isEmpty() );
+
+        int value = 0;
+        while( value < n ) {
+            list.add( value );
+            ++value;
+        }
+
+        CPPUNIT_ASSERT( !list.isEmpty());
+        CPPUNIT_ASSERT_EQUAL( n, list.size() );
+    }
+
+    // Loads the decimal strings "0" .. "n-1", in order, into an initially
+    // empty string queue and asserts the resulting element count.
+    void populate( LinkedBlockingQueue<std::string>& queue, int n ) {
+
+        CPPUNIT_ASSERT( queue.isEmpty() );
+
+        int value = 0;
+        while( value < n ) {
+            queue.add( Integer::toString( value ) );
+            ++value;
+        }
+
+        CPPUNIT_ASSERT( !queue.isEmpty());
+        CPPUNIT_ASSERT_EQUAL( n, queue.size() );
+    }
+
+    // Appends the values 0 .. n-1 to an initially empty std::vector and
+    // asserts the resulting element count.
+    void populate( std::vector<int>& list, int n ) {
+
+        CPPUNIT_ASSERT( list.empty() );
+
+        int value = 0;
+        while( value < n ) {
+            list.push_back( value );
+            ++value;
+        }
+
+        CPPUNIT_ASSERT( !list.empty());
+        CPPUNIT_ASSERT_EQUAL( n, static_cast<int>( list.size() ) );
+    }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+LinkedBlockingQueueTest::LinkedBlockingQueueTest() {
+    // Nothing to set up: every test builds its own queue.
+}
+
+///////////////////////////////////////////////////////////////////////////////
+LinkedBlockingQueueTest::~LinkedBlockingQueueTest() {
+    // Nothing to release.
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testConstructor1() {
+
+    // A default-constructed queue is empty and reports Integer::MAX_VALUE as
+    // its remaining capacity.
+    LinkedBlockingQueue<int> queue;
+
+    const int initialSize = queue.size();
+    CPPUNIT_ASSERT_EQUAL(0, initialSize);
+    CPPUNIT_ASSERT(queue.isEmpty());
+
+    const int available = queue.remainingCapacity();
+    CPPUNIT_ASSERT_EQUAL(static_cast<int>(Integer::MAX_VALUE), available);
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testConstructor2() {
+
+    // A queue built with an explicit capacity starts empty with all of that
+    // capacity still available.
+    LinkedBlockingQueue<int> queue(SIZE);
+
+    const int initialSize = queue.size();
+    CPPUNIT_ASSERT_EQUAL(0, initialSize);
+    CPPUNIT_ASSERT(queue.isEmpty());
+
+    const int available = queue.remainingCapacity();
+    CPPUNIT_ASSERT_EQUAL(SIZE, available);
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testConstructor3() {
+
+    // A queue constructed from a collection must hand the elements back in
+    // the collection's order.
+    LinkedList<int> list;
+    populate(list, SIZE);
+
+    LinkedBlockingQueue<int> q(list);
+
+    int index = 0;
+    while (index < SIZE) {
+        int result;
+        CPPUNIT_ASSERT(q.poll(result));
+        CPPUNIT_ASSERT_EQUAL(list.get(index), result);
+        ++index;
+    }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testConstructor4() {
+
+    // A negative capacity must be rejected by the constructor.
+    CPPUNIT_ASSERT_THROW_MESSAGE( "Should have thrown an IllegalArgumentException",
+                                  LinkedBlockingQueue<int>(-1),
+                                  IllegalArgumentException );
+}
+
+//////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testEquals() {
+
+    // Two queues loaded with the same seven values compare equal both ways.
+    LinkedBlockingQueue<int> q1;
+    LinkedBlockingQueue<int> q2;
+    populate( q1, 7 );
+    populate( q2, 7 );
+
+    CPPUNIT_ASSERT( q1.equals( q2 ) );
+    CPPUNIT_ASSERT( q2.equals( q1 ) );
+
+    // An extra element on one side breaks equality in both directions ...
+    q1.add( 42 );
+    CPPUNIT_ASSERT( !q1.equals( q2 ) );
+    CPPUNIT_ASSERT( !q2.equals( q1 ) );
+
+    // ... and adding the same element to the other side restores it.
+    q2.add( 42 );
+    CPPUNIT_ASSERT( q1.equals( q2 ) );
+    CPPUNIT_ASSERT( q2.equals( q1 ) );
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testEmptyFull() {
+
+    // Capacity of two: empty at first, non-empty after each of the two adds.
+    LinkedBlockingQueue<int> q(2);
+    CPPUNIT_ASSERT(q.isEmpty());
+    CPPUNIT_ASSERT_EQUAL_MESSAGE("should have room for 2", 2, q.remainingCapacity());
+
+    for (int value = 1; value <= 2; ++value) {
+        q.add(value);
+        CPPUNIT_ASSERT(!q.isEmpty());
+    }
+
+    // Once full there is no capacity left and offer() is refused.
+    CPPUNIT_ASSERT_EQUAL(0, q.remainingCapacity());
+    CPPUNIT_ASSERT(!q.offer(3));
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testRemainingCapacity() {
+
+    LinkedBlockingQueue<int> q(SIZE);
+    populate(q, SIZE);
+
+    // Draining a full queue: each removal frees exactly one slot.
+    int removed = 0;
+    while (removed < SIZE) {
+        CPPUNIT_ASSERT_EQUAL(removed, q.remainingCapacity());
+        CPPUNIT_ASSERT_EQUAL(SIZE - removed, q.size());
+        q.remove();
+        ++removed;
+    }
+
+    // Refilling the now empty queue: each add consumes exactly one slot.
+    int added = 0;
+    while (added < SIZE) {
+        CPPUNIT_ASSERT_EQUAL(SIZE - added, q.remainingCapacity());
+        CPPUNIT_ASSERT_EQUAL(added, q.size());
+        q.add(added);
+        ++added;
+    }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testOffer() {
+
+    // With a capacity of one, the first offer is accepted and the second is
+    // turned down.
+    LinkedBlockingQueue<int> q(1);
+
+    CPPUNIT_ASSERT(q.offer(0));
+    CPPUNIT_ASSERT(!q.offer(1));
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testAdd() {
+
+    // add() succeeds until the queue reaches its capacity ...
+    LinkedBlockingQueue<int> q(SIZE);
+    int i = 0;
+    while (i < SIZE) {
+        CPPUNIT_ASSERT(q.add(i));
+        ++i;
+    }
+
+    const int available = q.remainingCapacity();
+    CPPUNIT_ASSERT_EQUAL(0, available);
+
+    // ... and throws once the queue is full.
+    CPPUNIT_ASSERT_THROW_MESSAGE( "Should have thrown an IllegalStateException",
+                                  q.add(SIZE),
+                                  IllegalStateException );
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testAddAllSelf() {
+
+    // Passing a queue to its own addAll() must be rejected.
+    LinkedBlockingQueue<int> q(SIZE);
+    populate(q, SIZE);
+
+    CPPUNIT_ASSERT_THROW_MESSAGE( "Should have thrown an IllegalArgumentException",
+                                  q.addAll(q),
+                                  IllegalArgumentException );
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testAddAll1() {
+
+    // Source collection holding SIZE elements.
+    LinkedList<int> list;
+    populate(list, SIZE);
+
+    // The target has room for a single element only, so addAll() must throw.
+    LinkedBlockingQueue<int> q(1);
+
+    CPPUNIT_ASSERT_THROW_MESSAGE( "Should have thrown an IllegalStateException",
+                                  q.addAll(list),
+                                  IllegalStateException );
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testAddAll2() {
+
+    LinkedList<int> empty;
+    LinkedList<int> list;
+    populate(list, SIZE);
+
+    LinkedBlockingQueue<int> q(SIZE);
+
+    // Adding nothing reports "unchanged"; adding real content reports a change.
+    CPPUNIT_ASSERT(!q.addAll(empty));
+    CPPUNIT_ASSERT(q.addAll(list));
+
+    // The queue must now yield the list's elements in the list's order.
+    int index = 0;
+    while (index < SIZE) {
+        int result;
+        CPPUNIT_ASSERT(q.poll(result));
+        CPPUNIT_ASSERT_EQUAL(list.get(index), result);
+        ++index;
+    }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testPut() {
+
+    // Every value handed to put() must be visible through contains().
+    LinkedBlockingQueue<int> q(SIZE);
+
+    int i = 0;
+    while (i < SIZE) {
+        q.put(i);
+        CPPUNIT_ASSERT(q.contains(i));
+        ++i;
+    }
+
+    // SIZE puts into a queue of capacity SIZE leave no room.
+    const int available = q.remainingCapacity();
+    CPPUNIT_ASSERT_EQUAL(0, available);
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testTake() {
+
+    LinkedBlockingQueue<int> q(SIZE);
+    populate(q, SIZE);
+
+    // take() must return the elements in insertion order.
+    int expected = 0;
+    while (expected < SIZE) {
+        const int taken = q.take();
+        CPPUNIT_ASSERT_EQUAL(expected, taken);
+        ++expected;
+    }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testPoll() {
+
+    LinkedBlockingQueue<int> q(SIZE);
+    populate(q, SIZE);
+
+    int result;
+
+    // poll() succeeds SIZE times, handing back the values in insertion order.
+    int expected = 0;
+    while (expected < SIZE) {
+        CPPUNIT_ASSERT(q.poll(result));
+        CPPUNIT_ASSERT_EQUAL(expected, result);
+        ++expected;
+    }
+
+    // Nothing is left, so one more poll() must report failure.
+    CPPUNIT_ASSERT(!q.poll(result));
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testTimedPoll1() {
+
+    LinkedBlockingQueue<int> q(SIZE);
+    populate(q, SIZE);
+
+    int result;
+
+    // Timed poll with a zero timeout: succeeds while elements remain.
+    int expected = 0;
+    while (expected < SIZE) {
+        CPPUNIT_ASSERT(q.poll(result, 0, TimeUnit::MILLISECONDS));
+        CPPUNIT_ASSERT_EQUAL(expected, result);
+        ++expected;
+    }
+
+    // On the emptied queue the zero-timeout poll must report failure.
+    CPPUNIT_ASSERT(!q.poll(result, 0, TimeUnit::MILLISECONDS));
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testTimedPoll2() {
+
+    LinkedBlockingQueue<int> q(SIZE);
+    populate(q, SIZE);
+
+    int result;
+
+    // Timed poll with a 100 ms timeout: succeeds while elements remain.
+    int expected = 0;
+    while (expected < SIZE) {
+        CPPUNIT_ASSERT(q.poll(result, 100, TimeUnit::MILLISECONDS));
+        CPPUNIT_ASSERT_EQUAL(expected, result);
+        ++expected;
+    }
+
+    // On the emptied queue the timed poll must come back empty-handed.
+    CPPUNIT_ASSERT(!q.poll(result, 100, TimeUnit::MILLISECONDS));
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testPeek() {
+
+    LinkedBlockingQueue<int> q(SIZE);
+    populate(q, SIZE);
+
+    int result;
+
+    int i = 0;
+    while (i < SIZE) {
+        // peek() shows the head without removing it ...
+        CPPUNIT_ASSERT(q.peek(result));
+        CPPUNIT_ASSERT_EQUAL(i, result);
+
+        // ... and after the head is polled, peek() either fails (queue now
+        // empty) or shows a different value.
+        q.poll(result);
+        CPPUNIT_ASSERT(q.peek(result) == false || i != result);
+        ++i;
+    }
+
+    CPPUNIT_ASSERT(!q.peek(result));
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testElement() {
+
+    LinkedBlockingQueue<int> q(SIZE);
+    populate(q, SIZE);
+
+    int result;
+
+    // element() reports the current head; poll() then discards it.
+    int expected = 0;
+    while (expected < SIZE) {
+        const int head = q.element();
+        CPPUNIT_ASSERT_EQUAL(expected, head);
+        q.poll(result);
+        ++expected;
+    }
+
+    // On an empty queue element() must throw.
+    CPPUNIT_ASSERT_THROW_MESSAGE( "Should have thrown an NoSuchElementException",
+                                  q.element(),
+                                  NoSuchElementException );
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testRemove() {
+
+    LinkedBlockingQueue<int> q(SIZE);
+    populate(q, SIZE);
+
+    // remove() hands back the elements in insertion order.
+    int expected = 0;
+    while (expected < SIZE) {
+        const int removed = q.remove();
+        CPPUNIT_ASSERT_EQUAL(expected, removed);
+        ++expected;
+    }
+
+    // On an empty queue remove() must throw.
+    CPPUNIT_ASSERT_THROW_MESSAGE( "Should have thrown an NoSuchElementException",
+                                  q.remove(),
+                                  NoSuchElementException );
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testRemoveElement() {
+
+    LinkedBlockingQueue<int> q(SIZE);
+    populate(q, SIZE);
+
+    // First pass: remove every odd value.
+    {
+        int i = 1;
+        while (i < SIZE) {
+            CPPUNIT_ASSERT(q.remove(i));
+            i += 2;
+        }
+    }
+
+    // Second pass: every even value is still present, while its odd
+    // successor has already gone.
+    {
+        int i = 0;
+        while (i < SIZE) {
+            CPPUNIT_ASSERT(q.remove(i));
+            CPPUNIT_ASSERT(!q.remove(i + 1));
+            i += 2;
+        }
+    }
+
+    CPPUNIT_ASSERT(q.isEmpty());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testRemoveElement2() {
+
+    // Exercises remove(value) on a default-constructed queue: a present value
+    // is removed, an absent value is not, and removed values stop being
+    // reported by contains().
+    LinkedBlockingQueue<int> q;
+    populate( q, SIZE );
+
+    CPPUNIT_ASSERT_MESSAGE( "Failed to remove valid Object", q.remove(42) );
+    CPPUNIT_ASSERT_MESSAGE( "Removed invalid object", !q.remove(999) );
+    CPPUNIT_ASSERT_EQUAL_MESSAGE( "Found Object after removal", false, q.contains(42) );
+
+    // A value added after the initial load must also disappear once removed.
+    q.add(SIZE+1);
+    q.remove(SIZE+1);
+    CPPUNIT_ASSERT_MESSAGE( "Should not contain element after removal", !q.contains(SIZE+1) );
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testRemoveElementAndAdd() {
+
+    LinkedBlockingQueue<int> q;
+
+    // Load two values, then empty the queue again via remove(value).
+    CPPUNIT_ASSERT(q.add(1));
+    CPPUNIT_ASSERT(q.add(2));
+    CPPUNIT_ASSERT(q.remove(1));
+    CPPUNIT_ASSERT(q.remove(2));
+
+    // The emptied queue must still accept and hand back a new element.
+    CPPUNIT_ASSERT(q.add(3));
+    CPPUNIT_ASSERT(q.take() == 3);
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testContains() {
+
+    LinkedBlockingQueue<int> q(SIZE);
+    populate(q, SIZE);
+
+    // The head value i is contained right up to the moment it is removed.
+    int i = 0;
+    while (i < SIZE) {
+        CPPUNIT_ASSERT(q.contains(i));
+        q.remove();
+        CPPUNIT_ASSERT(!q.contains(i));
+        ++i;
+    }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testClear() {
+
+    LinkedBlockingQueue<int> q(SIZE);
+    populate(q, SIZE);
+
+    // Clearing a full queue empties it and gives back the whole capacity.
+    q.clear();
+    CPPUNIT_ASSERT(q.isEmpty());
+
+    const int sizeAfterClear = q.size();
+    CPPUNIT_ASSERT_EQUAL(0, sizeAfterClear);
+
+    const int capacityAfterClear = q.remainingCapacity();
+    CPPUNIT_ASSERT_EQUAL(SIZE, capacityAfterClear);
+
+    // The cleared queue stays usable and can be cleared a second time.
+    q.add(1);
+    CPPUNIT_ASSERT(!q.isEmpty());
+    CPPUNIT_ASSERT(q.contains(1));
+
+    q.clear();
+    CPPUNIT_ASSERT(q.isEmpty());
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testContainsAll() {
+
+    LinkedBlockingQueue<int> q;
+    populate(q, SIZE);
+
+    LinkedBlockingQueue<int> p;
+
+    // While p grows towards q's contents, q always covers p but p does not
+    // yet cover q.
+    int i = 0;
+    while (i < SIZE) {
+        CPPUNIT_ASSERT(q.containsAll(p));
+        CPPUNIT_ASSERT(!p.containsAll(q));
+        p.add(i);
+        ++i;
+    }
+
+    // With all SIZE values added, p finally covers q as well.
+    CPPUNIT_ASSERT(p.containsAll(q));
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testRetainAll() {
+
+    LinkedBlockingQueue<int> q;
+    LinkedBlockingQueue<int> p;
+    populate(q, SIZE);
+    populate(p, SIZE);
+
+    int i = 0;
+    while (i < SIZE) {
+        bool changed = q.retainAll(p);
+
+        // Only the first round, where both queues hold the same values,
+        // leaves q untouched; afterwards p has lost one more element.
+        if (i != 0) {
+            CPPUNIT_ASSERT(changed);
+        } else {
+            CPPUNIT_ASSERT(!changed);
+        }
+
+        CPPUNIT_ASSERT(q.containsAll(p));
+
+        const int remaining = q.size();
+        CPPUNIT_ASSERT_EQUAL(SIZE-i, remaining);
+
+        p.remove();
+        ++i;
+    }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testRemoveAll() {
+
+    // For every prefix length i, removing p = {0 .. i-1} from q = {0 .. SIZE-1}
+    // must shrink q by i and leave none of p's values behind.
+    int i = 1;
+    while (i < SIZE) {
+        LinkedBlockingQueue<int> q;
+        LinkedBlockingQueue<int> p;
+        populate(q, SIZE);
+        populate(p, i);
+
+        CPPUNIT_ASSERT(q.removeAll(p));
+
+        const int remaining = q.size();
+        CPPUNIT_ASSERT_EQUAL(SIZE-i, remaining);
+
+        int j = 0;
+        while (j < i) {
+            int result = p.remove();
+            CPPUNIT_ASSERT(!q.contains(result));
+            ++j;
+        }
+
+        ++i;
+    }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testToArray() {
+
+    LinkedBlockingQueue<int> q(SIZE);
+    populate(q, SIZE);
+
+    // The snapshot returned by toArray() must list the elements in the same
+    // order in which take() subsequently hands them out.
+    std::vector<int> array = q.toArray();
+
+    std::vector<int>::const_iterator entry = array.begin();
+    while (entry != array.end()) {
+        CPPUNIT_ASSERT_EQUAL(*entry, q.take());
+        ++entry;
+    }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testDrainToSelf() {
+
+    // Draining a queue into itself must be rejected.
+    LinkedBlockingQueue<int> q(SIZE);
+    populate(q, SIZE);
+
+    CPPUNIT_ASSERT_THROW_MESSAGE( "Should have thrown an IllegalArgumentException",
+                                  q.drainTo(q),
+                                  IllegalArgumentException );
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testDrainTo() {
+
+    // Verifies that drainTo() moves every element, in order, into the target
+    // collection and leaves the queue empty but reusable.
+    // All CPPUNIT_ASSERT_EQUAL calls pass (expected, actual) so that failure
+    // reports label the two values correctly.
+    LinkedBlockingQueue<int> q(SIZE);
+    populate(q, SIZE);
+    LinkedList<int> list;
+
+    q.drainTo(list);
+
+    CPPUNIT_ASSERT_EQUAL(0, q.size());
+    CPPUNIT_ASSERT_EQUAL(SIZE, list.size());
+
+    for(int i = 0; i < SIZE; ++i) {
+        CPPUNIT_ASSERT_EQUAL(i, list.get(i));
+    }
+
+    // Refill the drained queue with two values and drain it a second time.
+    q.add(0);
+    q.add(1);
+    CPPUNIT_ASSERT(!q.isEmpty());
+    CPPUNIT_ASSERT(q.contains(0));
+    CPPUNIT_ASSERT(q.contains(1));
+    list.clear();
+
+    q.drainTo(list);
+
+    CPPUNIT_ASSERT_EQUAL(0, q.size());
+    CPPUNIT_ASSERT_EQUAL(2, list.size());
+    for(int i = 0; i < 2; ++i) {
+        CPPUNIT_ASSERT_EQUAL(i, list.get(i));
+    }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testDrainToSelfN() {
+
+    // The bounded drainTo() overload must also reject the queue itself as
+    // the target.
+    LinkedBlockingQueue<int> q(SIZE);
+    populate(q, SIZE);
+
+    CPPUNIT_ASSERT_THROW_MESSAGE( "Should have thrown an IllegalArgumentException",
+                                  q.drainTo(q, SIZE),
+                                  IllegalArgumentException );
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testDrainToN() {
+
+    // Verifies drainTo(target, max) for every limit from 0 up to SIZE + 1:
+    // it must move min(limit, SIZE) elements, in order, and leave the rest.
+    // All CPPUNIT_ASSERT_EQUAL calls pass (expected, actual) so that failure
+    // reports label the two values correctly.
+    LinkedBlockingQueue<int> q;
+
+    for(int i = 0; i < SIZE + 2; ++i) {
+
+        // Start each round with exactly SIZE elements in the queue.
+        for(int j = 0; j < SIZE; j++) {
+            CPPUNIT_ASSERT(q.offer(j));
+        }
+
+        LinkedList<int> list;
+        q.drainTo(list, i);
+
+        // Number of elements the call is expected to have transferred.
+        int k = (i < SIZE) ? i : SIZE;
+        CPPUNIT_ASSERT_EQUAL(k, list.size());
+        CPPUNIT_ASSERT_EQUAL(SIZE - k, q.size());
+        for(int j = 0; j < k; ++j) {
+            CPPUNIT_ASSERT_EQUAL(j, list.get(j));
+        }
+
+        // Discard whatever is left so the next round starts from empty.
+        int temp;
+        while(q.poll(temp)) {
+        }
+    }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    // Helper thread that calls put() once on the given queue with a fixed value.
+    class PutThread : public Thread {
+    private:
+
+        LinkedBlockingQueue<int>* target;
+        int valueToPut;
+
+    public:
+
+        PutThread(LinkedBlockingQueue<int>* q, int putValue)
+            : target(q), valueToPut(putValue) {
+        }
+
+        virtual void run() {
+            try {
+                target->put(valueToPut);
+            } catch(InterruptedException&){
+                // TODO deal with exceptions in threads.
+            }
+        }
+    };
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testDrainToWithActivePut() {
+
+    // Drains a pre-filled queue while a second thread is putting one more
+    // element. The SIZE original elements must arrive in order; the extra
+    // element may land either in the drained list or in the queue.
+    LinkedBlockingQueue<int> q;
+    populate(q, SIZE);
+
+    PutThread t(&q, SIZE+1);
+
+    t.start();
+
+    LinkedList<int> list;
+    q.drainTo(list);
+    CPPUNIT_ASSERT(list.size() >= SIZE);
+
+    // (expected, actual) order so that failure reports label values correctly.
+    for(int i = 0; i < SIZE; ++i) {
+        CPPUNIT_ASSERT_EQUAL(i, list.get(i));
+    }
+
+    t.join();
+
+    CPPUNIT_ASSERT(q.size() + list.size() >= SIZE);
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testIterator() {
+
+    // Walks an iterator over the queue while removing the head with take();
+    // both must report the same value at every step.
+    LinkedBlockingQueue<int> q;
+    populate(q, SIZE);
+
+    Pointer< Iterator<int> > iter(q.iterator());
+
+    while(iter->hasNext()) {
+        // Both calls have side effects. Passed directly as arguments of one
+        // assertion their relative evaluation order would be unspecified, so
+        // sequence them explicitly.
+        const int fromIterator = iter->next();
+        const int fromQueue = q.take();
+        CPPUNIT_ASSERT_EQUAL(fromIterator, fromQueue);
+    }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testIteratorRemove() {
+
+    // Removes the first element through an iterator and checks, with a fresh
+    // iterator, that only the remaining two elements are left, in order.
+    LinkedBlockingQueue<int> q(3);
+
+    q.add(2);
+    q.add(1);
+    q.add(3);
+
+    Pointer< Iterator<int> > iter(q.iterator());
+    iter->next();
+    iter->remove();
+
+    iter.reset(q.iterator());
+
+    // (expected, actual) order so that failure reports label values correctly.
+    CPPUNIT_ASSERT_EQUAL(1, iter->next());
+    CPPUNIT_ASSERT_EQUAL(3, iter->next());
+    CPPUNIT_ASSERT(!iter->hasNext());
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testIteratorOrdering() {
+
+    // Fill a capacity-three queue with 1, 2, 3.
+    LinkedBlockingQueue<int> q(3);
+    for (int value = 1; value <= 3; ++value) {
+        q.add(value);
+    }
+
+    const int available = q.remainingCapacity();
+    CPPUNIT_ASSERT_EQUAL(0, available);
+
+    // The iterator must visit the elements in insertion order; k counts how
+    // many have been seen so far.
+    int k = 0;
+    Pointer< Iterator<int> > iter(q.iterator());
+
+    while(iter->hasNext()) {
+        int i = iter->next();
+        ++k;
+        CPPUNIT_ASSERT_EQUAL(k, i);
+    }
+
+    CPPUNIT_ASSERT_EQUAL(3, k);
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testWeaklyConsistentIteration() {
+
+    // Fill a capacity-three queue with 1, 2, 3.
+    LinkedBlockingQueue<int> q(3);
+    for (int value = 1; value <= 3; ++value) {
+        q.add(value);
+    }
+
+    Pointer< Iterator<int> > iter(q.iterator());
+
+    // Removing the head during iteration must not disturb the iterator: it
+    // is still advanced once per element.
+    while(iter->hasNext()) {
+        q.remove();
+        iter->next();
+    }
+
+    const int remaining = q.size();
+    CPPUNIT_ASSERT_EQUAL(0, remaining);
+}
+
+///////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    // Helper thread that puts the values 0 .. count-1 into the given queue,
+    // sleeping for one millisecond after each put.
+    class PuttingThread : public Thread {
+    private:
+
+        LinkedBlockingQueue<int>* target;
+        int total;
+
+    public:
+
+        PuttingThread(LinkedBlockingQueue<int>* q, int count)
+            : target(q), total(count) {
+        }
+
+        virtual void run() {
+            try {
+                int next = 0;
+                while(next < total) {
+                    target->put(next);
+                    Thread::sleep(1);
+                    ++next;
+                }
+            } catch(InterruptedException&){
+                // TODO deal with exceptions in threads.
+            }
+        }
+    };
+}
+
+///////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    // Helper thread that takes `count` elements from the given queue and
+    // appends each one to the supplied list, sleeping for one millisecond
+    // after each take.
+    class TakingThread : public Thread {
+    private:
+
+        LinkedBlockingQueue<int>* source;
+        int total;
+        LinkedList<int>* sink;
+
+    public:
+
+        TakingThread(LinkedBlockingQueue<int>* q, LinkedList<int>* list, int count)
+            : source(q), total(count), sink(list) {
+        }
+
+        virtual void run() {
+            try {
+                int taken = 0;
+                while(taken < total) {
+                    sink->add(source->take());
+                    Thread::sleep(1);
+                    ++taken;
+                }
+            } catch(InterruptedException&){
+                // TODO deal with exceptions in threads.
+            }
+        }
+    };
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testConcurrentPut() {
+
+    // Scenario 1: a single producer thread; the test thread consumes with
+    // take() and must see the values 0 .. SIZE-1 in order.
+    {
+        LinkedBlockingQueue<int> q;
+        PuttingThread t(&q, SIZE);
+        LinkedList<int> list;
+
+        t.start();
+
+        for(int i = 0; i < SIZE; ++i) {
+            list.add(q.take());
+        }
+
+        t.join();
+
+        CPPUNIT_ASSERT(list.size() == SIZE);
+
+        // (expected, actual) order so that failure reports label values
+        // correctly.
+        for(int i = 0; i < SIZE; ++i) {
+            CPPUNIT_ASSERT_EQUAL(i, list.get(i));
+        }
+    }
+
+    // Scenario 2: four producer threads. Only the total number of elements
+    // received is checked, since the producers' values interleave.
+    {
+        LinkedBlockingQueue<int> q;
+        PuttingThread t1(&q, SIZE);
+        PuttingThread t2(&q, SIZE);
+        PuttingThread t3(&q, SIZE);
+        PuttingThread t4(&q, SIZE);
+        LinkedList<int> list;
+
+        t1.start();
+        t2.start();
+        t3.start();
+        t4.start();
+
+        for(int i = 0; i < SIZE*4; ++i) {
+            list.add(q.take());
+        }
+
+        t1.join();
+        t2.join();
+        t3.join();
+        t4.join();
+
+        CPPUNIT_ASSERT(list.size() == SIZE*4);
+    }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testConcurrentTake() {
+
+    // Scenario 1: a single consumer thread; the test thread produces with
+    // put() and the consumer must collect 0 .. SIZE-1 in order.
+    {
+        LinkedBlockingQueue<int> q;
+        LinkedList<int> list;
+        TakingThread t(&q, &list, SIZE);
+
+        t.start();
+
+        for(int i = 0; i < SIZE; ++i) {
+            q.put(i);
+        }
+
+        t.join();
+
+        CPPUNIT_ASSERT(list.size() == SIZE);
+
+        // (expected, actual) order so that failure reports label values
+        // correctly.
+        for(int i = 0; i < SIZE; ++i) {
+            CPPUNIT_ASSERT_EQUAL(i, list.get(i));
+        }
+    }
+
+    // Scenario 2: four consumer threads, each with its own result list.
+    // Only the per-consumer element counts are checked.
+    {
+        LinkedBlockingQueue<int> q;
+        LinkedList<int> list1;
+        TakingThread t1(&q, &list1, SIZE);
+        LinkedList<int> list2;
+        TakingThread t2(&q, &list2, SIZE);
+        LinkedList<int> list3;
+        TakingThread t3(&q, &list3, SIZE);
+        LinkedList<int> list4;
+        TakingThread t4(&q, &list4, SIZE);
+
+        t1.start();
+        t2.start();
+        t3.start();
+        t4.start();
+
+        for(int i = 0; i < SIZE*4; ++i) {
+            q.put(i);
+        }
+
+        t1.join();
+        t2.join();
+        t3.join();
+        t4.join();
+
+        CPPUNIT_ASSERT(list1.size() == SIZE);
+        CPPUNIT_ASSERT(list2.size() == SIZE);
+        CPPUNIT_ASSERT(list3.size() == SIZE);
+        CPPUNIT_ASSERT(list4.size() == SIZE);
+    }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testConcurrentPutAndTake() {
+
+    // Scenario 1: one producer and one consumer thread exchanging
+    // SCOPED_SIZE elements; the consumer must collect them in order.
+    {
+        const int SCOPED_SIZE = SIZE * 5;
+        LinkedBlockingQueue<int> q;
+        LinkedList<int> list;
+        PuttingThread p(&q, SCOPED_SIZE);
+        TakingThread t(&q, &list, SCOPED_SIZE);
+
+        // The consumer is started 20 ms ahead of the producer.
+        t.start();
+        Thread::sleep(20);
+        p.start();
+
+        p.join();
+        t.join();
+
+        CPPUNIT_ASSERT(list.size() == SCOPED_SIZE);
+
+        // (expected, actual) order so that failure reports label values
+        // correctly.
+        for(int i = 0; i < SCOPED_SIZE; ++i) {
+            CPPUNIT_ASSERT_EQUAL(i, list.get(i));
+        }
+    }
+
+    // Scenario 2: four producers and four consumers sharing one queue.
+    // Only the per-consumer element counts are checked.
+    {
+        LinkedBlockingQueue<int> q;
+        LinkedList<int> list1;
+        LinkedList<int> list2;
+        LinkedList<int> list3;
+        LinkedList<int> list4;
+        PuttingThread p1(&q, SIZE);
+        PuttingThread p2(&q, SIZE);
+        PuttingThread p3(&q, SIZE);
+        PuttingThread p4(&q, SIZE);
+        TakingThread t1(&q, &list1, SIZE);
+        TakingThread t2(&q, &list2, SIZE);
+        TakingThread t3(&q, &list3, SIZE);
+        TakingThread t4(&q, &list4, SIZE);
+
+        t1.start();
+        Thread::sleep(20);
+        p1.start();
+        t2.start();
+        Thread::sleep(20);
+        p2.start();
+        t3.start();
+        Thread::sleep(20);
+        p3.start();
+        t4.start();
+        Thread::sleep(20);
+        p4.start();
+
+        p1.join();
+        t1.join();
+        p2.join();
+        t2.join();
+        p3.join();
+        t3.join();
+        p4.join();
+        t4.join();
+
+        CPPUNIT_ASSERT(list1.size() == SIZE);
+        CPPUNIT_ASSERT(list2.size() == SIZE);
+        CPPUNIT_ASSERT(list3.size() == SIZE);
+        CPPUNIT_ASSERT(list4.size() == SIZE);
+    }
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/LinkedBlockingQueueTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/LinkedBlockingQueueTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/LinkedBlockingQueueTest.h?rev=1086005&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/LinkedBlockingQueueTest.h
(added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/LinkedBlockingQueueTest.h
Sun Mar 27 18:58:54 2011
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _DECAF_UTIL_CONCURRENT_LINKEDBLOCKINGQUEUETEST_H_
+#define _DECAF_UTIL_CONCURRENT_LINKEDBLOCKINGQUEUETEST_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+namespace decaf {
+namespace util {
+namespace concurrent {
+
+    class LinkedBlockingQueueTest : public CppUnit::TestFixture {  // CppUnit fixture declaring the LinkedBlockingQueue test cases
+
+        CPPUNIT_TEST_SUITE( LinkedBlockingQueueTest );  // each CPPUNIT_TEST entry below names one of the test methods declared in the public section
+        CPPUNIT_TEST( testConstructor1 );
+        CPPUNIT_TEST( testConstructor2 );
+        CPPUNIT_TEST( testConstructor3 );
+        CPPUNIT_TEST( testConstructor4 );
+        CPPUNIT_TEST( testEquals );
+        CPPUNIT_TEST( testEmptyFull );
+        CPPUNIT_TEST( testRemainingCapacity );
+        CPPUNIT_TEST( testOffer );
+        CPPUNIT_TEST( testAdd );
+        CPPUNIT_TEST( testAddAllSelf );
+        CPPUNIT_TEST( testAddAll1 );
+        CPPUNIT_TEST( testAddAll2 );
+        CPPUNIT_TEST( testPut );
+        CPPUNIT_TEST( testTake );
+        CPPUNIT_TEST( testPoll );
+        CPPUNIT_TEST( testTimedPoll1 );
+        CPPUNIT_TEST( testTimedPoll2 );
+        CPPUNIT_TEST( testPeek );
+        CPPUNIT_TEST( testElement );
+        CPPUNIT_TEST( testRemove );
+        CPPUNIT_TEST( testRemoveElement );
+        CPPUNIT_TEST( testRemoveElement2 );
+        CPPUNIT_TEST( testRemoveElementAndAdd );
+        CPPUNIT_TEST( testContains );
+        CPPUNIT_TEST( testClear );
+        CPPUNIT_TEST( testContainsAll );
+        CPPUNIT_TEST( testRetainAll );
+        CPPUNIT_TEST( testRemoveAll );
+        CPPUNIT_TEST( testToArray );
+        CPPUNIT_TEST( testDrainToSelf );
+        CPPUNIT_TEST( testDrainTo );
+        CPPUNIT_TEST( testDrainToSelfN );
+        CPPUNIT_TEST( testDrainToWithActivePut );
+        CPPUNIT_TEST( testDrainToN );
+        CPPUNIT_TEST( testIterator );
+        CPPUNIT_TEST( testIteratorRemove );
+        CPPUNIT_TEST( testIteratorOrdering );
+        CPPUNIT_TEST( testWeaklyConsistentIteration );
+        CPPUNIT_TEST( testConcurrentPut );
+        CPPUNIT_TEST( testConcurrentTake );
+        CPPUNIT_TEST( testConcurrentPutAndTake );
+        CPPUNIT_TEST_SUITE_END();
+
+    private:
+
+        static const int SIZE;  // declared without an initializer here, so the value must come from an out-of-class definition
+
+    public:
+
+        LinkedBlockingQueueTest();
+        virtual ~LinkedBlockingQueueTest();
+
+        void testConstructor1();  // construction
+        void testConstructor2();
+        void testConstructor3();
+        void testConstructor4();
+        void testEquals();
+        void testEmptyFull();
+        void testRemainingCapacity();
+        void testOffer();  // insertion
+        void testAdd();
+        void testAddAllSelf();
+        void testAddAll1();
+        void testAddAll2();
+        void testPut();
+        void testTake();  // retrieval and inspection
+        void testPoll();
+        void testTimedPoll1();
+        void testTimedPoll2();
+        void testPeek();
+        void testElement();
+        void testRemove();  // removal and bulk operations
+        void testRemoveElement();
+        void testRemoveElement2();
+        void testRemoveElementAndAdd();
+        void testContains();
+        void testClear();
+        void testContainsAll();
+        void testRetainAll();
+        void testRemoveAll();
+        void testToArray();
+        void testDrainToSelf();  // draining
+        void testDrainTo();
+        void testDrainToSelfN();
+        void testDrainToWithActivePut();
+        void testDrainToN();
+        void testIterator();  // iteration
+        void testIteratorRemove();
+        void testIteratorOrdering();
+        void testWeaklyConsistentIteration();
+        void testConcurrentPut();  // multi-threaded scenarios
+        void testConcurrentTake();
+        void testConcurrentPutAndTake();
+
+    };
+
+}}}  // namespace decaf::util::concurrent
+
+#endif /* _DECAF_UTIL_CONCURRENT_LINKEDBLOCKINGQUEUETEST_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/LinkedBlockingQueueTest.h
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp
(from r1084577, activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolTest.cpp)
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp?p2=activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp&p1=activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolTest.cpp&r1=1084577&r2=1086005&rev=1086005&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolTest.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp
Sun Mar 27 18:58:54 2011
@@ -15,138 +15,192 @@
  * limitations under the License.
  */
 
-#include "ThreadPoolTest.h"
+#include "ThreadPoolExecutorTest.h"
+
+#include <decaf/util/concurrent/ThreadPoolExecutor.h>
+#include <decaf/util/concurrent/LinkedBlockingQueue.h>
+
+#include <decaf/lang/exceptions/RuntimeException.h>
 
 using namespace std;
 using namespace decaf;
 using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
 using namespace decaf::util;
 using namespace decaf::util::concurrent;
 
 ///////////////////////////////////////////////////////////////////////////////
-void ThreadPoolTest::test1()
-{
-    CountDownLatch myLatch( 3 );
-    this->latch = &myLatch;
+namespace {
 
-    MyTask task1( 1 );
-    MyTask task2( 2 );
-    MyTask task3( 3 );
+    class MyTask : public lang::Runnable {
+    public:
+
+        CountDownLatch* latch;
+        int value;
+
+        MyTask(CountDownLatch* latch, int x) : Runnable(), latch(latch), value(x) {
+        }
 
-    this->complete = 0;
-    this->tasksToComplete = 3;
+        virtual ~MyTask() {}
+
+        virtual void run() {
+            value += 100;
+            Thread::sleep(10);
+            latch->countDown();
+        }
+    };
+
+    class MyExceptionTask : public lang::Runnable {
+    public:
+
+        int value;
+
+        MyExceptionTask() : Runnable() {
+        }
 
-    ThreadPool* pool = ThreadPool::getInstance();
+        virtual ~MyExceptionTask() {}
 
-    pool->setMaxThreads( ThreadPool::DEFAULT_MAX_POOL_SIZE );
-    pool->setBlockSize( ThreadPool::DEFAULT_MAX_BLOCK_SIZE );
+        virtual void run() {
+            throw RuntimeException();
+        }
+    };
+
+    class MyWaitingTask : public lang::Runnable {
+    public:
+
+        Mutex* mutex;
+        CountDownLatch* startedLatch;
+
+        MyWaitingTask( Mutex* mutex, CountDownLatch* startedLatch ) {
+            this->mutex = mutex;
+            this->startedLatch = startedLatch;
+        }
+
+        virtual ~MyWaitingTask() {};
+
+        virtual void run() {
+            try
+            {
+                synchronized(mutex) {
+                    startedLatch->countDown();
+                    mutex->wait();
+                }
+            }
+            catch( lang::Exception& ex ) {
+                ex.setMark( __FILE__, __LINE__ );
+            }
+        }
+    };
+
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutorTest::testSimpleTasks()
+{
+    CountDownLatch myLatch( 3 );
 
-    pool->queueTask( ThreadPool::Task( &task1, this ) );
-    pool->queueTask( ThreadPool::Task( &task2, this ) );
-    pool->queueTask( ThreadPool::Task( &task3, this ) );
+    MyTask task1(&myLatch, 1);
+    MyTask task2(&myLatch, 2);
+    MyTask task3(&myLatch, 3);
+
+    ThreadPoolExecutor pool(1, 3, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>());
+
+    pool.execute(&task1);
+    pool.execute(&task2);
+    pool.execute(&task3);
 
     // Wait for them to finish, if we can't do this in 30 seconds then
     // there's probably something really wrong.
-    myLatch.await( 30000 );
-
-    CPPUNIT_ASSERT( this->complete == this->tasksToComplete );
+    CPPUNIT_ASSERT( myLatch.await( 30000 ) );
 
     CPPUNIT_ASSERT( task1.value == 101 );
     CPPUNIT_ASSERT( task2.value == 102 );
     CPPUNIT_ASSERT( task3.value == 103 );
 
-    CPPUNIT_ASSERT( pool->getPoolSize() > 0 );
-    CPPUNIT_ASSERT( pool->getBacklog() == 0 );
+    CPPUNIT_ASSERT( pool.getMaximumPoolSize() == 3 );
 
-    CPPUNIT_ASSERT( pool->getMaxThreads() == ThreadPool::DEFAULT_MAX_POOL_SIZE );
-    CPPUNIT_ASSERT( pool->getBlockSize() == ThreadPool::DEFAULT_MAX_BLOCK_SIZE );
+    pool.shutdown();
+}
 
-    pool->setMaxThreads(50);
-    pool->setBlockSize(50);
+///////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutorTest::testMoreTasksThanMaxPoolSize() {
 
-    CPPUNIT_ASSERT( pool->getMaxThreads() == 50 );
-    CPPUNIT_ASSERT( pool->getBlockSize() == 50 );
+    ThreadPoolExecutor pool(1, 3, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>());
+    Mutex myMutex;
 
-    // Give it a little time to create all those threads.
-    for( int i = 0; i < 1000; ++i ) {
-        if( pool->getFreeThreadCount() == pool->getPoolSize() ) {
-            break;
-        }
+    CPPUNIT_ASSERT( pool.getMaximumPoolSize() == 3);
+
+    CountDownLatch startedLatch1(3);  // First three should go right away
+    CountDownLatch startedLatch2(1);  // The fourth one goes after others finish
+
+    MyWaitingTask task1( &myMutex, &startedLatch1 );
+    MyWaitingTask task2( &myMutex, &startedLatch1 );
+    MyWaitingTask task3( &myMutex, &startedLatch1 );
+    MyWaitingTask task4( &myMutex, &startedLatch2 );
+
+    pool.execute( &task1 );
+    pool.execute( &task2 );
+    pool.execute( &task3 );
+    pool.execute( &task4 );
+
+    // Wait up to 30 seconds for the first three tasks to start; if they
+    // have not, fail the test because something is probably very wrong.
+    CPPUNIT_ASSERT( startedLatch1.await( 30000 ) );
+
+    Thread::sleep(10);
+
+    // Wake up the tasks.
+    synchronized(&myMutex) {
+        myMutex.notifyAll();
+    }
 
-        Thread::sleep( 100 );
+    // Wait up to 30 seconds for the fourth task to start; if it has
+    // not, fail the test because something is probably very wrong.
+    CPPUNIT_ASSERT( startedLatch2.await( 30000 ) );
+
+    Thread::sleep(10);
+
+    // Wake up the last task.
+    synchronized(&myMutex) {
+        myMutex.notifyAll();
     }
 
-    CPPUNIT_ASSERT( pool->getFreeThreadCount() == pool->getPoolSize() );
-    CPPUNIT_ASSERT( this->caughtEx == false );
+    // NOTE(review): this awaits startedLatch2 again, which only shows that the
+    // fourth task has started; it does not wait for the tasks to finish.
+    CPPUNIT_ASSERT( startedLatch2.await( 30000 ) );
 }
 
 ///////////////////////////////////////////////////////////////////////////////
-void ThreadPoolTest::test2() {
+void ThreadPoolExecutorTest::testTasksThatThrow()
+{
+    CountDownLatch myLatch( 3 );
 
-    try
-    {
-        ThreadPool pool;
-        Mutex myMutex;
-
-        CPPUNIT_ASSERT( pool.getMaxThreads() == ThreadPool::DEFAULT_MAX_POOL_SIZE );
-        CPPUNIT_ASSERT( pool.getBlockSize() == ThreadPool::DEFAULT_MAX_BLOCK_SIZE );
-        pool.setMaxThreads(3);
-        pool.setBlockSize(1);
-        CPPUNIT_ASSERT( pool.getMaxThreads() == 3 );
-        CPPUNIT_ASSERT( pool.getBlockSize() == 1 );
-        CPPUNIT_ASSERT( pool.getPoolSize() == 0 );
-        pool.reserve( 4 );
-        CPPUNIT_ASSERT( pool.getPoolSize() == 3 );
-        CPPUNIT_ASSERT( pool.getFreeThreadCount() == 3 );
-
-        CountDownLatch startedLatch1(3);  // First three should go right away
-        CountDownLatch startedLatch2(1);  // The fourth one goes after others finish
-        CountDownLatch doneLatch(4);      // All should be done when we are at the end.
-
-        this->latch = &doneLatch;
-
-        MyWaitingTask task1( &myMutex, &startedLatch1 );
-        MyWaitingTask task2( &myMutex, &startedLatch1 );
-        MyWaitingTask task3( &myMutex, &startedLatch1 );
-        MyWaitingTask task4( &myMutex, &startedLatch2 );
-
-        this->complete = 0;
-        this->tasksToComplete = 4;
-
-        pool.queueTask( ThreadPool::Task( &task1, this ) );
-        pool.queueTask( ThreadPool::Task( &task2, this ) );
-        pool.queueTask( ThreadPool::Task( &task3, this ) );
-        pool.queueTask( ThreadPool::Task( &task4, this ) );
-
-        // Wait 30 seconds, then we let it fail because something is
-        // probably very wrong.
-        startedLatch1.await( 30000 );
-
-        CPPUNIT_ASSERT( pool.getFreeThreadCount() == 0 );
-        CPPUNIT_ASSERT( pool.getBacklog() == 1 );
-
-        // Wake up the tasks.
-        synchronized(&myMutex) {
-            myMutex.notifyAll();
-        }
-
-        // Wait 30 seconds, then we let it fail because something is
-        // probably very wrong.
-        startedLatch2.await( 30000 );
-
-        // Wake up the last task.
-        synchronized(&myMutex) {
-            myMutex.notifyAll();
-        }
-
-        // Wait for them to finish, if it takes longer than 30 seconds
-        // something is not right.
-        doneLatch.await( 30000 );
+    MyTask task1(&myLatch, 1);
+    MyTask task2(&myLatch, 2);
+    MyTask task3(&myLatch, 3);
+
+    MyExceptionTask exTask1;
+    MyExceptionTask exTask2;
+    MyExceptionTask exTask3;
+
+    ThreadPoolExecutor pool(1, 3, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>());
+
+    pool.execute(&exTask1);
+    pool.execute(&task2);
+    pool.execute(&exTask2);
+    pool.execute(&task1);
+    pool.execute(&exTask3);
+    pool.execute(&task3);
 
-        CPPUNIT_ASSERT( this->complete == this->tasksToComplete );
-        CPPUNIT_ASSERT( this->caughtEx == false );
-    }
-    catch( lang::Exception& ex ) {
-        ex.setMark( __FILE__, __LINE__ );
-    }
+    // Wait for the three MyTask instances to finish; if that takes more
+    // than 30 seconds then there's probably something really wrong.
+    CPPUNIT_ASSERT( myLatch.await( 30000 ) );
+
+    CPPUNIT_ASSERT( task1.value == 101 );
+    CPPUNIT_ASSERT( task2.value == 102 );
+    CPPUNIT_ASSERT( task3.value == 103 );
+
+    CPPUNIT_ASSERT( pool.getMaximumPoolSize() == 3 );
+
+    pool.shutdown();
 }

Copied: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h
(from r1084577, activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolTest.h)
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h?p2=activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h&p1=activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolTest.h&r1=1084577&r2=1086005&rev=1086005&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h Sun Mar 27 18:58:54 2011
@@ -15,8 +15,8 @@
  * limitations under the License.
  */
 
-#ifndef _DECAF_UTIL_CONCURRENT_THREADPOOLTEST_H_
-#define _DECAF_UTIL_CONCURRENT_THREADPOOLTEST_H_
+#ifndef _DECAF_UTIL_CONCURRENT_THREADPOOLEXECUTORTEST_H_
+#define _DECAF_UTIL_CONCURRENT_THREADPOOLEXECUTORTEST_H_
 
 #include <cppunit/TestFixture.h>
 #include <cppunit/extensions/HelperMacros.h>
@@ -24,8 +24,7 @@
 #include <decaf/util/concurrent/CountDownLatch.h>
 #include <decaf/util/concurrent/Concurrent.h>
 #include <decaf/lang/Thread.h>
-#include <decaf/util/concurrent/ThreadPool.h>
-#include <decaf/util/concurrent/TaskListener.h>
+#include <decaf/util/concurrent/ThreadPoolExecutor.h>
 #include <decaf/util/concurrent/Mutex.h>
 #include <decaf/util/Config.h>
 
@@ -33,109 +32,25 @@ namespace decaf{
 namespace util{
 namespace concurrent{
 
-    class ThreadPoolTest :
-        public CppUnit::TestFixture,
-        public TaskListener
-    {
-        CPPUNIT_TEST_SUITE( ThreadPoolTest );
-        CPPUNIT_TEST( test1 );
-        CPPUNIT_TEST( test2 );
-        CPPUNIT_TEST_SUITE_END();
-
-        int tasksToComplete;
-        int complete;
-        bool caughtEx;
-        CountDownLatch* latch;
-
-    public:
+    class ThreadPoolExecutorTest : public CppUnit::TestFixture {
 
-        ThreadPoolTest() {
-            complete = 0;
-            tasksToComplete = 0;
-            caughtEx = false;
-            latch = NULL;
-        }
-
-        virtual ~ThreadPoolTest() {}
-
-        virtual void onTaskComplete( lang::Runnable* task DECAF_UNUSED)
-        {
-            try{
-
-                complete++;
-
-                if( latch != NULL ) {
-                    latch->countDown();
-                }
-            }catch( lang::Exception& ex ){
-                ex.setMark( __FILE__, __LINE__ );
-            }
-        }
-
-        virtual void onTaskException(
-            lang::Runnable* task DECAF_UNUSED,
-            lang::Exception& ex DECAF_UNUSED) {
-            caughtEx = true;
-        }
-
-    public:
-
-        class MyTask : public lang::Runnable
-        {
-        public:
-
-            int value;
-
-            MyTask( int x ) {
-                value = x;
-            }
-
-            virtual ~MyTask() {};
-
-            virtual void run() {
-                value += 100;
-            }
-        };
-
-        class MyWaitingTask : public lang::Runnable
-        {
-        public:
-
-            Mutex* mutex;
-            CountDownLatch* startedLatch;
-
-            MyWaitingTask( Mutex* mutex, CountDownLatch* startedLatch ) {
-                this->mutex = mutex;
-                this->startedLatch = startedLatch;
-            }
-
-            virtual ~MyWaitingTask() {};
-
-            virtual void run() {
-                try
-                {
-                    synchronized(mutex) {
-                        startedLatch->countDown();
-                        mutex->wait();
-                    }
-                }
-                catch( lang::Exception& ex ) {
-                    ex.setMark( __FILE__, __LINE__ );
-                }
-            }
-        };
+        CPPUNIT_TEST_SUITE( ThreadPoolExecutorTest );
+        CPPUNIT_TEST( testSimpleTasks );
+        CPPUNIT_TEST( testMoreTasksThanMaxPoolSize );
+        CPPUNIT_TEST( testTasksThatThrow );
+        CPPUNIT_TEST_SUITE_END();
 
     public:
 
-        virtual void setUp() {
-            this->latch = NULL;
-        }
+        ThreadPoolExecutorTest() {}
+        virtual ~ThreadPoolExecutorTest() {}
 
-        virtual void test1();
-        virtual void test2();
+        void testSimpleTasks();
+        void testMoreTasksThanMaxPoolSize();
+        void testTasksThatThrow();
 
     };
 
 }}}
 
-#endif /*_DECAF_UTIL_CONCURRENT_THREADPOOLTEST_H_*/
+#endif /*_DECAF_UTIL_CONCURRENT_THREADPOOLEXECUTORTEST_H_*/

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp?rev=1086005&r1=1086004&r2=1086005&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp Sun Mar 27 18:58:54 2011
@@ -273,10 +273,12 @@ CPPUNIT_TEST_SUITE_REGISTRATION( decaf::
 CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::concurrent::CountDownLatchTest );
 #include <decaf/util/concurrent/MutexTest.h>
 CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::concurrent::MutexTest );
-#include <decaf/util/concurrent/ThreadPoolTest.h>
-CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::concurrent::ThreadPoolTest );
+#include <decaf/util/concurrent/ThreadPoolExecutorTest.h>
+CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::concurrent::ThreadPoolExecutorTest );
 #include <decaf/util/concurrent/TimeUnitTest.h>
 CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::concurrent::TimeUnitTest );
+#include <decaf/util/concurrent/LinkedBlockingQueueTest.h>
+CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::concurrent::LinkedBlockingQueueTest );
 
 #include <decaf/util/concurrent/atomic/AtomicBooleanTest.h>
 CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::concurrent::atomic::AtomicBooleanTest );



Mime
View raw message