activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1336955 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/decaf/util/concurrent/ test/decaf/util/concurrent/
Date Thu, 10 May 2012 23:31:56 GMT
Author: tabish
Date: Thu May 10 23:31:56 2012
New Revision: 1336955

URL: http://svn.apache.org/viewvc?rev=1336955&view=rev
Log:
Refactor the LinkedBlockingQueue to handle thread interrupt and fix the purge method to prevent
stack overflow errors. 

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/LinkedBlockingQueue.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTestSupport.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTestSupport.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/LinkedBlockingQueueTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/LinkedBlockingQueueTest.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/LinkedBlockingQueue.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/LinkedBlockingQueue.h?rev=1336955&r1=1336954&r2=1336955&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/LinkedBlockingQueue.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/LinkedBlockingQueue.h
Thu May 10 23:31:56 2012
@@ -21,9 +21,8 @@
 #include <decaf/util/Config.h>
 
 #include <decaf/util/concurrent/atomic/AtomicInteger.h>
-#include <decaf/util/concurrent/Mutex.h>
-#include <decaf/util/concurrent/Lock.h>
 #include <decaf/util/concurrent/BlockingQueue.h>
+#include <decaf/util/concurrent/locks/ReentrantLock.h>
 #include <decaf/util/AbstractQueue.h>
 #include <decaf/util/Iterator.h>
 #include <decaf/lang/Integer.h>
@@ -137,8 +136,17 @@ namespace concurrent {
         int capacity;
         decaf::util::concurrent::atomic::AtomicInteger count;
 
-        mutable decaf::util::concurrent::Mutex putLock;
-        mutable decaf::util::concurrent::Mutex takeLock;
+        /** Lock held by take, poll, etc */
+        mutable locks::ReentrantLock takeLock;
+
+        /** Wait queue for waiting takes */
+        Pointer<locks::Condition> notEmpty;  // takeLock.newCondition();
+
+        /** Lock held by put, offer, etc */
+        mutable locks::ReentrantLock putLock;
+
+        /** Wait queue for waiting puts */
+        Pointer<locks::Condition> notFull; // putLock.newCondition();
 
         Pointer< QueueNode<E> > head;
         Pointer< QueueNode<E> > tail;
@@ -149,9 +157,11 @@ namespace concurrent {
          * Create a new instance with a Capacity of Integer::MAX_VALUE
          */
         LinkedBlockingQueue() : BlockingQueue<E>(), capacity(lang::Integer::MAX_VALUE),
count(),
-                                putLock(), takeLock(), head(new QueueNode<E>()), tail()
{
+                                takeLock(), notEmpty(), putLock(), notFull(), head(new QueueNode<E>()),
tail() {
 
             this->tail = this->head;
+            this->notEmpty.reset(this->takeLock.newCondition());
+            this->notFull.reset(this->putLock.newCondition());
         }
 
         /**
@@ -163,13 +173,15 @@ namespace concurrent {
          * @throws IllegalArgumentException if the specified capacity is not greater than
zero.
          */
         LinkedBlockingQueue(int capacity) : BlockingQueue<E>(), capacity(capacity),
count(),
-                                            putLock(), takeLock(), head(new QueueNode<E>()),
tail() {
+                                            takeLock(), notEmpty(), putLock(), notFull(),
head(new QueueNode<E>()), tail() {
             if(capacity <= 0) {
                 throw decaf::lang::exceptions::IllegalArgumentException(
                     __FILE__, __LINE__, "Capacity value must be greater than zero.");
             }
 
             this->tail = this->head;
+            this->notEmpty.reset(this->takeLock.newCondition());
+            this->notFull.reset(this->putLock.newCondition());
         }
 
         /**
@@ -183,11 +195,13 @@ namespace concurrent {
          *         this Queue's capacity.
          */
         LinkedBlockingQueue(const Collection<E>& collection) : BlockingQueue<E>(),
-                                                               capacity(lang::Integer::MAX_VALUE),
-                                                               count(), putLock(), takeLock(),
+                                                               capacity(lang::Integer::MAX_VALUE),
count(),
+                                                               takeLock(), notEmpty(), putLock(),
notFull(),
                                                                head(new QueueNode<E>()),
tail() {
 
             this->tail = this->head;
+            this->notEmpty.reset(this->takeLock.newCondition());
+            this->notFull.reset(this->putLock.newCondition());
 
             Pointer< Iterator<E> > iter(collection.iterator());
 
@@ -247,7 +261,7 @@ namespace concurrent {
             this->count.set(0);
 
             if(this->count.getAndSet(0) == this->capacity) {
-                this->putLock.notify();
+                this->notFull->signal();
             }
         }
 
@@ -259,10 +273,16 @@ namespace concurrent {
 
             int c = -1;
 
-            synchronized(&this->putLock) {
+            this->putLock.lockInterruptibly();
+            try {
 
-                while(this->count.get() == this->capacity) {
-                    this->putLock.wait();
+                // Note that count is used in wait guard even though it is not
+                // protected by lock. This works because count can only decrease at
+                // this point (all other puts are shut  out by lock), and we (or some
+                // other waiting put) are signaled if it ever changes from capacity.
+                // Similarly for all other uses of count in other wait guards.
+                while (this->count.get() == this->capacity) {
+                    this->notFull->await();
                 }
 
                 // This method now owns the putLock so we know we have at least
@@ -273,14 +293,19 @@ namespace concurrent {
                 c = this->count.getAndIncrement();
 
                 if(c + 1 < this->capacity) {
-                    this->putLock.notify();
+                    this->notFull->signal();
                 }
+            } catch(decaf::lang::Exception& ex) {
+                this->putLock.unlock();
+                throw;
             }
 
+            this->putLock.unlock();
+
             // When c is zero it means we at least incremented once so there was
             // something in the Queue, another take could have already happened but
             // we don't know so wake up a waiting taker.
-            if(c == 0) {
+            if (c == 0) {
                 this->signalNotEmpty();
             }
         }
@@ -288,21 +313,33 @@ namespace concurrent {
         virtual bool offer( const E& value, long long timeout, const TimeUnit& unit
) {
 
             int c = -1;
+            long long nanos = unit.toNanos(timeout);
 
-            synchronized(&this->putLock) {
+            this->putLock.lockInterruptibly();
+            try {
 
                 while(this->count.get() == this->capacity) {
-                    this->putLock.wait(unit.toMillis(timeout));
+                    if (nanos <= 0) {
+                        return false;
+                    }
+
+                    nanos = this->notFull->awaitNanos(nanos);
                 }
 
                 enqueue(value);
                 c = this->count.getAndIncrement();
 
                 if(c + 1 < this->capacity) {
-                    this->putLock.notify();
+                    this->notFull->signal();
                 }
+
+            } catch(decaf::lang::Exception& ex) {
+                this->putLock.unlock();
+                throw;
             }
 
+            this->putLock.unlock();
+
             if(c == 0) {
                 this->signalNotEmpty();
             }
@@ -310,26 +347,34 @@ namespace concurrent {
             return true;
         }
 
-        virtual bool offer( const E& value ) {
+        virtual bool offer(const E& value) {
 
-            if(this->count.get() == this->capacity) {
+            if (this->count.get() == this->capacity) {
                 return false;
             }
 
             int c = -1;
-            synchronized(&this->putLock) {
-                if(this->count.get() < this->capacity) {
+            this->putLock.lockInterruptibly();
+            try {
+
+                if (this->count.get() < this->capacity) {
 
                     enqueue(value);
                     c = this->count.getAndIncrement();
 
-                    if(c + 1 < this->capacity) {
-                        this->putLock.notify();
+                    if (c + 1 < this->capacity) {
+                        this->notFull->signal();
                     }
                 }
+
+            } catch (decaf::lang::Exception& ex) {
+                this->putLock.unlock();
+                throw;
             }
 
-            if(c == 0) {
+            this->putLock.unlock();
+
+            if (c == 0) {
                 this->signalNotEmpty();
             }
 
@@ -337,12 +382,15 @@ namespace concurrent {
         }
 
         virtual E take() {
+
             E value = E();
             int c = -1;
-            synchronized(&this->takeLock) {
 
-                while(this->count.get() == 0) {
-                     this->takeLock.wait();
+            this->takeLock.lockInterruptibly();
+            try {
+
+                while (this->count.get() == 0) {
+                     this->notEmpty->await();
                 }
 
                 // Since this methods owns the takeLock and count != 0 we know that
@@ -351,42 +399,56 @@ namespace concurrent {
                 value = dequeue();
                 c = this->count.getAndDecrement();
 
-                if(c > 1) {
-                    this->takeLock.notify();
+                if (c > 1) {
+                    this->notEmpty->signal();
                 }
+
+            } catch (decaf::lang::Exception& ex) {
+                this->takeLock.unlock();
+                throw;
             }
 
+            this->takeLock.unlock();
+
             // When c equals capacity we have removed at least one element
             // from the Queue so we wake a blocked put operation if there is
             // one to prevent a deadlock.
-            if(c == this->capacity) {
+            if (c == this->capacity) {
                 this->signalNotFull();
             }
 
             return value;
         }
 
-        virtual bool poll( E& result, long long timeout, const TimeUnit& unit ) {
+        virtual bool poll(E& result, long long timeout, const TimeUnit& unit) {
             int c = -1;
-            synchronized(&this->takeLock) {
-                if(this->count.get() == 0) {
-                    if(timeout <= 0) {
-                        return false;
-                    }
-                    this->takeLock.wait(unit.toMillis(timeout));
-                    if(this->count.get() == 0) {
+            long long nanos = unit.toNanos(timeout);
+
+            this->takeLock.lockInterruptibly();
+            try {
+
+                while (this->count.get() == 0) {
+                    if (nanos <= 0) {
                         return false;
                     }
+
+                    nanos = this->notEmpty->awaitNanos(nanos);
                 }
 
                 result = dequeue();
                 c = this->count.getAndDecrement();
 
-                if(c > 1) {
-                    this->takeLock.notify();
+                if (c > 1) {
+                    this->notEmpty->signal();
                 }
+
+            } catch (decaf::lang::Exception& ex) {
+                this->takeLock.unlock();
+                throw;
             }
 
+            this->takeLock.unlock();
+
             if(c == this->capacity) {
                 this->signalNotFull();
             }
@@ -396,24 +458,31 @@ namespace concurrent {
 
         virtual bool poll(E& result) {
 
-            if(this->count.get() == 0) {
+            if (this->count.get() == 0) {
                 return false;
             }
 
             int c = -1;
-            synchronized(&this->takeLock) {
+            this->takeLock.lock();
+            try {
 
-                if(this->count.get() > 0) {
+                if (this->count.get() > 0) {
                     result = dequeue();
                     c = this->count.getAndDecrement();
 
-                    if(c > 1) {
-                        this->takeLock.notify();
+                    if (c > 1) {
+                        this->notEmpty->signal();
                     }
                 }
+
+            } catch (decaf::lang::Exception& ex) {
+                this->takeLock.unlock();
+                throw;
             }
 
-            if(c == this->capacity) {
+            this->takeLock.unlock();
+
+            if (c == this->capacity) {
                 this->signalNotFull();
             }
 
@@ -426,15 +495,21 @@ namespace concurrent {
                 return false;
             }
 
-            synchronized(&this->takeLock) {
+            this->takeLock.lock();
+            try {
                 Pointer< QueueNode<E> > front = this->head->next;
                 if(front == NULL) {
                     return false;
                 } else {
                     result = front->get();
                 }
+            } catch (decaf::lang::Exception& ex) {
+                this->takeLock.unlock();
+                throw;
             }
 
+            this->takeLock.unlock();
+
             return true;
         }
 
@@ -492,7 +567,8 @@ namespace concurrent {
             decaf::lang::Exception delayed;
             int result = 0;
 
-            synchronized(&this->takeLock) {
+            this->takeLock.lock();
+            try {
 
                 // We get the count of Nodes that exist now, any puts that are done
                 // after this are not drained and since we hold the lock nothing can
@@ -514,17 +590,23 @@ namespace concurrent {
                     shouldThrow = true;
                 }
 
-                if(i > 0) {
+                if (i > 0) {
                     this->head = node;
                     signalNotFull = (this->count.getAndAdd(-i) == this->capacity);
                 }
+
+            } catch(decaf::lang::Exception& ex) {
+                this->takeLock.unlock();
+                throw;
             }
 
-            if(signalNotFull) {
+            this->takeLock.unlock();
+
+            if (signalNotFull) {
                 this->signalNotFull();
             }
 
-            if(shouldThrow) {
+            if (shouldThrow) {
                 throw delayed;
             }
 
@@ -726,15 +808,25 @@ namespace concurrent {
         }
 
         void signalNotEmpty() {
-            synchronized(&this->takeLock) {
-                this->takeLock.notify();
+            this->takeLock.lock();
+            try {
+                this->notEmpty->signal();
+            } catch(decaf::lang::Exception& ex) {
+                this->takeLock.unlock();
+                throw;
             }
+            this->takeLock.unlock();
         }
 
         void signalNotFull() {
-            synchronized(&this->putLock) {
-                this->putLock.notify();
+            this->putLock.lock();
+            try {
+                this->notFull->signal();
+            } catch(decaf::lang::Exception& ex) {
+                this->putLock.unlock();
+                throw;
             }
+            this->putLock.unlock();
         }
 
         // Must be called with the putLock locked.
@@ -759,6 +851,7 @@ namespace concurrent {
             while(current != NULL) {
                 temp = current;
                 current = current->next;
+                temp->next.reset(NULL);
                 temp.reset(NULL);
             }
         }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTestSupport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTestSupport.cpp?rev=1336955&r1=1336954&r2=1336955&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTestSupport.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTestSupport.cpp
Thu May 10 23:31:56 2012
@@ -56,6 +56,11 @@ void ExecutorsTestSupport::unexpectedExc
 }
 
 ///////////////////////////////////////////////////////////////////////////////
+void ExecutorsTestSupport::unexpectedException(Throwable& ex) {
+    CPPUNIT_FAIL(std::string("Unexpected exception: ") + ex.getMessage());
+}
+
+///////////////////////////////////////////////////////////////////////////////
 void ExecutorsTestSupport::threadFail(const std::string& reason) {
     threadFailed = true;
     CPPUNIT_FAIL(reason);

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTestSupport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTestSupport.h?rev=1336955&r1=1336954&r2=1336955&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTestSupport.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTestSupport.h
Thu May 10 23:31:56 2012
@@ -71,6 +71,7 @@ namespace concurrent {
         void threadAssertEquals(long long x, long long y);
 
         void unexpectedException();
+        void unexpectedException(decaf::lang::Throwable& ex);
         void shouldThrow();
 
         void joinPool(ExecutorService* exec);

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/LinkedBlockingQueueTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/LinkedBlockingQueueTest.cpp?rev=1336955&r1=1336954&r2=1336955&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/LinkedBlockingQueueTest.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/LinkedBlockingQueueTest.cpp
Thu May 10 23:31:56 2012
@@ -723,6 +723,8 @@ namespace {
 
         PuttingThread(LinkedBlockingQueue<int>* q, int count) : theQ(q), count(count)
{}
 
+        virtual ~PuttingThread() {}
+
         virtual void run() {
             try {
                 for(int i = 0; i < count; ++i) {
@@ -933,3 +935,289 @@ void LinkedBlockingQueueTest::testConcur
         CPPUNIT_ASSERT(list4.size() == SIZE);
     }
 }
+
+///////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class TestBlockingPutRunnable : public Runnable {
+    private:
+
+        LinkedBlockingQueueTest* test;
+
+    public:
+
+        TestBlockingPutRunnable(LinkedBlockingQueueTest* test) : Runnable(), test(test) {
+
+        }
+
+        virtual ~TestBlockingPutRunnable() {}
+
+        virtual void run() {
+            int added = 0;
+            try {
+                LinkedBlockingQueue<int> q(LinkedBlockingQueueTest::SIZE);
+                for (int i = 0; i < LinkedBlockingQueueTest::SIZE; ++i) {
+                    q.put(i);
+                    ++added;
+                }
+                q.put(LinkedBlockingQueueTest::SIZE);
+                test->threadShouldThrow();
+            } catch (InterruptedException& ie){
+                test->threadAssertEquals(added, LinkedBlockingQueueTest::SIZE);
+            }
+        }
+    };
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testBlockingPut() {
+    TestBlockingPutRunnable runnable(this);
+    Thread t(&runnable);
+
+    try {
+        t.start();
+        Thread::sleep(SHORT_DELAY_MS);
+        t.interrupt();
+        t.join();
+    } catch (InterruptedException& ie) {
+        unexpectedException();
+    }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class TestTimedOfferRunnable : public Runnable {
+    private:
+
+        LinkedBlockingQueue<int>* queue;
+        LinkedBlockingQueueTest* test;
+
+    public:
+
+        TestTimedOfferRunnable(LinkedBlockingQueue<int>* queue, LinkedBlockingQueueTest*
test) :
+            Runnable(), queue(queue), test(test) {
+        }
+
+        virtual ~TestTimedOfferRunnable() {}
+
+        virtual void run() {
+            try {
+                queue->put(1);
+                queue->put(2);
+                test->threadAssertFalse(
+                    queue->offer(3, LinkedBlockingQueueTest::SHORT_DELAY_MS, TimeUnit::MILLISECONDS));
+                queue->offer(4, LinkedBlockingQueueTest::LONG_DELAY_MS, TimeUnit::MILLISECONDS);
+                test->threadShouldThrow();
+            } catch (InterruptedException& success) {
+            }
+        }
+    };
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testTimedOffer() {
+
+    LinkedBlockingQueue<int> q(2);
+    TestTimedOfferRunnable runnable(&q, this);
+    Thread t(&runnable);
+
+    try {
+        t.start();
+        Thread::sleep(SMALL_DELAY_MS);
+        t.interrupt();
+        t.join();
+    } catch (Exception& ex) {
+        unexpectedException(ex);
+    }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class TestTakeFromEmptyRunnable : public Runnable {
+    private:
+
+        LinkedBlockingQueue<int>* queue;
+        LinkedBlockingQueueTest* test;
+
+    public:
+
+        TestTakeFromEmptyRunnable(LinkedBlockingQueue<int>* queue, LinkedBlockingQueueTest*
test) :
+            Runnable(), queue(queue), test(test) {
+        }
+
+        virtual ~TestTakeFromEmptyRunnable() {}
+
+        virtual void run() {
+            try {
+                queue->take();
+                test->threadShouldThrow();
+            } catch (InterruptedException& success) {
+            }
+        }
+    };
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testTakeFromEmpty() {
+
+    LinkedBlockingQueue<int> q(2);
+    TestTakeFromEmptyRunnable runnable(&q, this);
+    Thread t(&runnable);
+
+    try {
+        t.start();
+        Thread::sleep(SHORT_DELAY_MS);
+        t.interrupt();
+        t.join();
+    } catch (Exception& e){
+        unexpectedException();
+    }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class TestBlockingTakeRunnable : public Runnable {
+    private:
+
+        LinkedBlockingQueueTest* test;
+
+    public:
+
+        TestBlockingTakeRunnable(LinkedBlockingQueueTest* test) :
+            Runnable(), test(test) {
+        }
+
+        virtual ~TestBlockingTakeRunnable() {}
+
+        virtual void run() {
+            try {
+                LinkedBlockingQueue<int> queue;
+                populate(queue, LinkedBlockingQueueTest::SIZE);
+
+                for (int i = 0; i < LinkedBlockingQueueTest::SIZE; ++i) {
+                    test->threadAssertEquals(i, queue.take());
+                }
+                queue.take();
+                test->threadShouldThrow();
+            } catch (InterruptedException& success) {
+            }
+        }
+    };
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testBlockingTake() {
+
+    TestBlockingTakeRunnable runnable(this);
+    Thread t(&runnable);
+
+    try {
+        t.start();
+        Thread::sleep(SHORT_DELAY_MS);
+        t.interrupt();
+        t.join();
+    } catch (InterruptedException& ie) {
+        unexpectedException(ie);
+    }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class TestInterruptedTimedPollRunnable : public Runnable {
+    private:
+
+        LinkedBlockingQueueTest* test;
+
+    public:
+
+        TestInterruptedTimedPollRunnable(LinkedBlockingQueueTest* test) :
+            Runnable(), test(test) {
+        }
+
+        virtual ~TestInterruptedTimedPollRunnable() {}
+
+        virtual void run() {
+            try {
+                LinkedBlockingQueue<int> queue;
+                populate(queue, LinkedBlockingQueueTest::SIZE);
+
+                for (int i = 0; i < LinkedBlockingQueueTest::SIZE; ++i) {
+                    test->threadAssertEquals(i, queue.take());
+                }
+                queue.take();
+                int result = 0;
+                test->threadAssertFalse(queue.poll(result, LinkedBlockingQueueTest::SHORT_DELAY_MS,
TimeUnit::MILLISECONDS));
+                test->threadShouldThrow();
+            } catch (InterruptedException& success) {
+            }
+        }
+    };
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testInterruptedTimedPoll() {
+
+    TestInterruptedTimedPollRunnable runnable(this);
+    Thread t(&runnable);
+
+    try {
+        t.start();
+        Thread::sleep(SHORT_DELAY_MS);
+        t.interrupt();
+        t.join();
+    } catch (InterruptedException& ie) {
+        unexpectedException();
+    }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class TestTimedPollWithOfferRunnable : public Runnable {
+    private:
+
+        LinkedBlockingQueue<int>* queue;
+        LinkedBlockingQueueTest* test;
+
+    public:
+
+        TestTimedPollWithOfferRunnable(LinkedBlockingQueue<int>* queue, LinkedBlockingQueueTest*
test) :
+            Runnable(), queue(queue), test(test) {
+        }
+
+        virtual ~TestTimedPollWithOfferRunnable() {}
+
+        virtual void run() {
+            try {
+                int result = 0;
+                test->threadAssertFalse(queue->poll(result, LinkedBlockingQueueTest::SHORT_DELAY_MS,
TimeUnit::MILLISECONDS));
+                test->threadAssertTrue(queue->poll(result, LinkedBlockingQueueTest::LONG_DELAY_MS,
TimeUnit::MILLISECONDS));
+                queue->poll(result, LinkedBlockingQueueTest::LONG_DELAY_MS, TimeUnit::MILLISECONDS);
+                test->threadShouldThrow();
+            } catch (InterruptedException& success) {
+            }
+        }
+    };
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void LinkedBlockingQueueTest::testTimedPollWithOffer() {
+
+    LinkedBlockingQueue<int> q(2);
+    TestTimedPollWithOfferRunnable runnable(&q, this);
+    Thread t(&runnable);
+
+    try {
+        t.start();
+        Thread::sleep(SMALL_DELAY_MS);
+        CPPUNIT_ASSERT(q.offer(0, SHORT_DELAY_MS, TimeUnit::MILLISECONDS));
+        t.interrupt();
+        t.join();
+    } catch (Exception& e) {
+        unexpectedException(e);
+    }
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/LinkedBlockingQueueTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/LinkedBlockingQueueTest.h?rev=1336955&r1=1336954&r2=1336955&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/LinkedBlockingQueueTest.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/LinkedBlockingQueueTest.h
Thu May 10 23:31:56 2012
@@ -20,12 +20,13 @@
 
 #include <cppunit/TestFixture.h>
 #include <cppunit/extensions/HelperMacros.h>
+#include <decaf/util/concurrent/ExecutorsTestSupport.h>
 
 namespace decaf {
 namespace util {
 namespace concurrent {
 
-    class LinkedBlockingQueueTest : public CppUnit::TestFixture {
+    class LinkedBlockingQueueTest : public ExecutorsTestSupport {
 
         CPPUNIT_TEST_SUITE( LinkedBlockingQueueTest );
         CPPUNIT_TEST( testConstructor1 );
@@ -69,9 +70,15 @@ namespace concurrent {
         CPPUNIT_TEST( testConcurrentPut );
         CPPUNIT_TEST( testConcurrentTake );
         CPPUNIT_TEST( testConcurrentPutAndTake );
+        CPPUNIT_TEST( testBlockingPut );
+        CPPUNIT_TEST( testTimedOffer );
+        CPPUNIT_TEST( testTakeFromEmpty );
+        CPPUNIT_TEST( testBlockingTake );
+        CPPUNIT_TEST( testInterruptedTimedPoll );
+        CPPUNIT_TEST( testTimedPollWithOffer );
         CPPUNIT_TEST_SUITE_END();
 
-    private:
+    public:
 
         static const int SIZE;
 
@@ -121,6 +128,12 @@ namespace concurrent {
         void testConcurrentPut();
         void testConcurrentTake();
         void testConcurrentPutAndTake();
+        void testBlockingPut();
+        void testTimedOffer();
+        void testTakeFromEmpty();
+        void testBlockingTake();
+        void testInterruptedTimedPoll();
+        void testTimedPollWithOffer();
 
     };
 



Mime
View raw message