activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1128104 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/decaf/internal/util/concurrent/ main/decaf/util/concurrent/ main/decaf/util/concurrent/locks/ test/ test/decaf/lang/ test/decaf/util/concurrent/
Date Thu, 26 May 2011 22:14:58 GMT
Author: tabish
Date: Thu May 26 22:14:57 2011
New Revision: 1128104

URL: http://svn.apache.org/viewvc?rev=1128104&view=rev
Log:
Fixes for the thread queue synchronizer, adds Semaphore implementation that uses the AQS and moves CountdownLatch to it as well.  Adds new tests for CountdownLatch and a suite for the Semaphore class.

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/SemaphoreTest.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/SemaphoreTest.h   (with props)
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Atomics.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/CountDownLatch.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/CountDownLatch.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Semaphore.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Semaphore.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/lang/ArrayPointerTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/CountDownLatchTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/CountDownLatchTest.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Atomics.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Atomics.h?rev=1128104&r1=1128103&r2=1128104&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Atomics.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Atomics.h Thu May 26 22:14:57 2011
@@ -36,6 +36,14 @@ namespace concurrent {
 
     public:
 
+        template<typename T>
+        static bool compareAndSwap(T*& target, T* expect, T* update) {
+
+            return Atomics::compareAndSet((volatile void**)&target, (void*)expect, (void*)update);
+        }
+
+    public:
+
         static bool compareAndSet32(volatile int* target, int expect, int update);
         static bool compareAndSet(volatile void** target, void* expect, void* update);
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/CountDownLatch.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/CountDownLatch.cpp?rev=1128104&r1=1128103&r2=1128104&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/CountDownLatch.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/CountDownLatch.cpp Thu May 26 22:14:57 2011
@@ -17,41 +17,86 @@
 
 #include "CountDownLatch.h"
 
+#include <decaf/lang/Integer.h>
 #include <decaf/lang/exceptions/IllegalArgumentException.h>
+#include <decaf/util/concurrent/TimeUnit.h>
+#include <decaf/util/concurrent/locks/AbstractQueuedSynchronizer.h>
 
 using namespace decaf;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 using namespace decaf::util;
 using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::locks;
 
 ////////////////////////////////////////////////////////////////////////////////
-CountDownLatch::CountDownLatch( int count ) : count(count), mutex() {
+namespace decaf{
+namespace util{
+namespace concurrent{
+
+    class LatchSync : public AbstractQueuedSynchronizer {
+    private:
+
+        LatchSync(const LatchSync&);
+        LatchSync& operator= (const LatchSync&);
+
+    public:
+
+        LatchSync(int count) : AbstractQueuedSynchronizer() {
+            this->setState(count);
+        }
+        virtual ~LatchSync() {}
+
+        int getCount() const {
+            return getState();
+        }
+
+    protected:
+
+        virtual int tryAcquireShared(int acquires DECAF_UNUSED) {
+            return getState() == 0 ? 1 : -1;
+        }
+
+        virtual bool tryReleaseShared(int releases DECAF_UNUSED) {
+
+            for (;;) {
+
+                int current = getState();
+                if (current == 0) {
+                    return false;
+                }
+
+                int next = current - 1;
+                if (compareAndSetState(current, next)) {
+                    return next == 0;
+                }
+            }
+        }
+    };
+
+}}}
+
+////////////////////////////////////////////////////////////////////////////////
+CountDownLatch::CountDownLatch(int count) {
+    if (count < 0) {
+        throw IllegalArgumentException(__FILE__, __LINE__, "Count must be non-negative.");
+    }
+
+    this->sync = new LatchSync(count);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 CountDownLatch::~CountDownLatch() {
     try {
-
-        synchronized( &mutex ) {
-            mutex.notifyAll();
-        }
+        delete sync;
     }
     DECAF_CATCHALL_NOTHROW()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void CountDownLatch::await() {
-
     try {
-
-        synchronized( &mutex ) {
-            if( count == 0 ){
-                return;
-            }
-
-            mutex.wait();
-        }
+        this->sync->acquireSharedInterruptibly(1);
     }
     DECAF_CATCH_RETHROW( decaf::lang::exceptions::InterruptedException )
     DECAF_CATCH_RETHROW( decaf::lang::Exception )
@@ -60,27 +105,8 @@ void CountDownLatch::await() {
 
 ////////////////////////////////////////////////////////////////////////////////
 bool CountDownLatch::await( long long timeOut ) {
-
     try {
-
-        if( timeOut < 0 ) {
-            throw IllegalArgumentException(
-                __FILE__, __LINE__, "Timeout value cannot be less than zero." );
-        }
-
-        synchronized( &mutex ) {
-            if( count == 0 ){
-                return true;
-            }
-
-            if (timeOut > 0) {
-                mutex.wait( timeOut );
-            }
-
-            return count == 0;
-        }
-
-        return true;
+        return this->sync->tryAcquireSharedNanos(1, TimeUnit::MILLISECONDS.toNanos(timeOut));
     }
     DECAF_CATCH_RETHROW( decaf::lang::exceptions::InterruptedException )
     DECAF_CATCH_RETHROW( decaf::lang::Exception )
@@ -89,9 +115,8 @@ bool CountDownLatch::await( long long ti
 
 ////////////////////////////////////////////////////////////////////////////////
 bool CountDownLatch::await( long long timeout, const TimeUnit& unit ) {
-
     try{
-        return this->await( unit.toMillis( timeout ) );
+        return this->sync->tryAcquireSharedNanos(1, unit.toNanos(timeout));
     }
     DECAF_CATCH_RETHROW( decaf::lang::exceptions::InterruptedException )
     DECAF_CATCH_RETHROW( decaf::lang::Exception )
@@ -101,19 +126,18 @@ bool CountDownLatch::await( long long ti
 ////////////////////////////////////////////////////////////////////////////////
 void CountDownLatch::countDown() {
     try {
-
-        if( count == 0 ) {
-            return;
-        }
-
-        synchronized( &mutex ) {
-            count--;
-
-            // Signal when done.
-            if( count == 0 ){
-                mutex.notifyAll();
-            }
-        }
+        this->sync->releaseShared(1);
     }
     DECAF_CATCHALL_NOTHROW()
 }
+
+////////////////////////////////////////////////////////////////////////////////
+int CountDownLatch::getCount() const {
+    return this->sync->getCount();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::string CountDownLatch::toString() const {
+    return std::string("CountDownLatch[count = ") +
+           Integer::toString(this->sync->getCount()) + "]";
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/CountDownLatch.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/CountDownLatch.h?rev=1128104&r1=1128103&r2=1128104&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/CountDownLatch.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/CountDownLatch.h Thu May 26 22:14:57 2011
@@ -28,18 +28,12 @@ namespace decaf{
 namespace util{
 namespace concurrent{
 
+    class LatchSync;
+
     class DECAF_API CountDownLatch {
     private:
 
-        /**
-         * number to count down to
-         */
-        int count;
-
-        /**
-         * Mutex to protect the counts, and wait on.
-         */
-        Mutex mutex;
+        LatchSync* sync;
 
     public:
 
@@ -151,9 +145,15 @@ namespace concurrent{
          * Gets the current count
          * @returns int count value
          */
-        virtual int getCount() const {
-            return this->count;
-        }
+        virtual int getCount() const;
+
+        /**
+         * Returns the string representation of this latch, includes the current
+         * count value at the time of calling.
+         *
+         * @returns string describing this CountDownLatch instance.
+         */
+        virtual std::string toString() const;
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Semaphore.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Semaphore.cpp?rev=1128104&r1=1128103&r2=1128104&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Semaphore.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Semaphore.cpp Thu May 26 22:14:57 2011
@@ -17,116 +17,260 @@
 
 #include "Semaphore.h"
 
+#include <decaf/lang/Integer.h>
+
+#include <decaf/util/concurrent/locks/AbstractQueuedSynchronizer.h>
+
 using namespace decaf;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 using namespace decaf::util;
 using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::locks;
 
 ////////////////////////////////////////////////////////////////////////////////
 namespace decaf {
 namespace util {
 namespace concurrent {
 
-    class SemaphoreHandle {
+    class SemSync : public AbstractQueuedSynchronizer{
     public:
 
-        int permits;
-        bool fair;
+        SemSync(int permits) : AbstractQueuedSynchronizer() {
+            this->setState(permits);
+        }
+        virtual ~SemSync() {}
+
+        virtual bool isFair() const = 0;
+
+        int getPermits() {
+            return getState();
+        }
+
+        int nonfairTryAcquireShared(int acquires) {
+            for (;;) {
+                int available = getState();
+                int remaining = available - acquires;
+                if (remaining < 0 || compareAndSetState(available, remaining)) {
+                    return remaining;
+                }
+            }
+        }
+
+        void reducePermits(int reductions) {
+            for (;;) {
+                int current = getState();
+                int next = current - reductions;
+                if (compareAndSetState(current, next)) {
+                    return;
+                }
+            }
+        }
+
+        int drainPermits() {
+            for (;;) {
+                int current = getState();
+                if (current == 0 || compareAndSetState(current, 0)) {
+                    return current;
+                }
+            }
+        }
+
+    protected:
+
+        virtual bool tryReleaseShared(int releases) {
+            for (;;) {
+                int p = getState();
+                if (compareAndSetState(p, p + releases)) {
+                    return true;
+                }
+            }
+        }
+
+    };
 
+    class NonFairSemSync : public SemSync {
     public:
 
-        SemaphoreHandle( int permits ) : permits( permits ), fair( false ) {
+        NonFairSemSync(int permits) : SemSync(permits) {}
+        virtual ~NonFairSemSync() {}
+
+        virtual bool isFair() const {
+            return false;
         }
 
-        SemaphoreHandle( int permits, bool fair ) : permits( permits ), fair( fair ) {
+    protected:
+
+        virtual int tryAcquireShared(int acquires) {
+            return nonfairTryAcquireShared(acquires);
         }
+    };
+
+    class FairSemSync : public SemSync {
+    public:
 
-        ~SemaphoreHandle() {
+        FairSemSync(int permits) : SemSync(permits) {}
+        virtual ~FairSemSync() {}
+
+        virtual bool isFair() const {
+            return true;
         }
 
+    protected:
+
+        virtual int tryAcquireShared(int acquires) {
+            Thread* current = Thread::currentThread();
+            for(;;) {
+                Thread* first = this->getFirstQueuedThread();
+
+                if (first != NULL && first != current) {
+                    return -1;
+                }
+
+                int available = getState();
+                int remaining = available - acquires;
+                if (remaining < 0 || compareAndSetState(available, remaining)) {
+                    return remaining;
+                }
+            }
+        }
     };
 
 }}}
 
 ////////////////////////////////////////////////////////////////////////////////
-Semaphore::Semaphore( int permits ) : handle(new SemaphoreHandle( permits )) {
+Semaphore::Semaphore(int permits) : sync(new NonFairSemSync(permits)) {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Semaphore::Semaphore( int permits, bool fair ) : handle(new SemaphoreHandle( permits, fair )) {
+Semaphore::Semaphore(int permits, bool fair) : sync(NULL) {
+    fair == true ? sync = new FairSemSync(permits) : sync = new NonFairSemSync(permits);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 Semaphore::~Semaphore() {
+    try{
+        delete sync;
+    }
+    DECAF_CATCHALL_NOTHROW()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void Semaphore::acquire() {
+    this->sync->acquireSharedInterruptibly(1);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void Semaphore::acquireUninterruptibly() {
+    this->sync->acquireShared(1);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool Semaphore::tryAcquire() {
-    return false;
+    return this->sync->nonfairTryAcquireShared(1) >= 0;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-bool Semaphore::tryAcquire( long long timeout DECAF_UNUSED, const TimeUnit& unit DECAF_UNUSED ) {
-    return false;
+bool Semaphore::tryAcquire(long long timeout, const TimeUnit& unit) {
+    return this->sync->tryAcquireSharedNanos(1, unit.toNanos(timeout));
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void Semaphore::release() {
-
+    this->sync->releaseShared(1);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void Semaphore::acquire( int permits DECAF_UNUSED ) {
+void Semaphore::acquire(int permits) {
+
+    if (permits < 0) {  // zero is accepted; throw by value so catch-by-reference handlers match
+        throw IllegalArgumentException(__FILE__, __LINE__,
+            "Value of acquired permits must not be negative.");
+    }
 
+    this->sync->acquireSharedInterruptibly(permits);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void Semaphore::acquireUninterruptibly( int permits DECAF_UNUSED ) {
 
+    if (permits < 0) {  // zero is accepted; throw by value so catch-by-reference handlers match
+        throw IllegalArgumentException(__FILE__, __LINE__,
+            "Value of acquired permits must not be negative.");
+    }
+
+    this->sync->acquireShared(permits);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-bool Semaphore::tryAcquire( int permits DECAF_UNUSED ) {
+bool Semaphore::tryAcquire(int permits) {
 
-    return false;
+    if (permits < 0) {  // zero is accepted; throw by value so catch-by-reference handlers match
+        throw IllegalArgumentException(__FILE__, __LINE__,
+            "Value of acquired permits must not be negative.");
+    }
+
+    return this->sync->nonfairTryAcquireShared(permits) >= 0;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-bool Semaphore::tryAcquire( int permits DECAF_UNUSED, long long timeout DECAF_UNUSED, const TimeUnit& unit DECAF_UNUSED ) {
+bool Semaphore::tryAcquire(int permits, long long timeout, const TimeUnit& unit) {
+
+    if (permits < 0) {  // zero is accepted; throw by value so catch-by-reference handlers match
+        throw IllegalArgumentException(__FILE__, __LINE__,
+            "Value of acquired permits must not be negative.");
+    }
 
-    return false;
+    return this->sync->tryAcquireSharedNanos(permits, unit.toNanos(timeout));
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void Semaphore::release( int permits DECAF_UNUSED ) {
+void Semaphore::release(int permits) {
+
+    if (permits < 0) {  // zero is accepted; throw by value so catch-by-reference handlers match
+        throw IllegalArgumentException(__FILE__, __LINE__,
+            "Value of released permits must not be negative.");
+    }
 
+    this->sync->releaseShared(permits);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 int Semaphore::availablePermits() const {
-    return 0;
+    return this->sync->getPermits();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 int Semaphore::drainPermits() {
-    return 0;
+    return this->sync->drainPermits();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Semaphore::reducePermits(int reduceBy) {
+    return this->sync->reducePermits(reduceBy);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool Semaphore::isFair() const {
-    return false;
+    return this->sync->isFair();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 std::string Semaphore::toString() const {
-    return "";
+    return std::string("Semaphore[Permits = ") + Integer::toString(this->sync->getPermits()) + "]";
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int Semaphore::getQueueLength() const {
+    return this->sync->getQueueLength();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool Semaphore::hasQueuedThreads() const {
+    return this->sync->hasQueuedThreads();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Collection<decaf::lang::Thread*>* Semaphore::getQueuedThreads() const {
+    return this->sync->getQueuedThreads();
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Semaphore.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Semaphore.h?rev=1128104&r1=1128103&r2=1128104&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Semaphore.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Semaphore.h Thu May 26 22:14:57 2011
@@ -24,15 +24,14 @@
 #include <decaf/lang/exceptions/RuntimeException.h>
 #include <decaf/lang/exceptions/IllegalArgumentException.h>
 
+#include <decaf/util/Collection.h>
 #include <decaf/util/concurrent/TimeUnit.h>
 
-#include <memory>
-
 namespace decaf {
 namespace util {
 namespace concurrent {
 
-    class SemaphoreHandle;
+    class SemSync;
 
     /**
      * A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each acquire()
@@ -134,7 +133,7 @@ namespace concurrent {
      * fairness considerations.
      *
      * This class also provides convenience methods to acquire and release multiple permits at a time.
-     * Beware of the increased risk of indefinite postponement when these methods are used without
+     * Beware of the increased risk of indefinitely postponing when these methods are used without
      * fairness set true.
      *
      * @since 1.0
@@ -142,7 +141,7 @@ namespace concurrent {
     class DECAF_API Semaphore {
     private:
 
-        std::auto_ptr<SemaphoreHandle> handle;
+        SemSync* sync;
 
     private:
 
@@ -462,6 +461,44 @@ namespace concurrent {
          */
         std::string toString() const;
 
+        /**
+         * Gets an estimated count of the number of threads that are currently waiting to acquire, this
+         * value changes dynamically so the result of this method can be invalid immediately after it
+         * is called.
+         *
+         * @returns an estimate of the number of waiting threads.
+         */
+        int getQueueLength() const;
+
+        /**
+         * @returns true if there are threads that are currently waiting to acquire this Semaphore.
+         */
+        bool hasQueuedThreads() const;
+
+    protected:
+
+        /**
+         * Reduces the number of available permits which can be useful for subclasses.  If the subclass
+         * is tracking a resource that is transiently available this method can be used to modify the
+         * Semaphore to reflect that resource's current state.  This method does not block waiting for
+         * the number of permits to be available, unlike the acquire method.
+         *
+         * @param reduceBy
+         *      The number of permits to remove from the current available set.
+         *
+         * @throws IllegalArgumentException if the param passed in is negative.
+         */
+        void reducePermits(int reduceBy);
+
+        /**
+         * Creates and returns a new Collection object that contains a best effort snapshot of the
+         * threads that are currently waiting to acquire.
+         *
+         * @returns a Collection pointer that contains waiting threads for lock acquisition.
+         *          The caller owns the returned pointer.
+         */
+        decaf::util::Collection<decaf::lang::Thread*>* getQueuedThreads() const;
+
     };
 
 }}}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp?rev=1128104&r1=1128103&r2=1128104&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp Thu May 26 22:14:57 2011
@@ -207,10 +207,15 @@ namespace locks {
         }
         virtual ~SynchronizerState() {
 
+            // Ensure that the destructor waits for other operations to complete.
+            PlatformThread::writerLockMutex(rwLock);
+
             while (tail.get() != NULL) {
                 delete tail.getAndSet(tail.get()->prev);
             }
 
+            PlatformThread::unlockRWMutex(rwLock);
+
             PlatformThread::destroyRWMutex(rwLock);
         }
 
@@ -426,9 +431,7 @@ namespace locks {
                 // Attempt to set next on tail, this can fail if another thread can in
                 // and replaced the old tail but that's ok since that means next is up
                 // to date in that case.
-                PlatformThread::writerLockMutex(this->rwLock);
-                if(tail.get()->next == node) tail.get()->next = NULL;
-                PlatformThread::unlockRWMutex(this->rwLock);
+                Atomics::compareAndSwap<Node>(tail.get()->next, node, NULL);
                 delete node;
             } else {
                 // If successor needs signal, try to set pred's next-link
@@ -439,7 +442,7 @@ namespace locks {
 
                 // Did we become the tail.
                 if (node == tail.get() && compareAndSetTail(node, node->prev)) {
-                    tail.get()->next = NULL;
+                    Atomics::compareAndSwap<Node>(tail.get()->next, node, NULL);
                 } else {
                     node->prev->next = node->next;
                     node->next->prev = node->prev;
@@ -457,7 +460,6 @@ namespace locks {
                 }
 
                 delete node;
-
             }
         }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am?rev=1128104&r1=1128103&r2=1128104&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am Thu May 26 22:14:57 2011
@@ -211,6 +211,7 @@ cc_sources = \
     decaf/util/concurrent/ExecutorsTestSupport.cpp \
     decaf/util/concurrent/LinkedBlockingQueueTest.cpp \
     decaf/util/concurrent/MutexTest.cpp \
+    decaf/util/concurrent/SemaphoreTest.cpp \
     decaf/util/concurrent/SynchronousQueueTest.cpp \
     decaf/util/concurrent/ThreadPoolExecutorTest.cpp \
     decaf/util/concurrent/TimeUnitTest.cpp \
@@ -437,6 +438,7 @@ h_sources = \
     decaf/util/concurrent/ExecutorsTestSupport.h \
     decaf/util/concurrent/LinkedBlockingQueueTest.h \
     decaf/util/concurrent/MutexTest.h \
+    decaf/util/concurrent/SemaphoreTest.h \
     decaf/util/concurrent/SynchronousQueueTest.h \
     decaf/util/concurrent/ThreadPoolExecutorTest.h \
     decaf/util/concurrent/TimeUnitTest.h \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/lang/ArrayPointerTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/lang/ArrayPointerTest.cpp?rev=1128104&r1=1128103&r2=1128104&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/lang/ArrayPointerTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/lang/ArrayPointerTest.cpp Thu May 26 22:14:57 2011
@@ -529,30 +529,33 @@ namespace {
 ////////////////////////////////////////////////////////////////////////////////
 void ArrayPointerTest::testThreadSafety() {
 
-    ArrayPointer<ArrayPointerTestThread*> thread( 10 );
+    const int NUM_THREADS = 50;
+    const int ITERATIONS = 1000;
+
+    ArrayPointer<ArrayPointerTestThread*> thread( NUM_THREADS );
     Gate gate;
 
-    for( int i = 0; i < 10; i++ ) {
+    for( int i = 0; i < NUM_THREADS; i++ ) {
         thread[i] = new ArrayPointerTestThread( &gate );
         thread[i]->start();
     }
 
-    for( int j = 0; j < 1000; j++ ) {
+    for( int j = 0; j < ITERATIONS; j++ ) {
         // Put this in its own scope so that the main thread frees the string
         // before the threads.
         {
             ArrayPointer<std::string> s( 1 );
-            for( int i = 0; i < 10; i++ ) {
+            for( int i = 0; i < NUM_THREADS; i++ ) {
                 thread[i]->setString( s );
             }
         }
 
         // Signal the threads to free the string.
-        gate.open( 10 );
+        gate.open( NUM_THREADS );
         gate.close();
     }
 
-    for( int i = 0; i < 10; i++ ) {
+    for( int i = 0; i < NUM_THREADS; i++ ) {
         thread[i]->join();
         delete thread[i];
     }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/CountDownLatchTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/CountDownLatchTest.cpp?rev=1128104&r1=1128103&r2=1128104&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/CountDownLatchTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/CountDownLatchTest.cpp Thu May 26 22:14:57 2011
@@ -18,6 +18,8 @@
 #include "CountDownLatchTest.h"
 
 using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
 using namespace decaf::util;
 using namespace decaf::util::concurrent;
 
@@ -62,3 +64,262 @@ void CountDownLatchTest::test2()
 
     CPPUNIT_ASSERT( true );
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void CountDownLatchTest::testConstructor() {
+    try {
+        CountDownLatch l(-1);
+        shouldThrow();
+    } catch(IllegalArgumentException& success){}
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CountDownLatchTest::testGetCount() {
+    CountDownLatch l(2);
+    CPPUNIT_ASSERT_EQUAL(2, l.getCount());
+    l.countDown();
+    CPPUNIT_ASSERT_EQUAL(1, l.getCount());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CountDownLatchTest::testCountDown() {
+    CountDownLatch l(1);
+    CPPUNIT_ASSERT_EQUAL(1, l.getCount());
+    l.countDown();
+    CPPUNIT_ASSERT_EQUAL(0, l.getCount());
+    l.countDown();
+    CPPUNIT_ASSERT_EQUAL(0, l.getCount());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class TestAwaitRunnable : public Runnable {
+    private:
+
+        CountDownLatch* latch;
+        CountDownLatchTest* parent;
+
+    public:
+
+        TestAwaitRunnable(CountDownLatch* latch, CountDownLatchTest* parent) :
+            Runnable(), latch(latch), parent(parent) {}
+        virtual ~TestAwaitRunnable() {}
+
+        virtual void run() {
+            try {
+                parent->threadAssertTrue(latch->getCount() > 0);
+                latch->await();
+                parent->threadAssertTrue(latch->getCount() == 0);
+            } catch(InterruptedException& e) {
+                parent->threadUnexpectedException();
+            }
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CountDownLatchTest::testAwait() {
+    CountDownLatch l(2);
+    TestAwaitRunnable runnable(&l, this);
+    Thread t(&runnable);
+    // The worker blocks in await() and then asserts the count is zero; this thread counts down.
+    t.start();
+    try {
+        CPPUNIT_ASSERT_EQUAL(2, l.getCount());  // CppUnit order: expected, actual
+        Thread::sleep(SHORT_DELAY_MS);
+        l.countDown();
+        CPPUNIT_ASSERT_EQUAL(1, l.getCount());
+        l.countDown();
+        CPPUNIT_ASSERT_EQUAL(0, l.getCount());
+        t.join();
+    } catch(InterruptedException& e) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class TestTimedAwaitRunnable : public Runnable {
+    private:
+
+        CountDownLatch* latch;
+        CountDownLatchTest* parent;
+
+    public:
+
+        TestTimedAwaitRunnable(CountDownLatch* latch, CountDownLatchTest* parent) :
+            Runnable(), latch(latch), parent(parent) {}
+        virtual ~TestTimedAwaitRunnable() {}
+
+        virtual void run() {
+            try {
+                parent->threadAssertTrue(latch->getCount() > 0);
+                parent->threadAssertTrue(latch->await(CountDownLatchTest::SMALL_DELAY_MS, TimeUnit::MILLISECONDS));
+            } catch(InterruptedException& e) {
+                parent->threadUnexpectedException();
+            }
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CountDownLatchTest::testTimedAwait() {
+    CountDownLatch l(2);
+    TestTimedAwaitRunnable runnable(&l, this);
+    Thread t(&runnable);
+    // The worker asserts that its timed await() returns true; this thread does both countDown() calls.
+    t.start();
+    try {
+        CPPUNIT_ASSERT_EQUAL(2, l.getCount());  // CppUnit order: expected, actual
+        Thread::sleep(SHORT_DELAY_MS);
+        l.countDown();
+        CPPUNIT_ASSERT_EQUAL(1, l.getCount());
+        l.countDown();
+        CPPUNIT_ASSERT_EQUAL(0, l.getCount());
+        t.join();
+    } catch(InterruptedException& e) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class TestAwaitInterruptedExceptionRunnable : public Runnable {
+    private:
+
+        CountDownLatch* latch;
+        CountDownLatchTest* parent;
+
+    public:
+
+        TestAwaitInterruptedExceptionRunnable(CountDownLatch* latch, CountDownLatchTest* parent) :
+            Runnable(), latch(latch), parent(parent) {}
+        virtual ~TestAwaitInterruptedExceptionRunnable() {}
+
+        virtual void run() {
+            try {
+                parent->threadAssertTrue(latch->getCount() > 0);
+                latch->await();
+                parent->threadShouldThrow();
+            } catch(InterruptedException& success) {
+            }
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CountDownLatchTest::testAwaitInterruptedException() {
+    CountDownLatch l(1);
+    TestAwaitInterruptedExceptionRunnable runnable(&l, this);
+    Thread t(&runnable);
+    // The latch is never counted down; the worker expects await() to end with InterruptedException.
+    t.start();
+    try {
+        CPPUNIT_ASSERT_EQUAL(1, l.getCount());  // CppUnit order: expected, actual
+        t.interrupt();
+        t.join();
+    } catch(InterruptedException& e) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class TestTimedAwaitInterruptedExceptionRunnable : public Runnable {
+    private:
+
+        CountDownLatch* latch;
+        CountDownLatchTest* parent;
+
+    public:
+
+        TestTimedAwaitInterruptedExceptionRunnable(CountDownLatch* latch, CountDownLatchTest* parent) :
+            Runnable(), latch(latch), parent(parent) {}
+        virtual ~TestTimedAwaitInterruptedExceptionRunnable() {}
+
+        virtual void run() {
+            try {
+                parent->threadAssertTrue(latch->getCount() > 0);
+                latch->await(CountDownLatchTest::MEDIUM_DELAY_MS, TimeUnit::MILLISECONDS);
+                parent->threadShouldThrow();
+            } catch(InterruptedException& success) {
+            }
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CountDownLatchTest::testTimedAwaitInterruptedException() {
+    CountDownLatch l(1);
+    TestTimedAwaitInterruptedExceptionRunnable runnable(&l, this);
+    Thread t(&runnable);
+    // The latch is never counted down; the worker expects its timed await() to be interrupted.
+    t.start();
+    try {
+        Thread::sleep(SHORT_DELAY_MS);
+        CPPUNIT_ASSERT_EQUAL(1, l.getCount());  // CppUnit order: expected, actual
+        t.interrupt();
+        t.join();
+    } catch(InterruptedException& e) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class TestAwaitTimeoutRunnable : public Runnable {
+    private:
+
+        CountDownLatch* latch;
+        CountDownLatchTest* parent;
+
+    public:
+
+        TestAwaitTimeoutRunnable(CountDownLatch* latch, CountDownLatchTest* parent) :
+            Runnable(), latch(latch), parent(parent) {}
+        virtual ~TestAwaitTimeoutRunnable() {}
+
+        virtual void run() {
+            try {
+                parent->threadAssertTrue(latch->getCount() > 0);
+                parent->threadAssertFalse(latch->await(CountDownLatchTest::SHORT_DELAY_MS, TimeUnit::MILLISECONDS));
+                parent->threadAssertTrue(latch->getCount() > 0);
+            } catch(InterruptedException& ie) {
+                parent->threadUnexpectedException();
+            }
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CountDownLatchTest::testAwaitTimeout() {
+    CountDownLatch l(1);
+    TestAwaitTimeoutRunnable runnable(&l, this);
+    Thread t(&runnable);
+    // The latch is never counted down; the worker asserts that its timed await() returns false.
+    t.start();
+    try {
+        CPPUNIT_ASSERT_EQUAL(1, l.getCount());  // CppUnit order: expected, actual
+        t.join();
+    } catch(InterruptedException& e) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CountDownLatchTest::testToString() {  // NOTE(review): confirm "Count = N" matches toString()'s exact text
+    CountDownLatch s(2);
+    std::string us = s.toString();
+    CPPUNIT_ASSERT(us.find("Count = 2") != std::string::npos);  // whole-substring search; find_first_of matched any one char
+    s.countDown();
+    std::string s1 = s.toString();
+    CPPUNIT_ASSERT(s1.find("Count = 1") != std::string::npos);
+    s.countDown();
+    std::string s2 = s.toString();
+    CPPUNIT_ASSERT(s2.find("Count = 0") != std::string::npos);
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/CountDownLatchTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/CountDownLatchTest.h?rev=1128104&r1=1128103&r2=1128104&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/CountDownLatchTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/CountDownLatchTest.h Thu May 26 22:14:57 2011
@@ -20,6 +20,7 @@
 
 #include <cppunit/TestFixture.h>
 #include <cppunit/extensions/HelperMacros.h>
+#include <decaf/util/concurrent/ExecutorsTestSupport.h>
 
 #include <decaf/lang/Thread.h>
 #include <decaf/util/concurrent/CountDownLatch.h>
@@ -28,11 +29,20 @@ namespace decaf{
 namespace util{
 namespace concurrent{
 
-    class CountDownLatchTest : public CppUnit::TestFixture {
+    class CountDownLatchTest : public ExecutorsTestSupport {
 
         CPPUNIT_TEST_SUITE( CountDownLatchTest );
         CPPUNIT_TEST( test );
         CPPUNIT_TEST( test2 );
+        CPPUNIT_TEST( testConstructor );
+        CPPUNIT_TEST( testGetCount );
+        CPPUNIT_TEST( testCountDown );
+        CPPUNIT_TEST( testAwait );
+        CPPUNIT_TEST( testTimedAwait );
+        CPPUNIT_TEST( testAwaitInterruptedException );
+        CPPUNIT_TEST( testTimedAwaitInterruptedException );
+        CPPUNIT_TEST( testAwaitTimeout );
+        CPPUNIT_TEST( testToString );
         CPPUNIT_TEST_SUITE_END();
 
     protected:
@@ -64,8 +74,18 @@ namespace concurrent{
         CountDownLatchTest() {}
         virtual ~CountDownLatchTest() {}
 
-        virtual void test();
-        virtual void test2();
+        void test();
+        void test2();
+        void testConstructor();
+        void testGetCount();
+        void testCountDown();
+        void testAwait();
+        void testTimedAwait();
+        void testAwaitInterruptedException();
+        void testTimedAwaitInterruptedException();
+        void testAwaitTimeout();
+        void testToString();
+
     };
 
 }}}

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/SemaphoreTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/SemaphoreTest.cpp?rev=1128104&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/SemaphoreTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/SemaphoreTest.cpp Thu May 26 22:14:57 2011
@@ -0,0 +1,1119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SemaphoreTest.h"
+
+#include <decaf/lang/System.h>
+#include <decaf/lang/Thread.h>
+#include <decaf/lang/Runnable.h>
+#include <decaf/util/Date.h>
+#include <decaf/util/concurrent/TimeUnit.h>
+#include <decaf/util/concurrent/Semaphore.h>
+#include <decaf/util/concurrent/locks/LockSupport.h>
+#include <decaf/util/concurrent/locks/ReentrantLock.h>
+
+using namespace std;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::locks;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class PublicSemaphore : public Semaphore {
+    public:
+
+        PublicSemaphore(int p, bool f) : Semaphore(p, f) {}
+        virtual ~PublicSemaphore() {}
+
+        Collection<Thread*>* getQueuedThreads() const {
+            return Semaphore::getQueuedThreads();
+        }
+
+        void reducePermits(int p) {
+            Semaphore::reducePermits(p);
+        }
+    };
+
+    class InterruptibleLockRunnable : public Runnable {
+    private:
+
+        Semaphore* lock;
+        SemaphoreTest* parent;
+
+    public:
+
+        InterruptibleLockRunnable(Semaphore* l, SemaphoreTest* parent) :
+            Runnable(), lock(l), parent(parent) {}
+        virtual ~InterruptibleLockRunnable() {}
+
+        virtual void run() {
+            try {
+                lock->acquire();
+            } catch(InterruptedException& success) {
+            }
+        }
+    };
+
+    class InterruptedLockRunnable : public Runnable {
+    private:
+
+        Semaphore* lock;
+        SemaphoreTest* parent;
+
+    public:
+
+        InterruptedLockRunnable(Semaphore* l, SemaphoreTest* parent) :
+            Runnable(), lock(l), parent(parent) {}
+        virtual ~InterruptedLockRunnable() {}
+
+        virtual void run() {
+            try {
+                lock->acquire();
+                parent->threadShouldThrow();
+            } catch(InterruptedException& success) {
+            }
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+SemaphoreTest::SemaphoreTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+SemaphoreTest::~SemaphoreTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SemaphoreTest::testConstructor() {
+    Semaphore s0(0, false);
+    CPPUNIT_ASSERT_EQUAL(0, s0.availablePermits());
+    CPPUNIT_ASSERT(!s0.isFair());
+    Semaphore s1(-1, false);
+    CPPUNIT_ASSERT_EQUAL(-1, s1.availablePermits());
+    CPPUNIT_ASSERT(!s1.isFair());
+    Semaphore s2(1, false);
+    CPPUNIT_ASSERT_EQUAL(1, s2.availablePermits());
+    CPPUNIT_ASSERT(!s2.isFair());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SemaphoreTest::testConstructor2() {
+
+    Semaphore s0(0);
+    CPPUNIT_ASSERT_EQUAL(0, s0.availablePermits());
+    CPPUNIT_ASSERT(!s0.isFair());
+    Semaphore s1(-1);
+    CPPUNIT_ASSERT_EQUAL(-1, s1.availablePermits());
+    CPPUNIT_ASSERT(!s1.isFair());
+    Semaphore s2(-1);
+    CPPUNIT_ASSERT_EQUAL(-1, s2.availablePermits());
+    CPPUNIT_ASSERT(!s2.isFair());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SemaphoreTest::testTryAcquireInSameThread() {
+
+    Semaphore s(2, false);
+    CPPUNIT_ASSERT_EQUAL(2, s.availablePermits());
+    CPPUNIT_ASSERT(s.tryAcquire());
+    CPPUNIT_ASSERT(s.tryAcquire());
+    CPPUNIT_ASSERT_EQUAL(0, s.availablePermits());
+    CPPUNIT_ASSERT(!s.tryAcquire());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SemaphoreTest::testAcquireReleaseInSameThread() {
+
+    Semaphore s(1, false);
+    try {
+        s.acquire();
+        s.release();
+        s.acquire();
+        s.release();
+        s.acquire();
+        s.release();
+        s.acquire();
+        s.release();
+        s.acquire();
+        s.release();
+        CPPUNIT_ASSERT_EQUAL(1, s.availablePermits());
+    } catch( InterruptedException& e){
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SemaphoreTest::testAcquireUninterruptiblyReleaseInSameThread() {
+
+    // Repeatedly takes and returns the single permit of a non-fair semaphore and
+    // checks that the permit is available again afterwards.  Deliberately not wrapped
+    // in try/catch(...): that would swallow the CppUnit assertion failure below.
+    Semaphore s(1, false);
+    s.acquireUninterruptibly();
+    s.release();
+    s.acquireUninterruptibly();
+    s.release();
+    s.acquireUninterruptibly();
+    s.release();
+    s.acquireUninterruptibly();
+    s.release();
+    s.acquireUninterruptibly();
+    s.release();
+    CPPUNIT_ASSERT_EQUAL(1, s.availablePermits());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SemaphoreTest::testTimedAcquireReleaseInSameThread() {
+
+    Semaphore s(1, false);
+    try {
+        CPPUNIT_ASSERT(s.tryAcquire(SHORT_DELAY_MS, TimeUnit::MILLISECONDS));
+        s.release();
+        CPPUNIT_ASSERT(s.tryAcquire(SHORT_DELAY_MS, TimeUnit::MILLISECONDS));
+        s.release();
+        CPPUNIT_ASSERT(s.tryAcquire(SHORT_DELAY_MS, TimeUnit::MILLISECONDS));
+        s.release();
+        CPPUNIT_ASSERT(s.tryAcquire(SHORT_DELAY_MS, TimeUnit::MILLISECONDS));
+        s.release();
+        CPPUNIT_ASSERT(s.tryAcquire(SHORT_DELAY_MS, TimeUnit::MILLISECONDS));
+        s.release();
+        CPPUNIT_ASSERT_EQUAL(1, s.availablePermits());
+    } catch(InterruptedException& e) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class TestAcquireReleaseInDifferentThreadsRunnable : public Runnable {
+    private:
+
+        Semaphore* sem;
+        SemaphoreTest* parent;
+
+    public:
+
+        TestAcquireReleaseInDifferentThreadsRunnable(Semaphore* sem, SemaphoreTest* parent)
+            : Runnable(), sem(sem), parent(parent) {}
+        virtual ~TestAcquireReleaseInDifferentThreadsRunnable() {}
+
+        virtual void run() {
+            try {
+                sem->acquire();
+                sem->release();
+                sem->release();
+                sem->acquire();
+            } catch(InterruptedException& ie) {
+                parent->threadUnexpectedException();
+            }
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SemaphoreTest::testAcquireReleaseInDifferentThreads() {
+
+    Semaphore s(0, false);
+    TestAcquireReleaseInDifferentThreadsRunnable runnable(&s, this);
+    Thread t(&runnable);
+
+    try {
+        t.start();
+        Thread::sleep(SHORT_DELAY_MS);
+        s.release();
+        s.release();
+        s.acquire();
+        s.acquire();
+        s.release();
+        t.join();
+    } catch( InterruptedException& e){
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class TestUninterruptibleAcquireReleaseInDifferentThreadsRunnable : public Runnable {
+    private:
+
+        Semaphore* sem;
+        SemaphoreTest* parent;
+
+    public:
+
+        TestUninterruptibleAcquireReleaseInDifferentThreadsRunnable(Semaphore* sem, SemaphoreTest* parent)
+            : Runnable(), sem(sem), parent(parent) {}
+        virtual ~TestUninterruptibleAcquireReleaseInDifferentThreadsRunnable() {}
+
+        virtual void run() {
+            sem->acquireUninterruptibly();
+            sem->release();
+            sem->release();
+            sem->acquireUninterruptibly();
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SemaphoreTest::testUninterruptibleAcquireReleaseInDifferentThreads() {
+
+    Semaphore s(0, false);
+    TestUninterruptibleAcquireReleaseInDifferentThreadsRunnable runnable(&s, this);
+    Thread t(&runnable);
+
+    try {
+        t.start();
+        Thread::sleep(SHORT_DELAY_MS);
+        s.release();
+        s.release();
+        s.acquireUninterruptibly();
+        s.acquireUninterruptibly();
+        s.release();
+        t.join();
+    } catch(InterruptedException& e) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class TestTimedAcquireReleaseInDifferentThreadsRunnable : public Runnable {
+    private:
+
+        Semaphore* sem;
+        SemaphoreTest* parent;
+
+    public:
+
+        TestTimedAcquireReleaseInDifferentThreadsRunnable(Semaphore* sem, SemaphoreTest* parent)
+            : Runnable(), sem(sem), parent(parent) {}
+        virtual ~TestTimedAcquireReleaseInDifferentThreadsRunnable() {}
+
+        virtual void run() {
+            try {
+                sem->release();
+                parent->threadAssertTrue(sem->tryAcquire(SemaphoreTest::SHORT_DELAY_MS, TimeUnit::MILLISECONDS));
+                sem->release();
+                parent->threadAssertTrue(sem->tryAcquire(SemaphoreTest::SHORT_DELAY_MS, TimeUnit::MILLISECONDS));
+            } catch(InterruptedException& ie) {
+                parent->threadUnexpectedException();
+            }
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SemaphoreTest::testTimedAcquireReleaseInDifferentThreads() {
+
+    Semaphore s(1, false);
+    TestTimedAcquireReleaseInDifferentThreadsRunnable runnable(&s, this);
+    Thread t(&runnable);
+
+    try {
+        t.start();
+        CPPUNIT_ASSERT(s.tryAcquire(SHORT_DELAY_MS, TimeUnit::MILLISECONDS));
+        s.release();
+        CPPUNIT_ASSERT(s.tryAcquire(SHORT_DELAY_MS, TimeUnit::MILLISECONDS));
+        s.release();
+        s.release();
+        t.join();
+    } catch(InterruptedException& e) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class TestAcquireInterruptedExceptionRunnable : public Runnable {
+    private:
+
+        Semaphore* sem;
+        SemaphoreTest* parent;
+
+    public:
+
+        TestAcquireInterruptedExceptionRunnable(Semaphore* sem, SemaphoreTest* parent)
+            : Runnable(), sem(sem), parent(parent) {}
+        virtual ~TestAcquireInterruptedExceptionRunnable() {}
+
+        virtual void run() {
+            try{
+                sem->acquire();
+                parent->threadShouldThrow();
+            } catch(InterruptedException& success) {
+            }
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SemaphoreTest::testAcquireInterruptedException() {
+
+    Semaphore s(0, false);
+    TestAcquireInterruptedExceptionRunnable runnable(&s, this);
+    Thread t(&runnable);
+
+    t.start();
+    try {
+        Thread::sleep(SHORT_DELAY_MS);
+        t.interrupt();
+        t.join();
+    } catch(InterruptedException& e) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class TestTryAcquireInterruptedExceptionRunnable : public Runnable {
+    private:
+
+        Semaphore* sem;
+        SemaphoreTest* parent;
+
+    public:
+
+        TestTryAcquireInterruptedExceptionRunnable(Semaphore* sem, SemaphoreTest* parent)
+            : Runnable(), sem(sem), parent(parent) {}
+        virtual ~TestTryAcquireInterruptedExceptionRunnable() {}
+
+        virtual void run() {
+            try{
+                sem->tryAcquire(SemaphoreTest::MEDIUM_DELAY_MS, TimeUnit::MILLISECONDS);
+                parent->threadShouldThrow();
+            } catch(InterruptedException& success) {
+            }
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SemaphoreTest::testTryAcquireInterruptedException() {
+    // No permits are available; the worker's timed tryAcquire() is expected to end with InterruptedException.
+    Semaphore s(0, false);
+    TestTryAcquireInterruptedExceptionRunnable runnable(&s, this);
+    Thread t(&runnable);
+
+    t.start();
+    try {
+        Thread::sleep(SHORT_DELAY_MS);
+        t.interrupt();
+        t.join();
+    } catch(InterruptedException& e) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SemaphoreTest::testHasQueuedThreads() {
+
+    Semaphore lock(1, false);
+    InterruptedLockRunnable runnable1(&lock, this);
+    InterruptibleLockRunnable runnable2(&lock, this);
+    Thread t1(&runnable1);
+    Thread t2(&runnable2);
+
+    try {
+        CPPUNIT_ASSERT(!lock.hasQueuedThreads());
+        lock.acquireUninterruptibly();
+        t1.start();
+        Thread::sleep(SHORT_DELAY_MS);
+        CPPUNIT_ASSERT(lock.hasQueuedThreads());
+        t2.start();
+        Thread::sleep(SHORT_DELAY_MS);
+        CPPUNIT_ASSERT(lock.hasQueuedThreads());
+        t1.interrupt();
+        Thread::sleep(SHORT_DELAY_MS);
+        CPPUNIT_ASSERT(lock.hasQueuedThreads());
+        lock.release();
+        Thread::sleep(SHORT_DELAY_MS);
+        CPPUNIT_ASSERT(!lock.hasQueuedThreads());
+        t1.join();
+        t2.join();
+    } catch(Exception& e){
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SemaphoreTest::testGetQueueLength() {
+
+    Semaphore lock(1, false);
+
+    InterruptedLockRunnable runnable1(&lock, this);
+    InterruptibleLockRunnable runnable2(&lock, this);
+    Thread t1(&runnable1);
+    Thread t2(&runnable2);
+
+    try {
+        CPPUNIT_ASSERT_EQUAL(0, lock.getQueueLength());
+        lock.acquireUninterruptibly();
+        t1.start();
+        Thread::sleep( SHORT_DELAY_MS);
+        CPPUNIT_ASSERT_EQUAL(1, lock.getQueueLength());
+        t2.start();
+        Thread::sleep(SHORT_DELAY_MS);
+        CPPUNIT_ASSERT_EQUAL(2, lock.getQueueLength());
+        t1.interrupt();
+        Thread::sleep(SHORT_DELAY_MS);
+        CPPUNIT_ASSERT_EQUAL(1, lock.getQueueLength());
+        lock.release();
+        Thread::sleep(SHORT_DELAY_MS);
+        CPPUNIT_ASSERT_EQUAL(0, lock.getQueueLength());
+        t1.join();
+        t2.join();
+    } catch(Exception& e) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SemaphoreTest::testGetQueuedThreads() {
+
+    PublicSemaphore lock(1, false);
+    InterruptedLockRunnable runnable1(&lock, this);
+    InterruptibleLockRunnable runnable2(&lock, this);
+    Thread t1(&runnable1);
+    Thread t2(&runnable2);
+
+    try {
+        CPPUNIT_ASSERT(std::auto_ptr<Collection<Thread*> >(lock.getQueuedThreads())->isEmpty());
+        lock.acquireUninterruptibly();
+        CPPUNIT_ASSERT(std::auto_ptr<Collection<Thread*> >(lock.getQueuedThreads())->isEmpty());
+        t1.start();
+        Thread::sleep( SHORT_DELAY_MS);
+        CPPUNIT_ASSERT(std::auto_ptr<Collection<Thread*> >(lock.getQueuedThreads())->contains(&t1));
+        t2.start();
+        Thread::sleep(SHORT_DELAY_MS);
+        CPPUNIT_ASSERT(std::auto_ptr<Collection<Thread*> >(lock.getQueuedThreads())->contains(&t1));
+        CPPUNIT_ASSERT(std::auto_ptr<Collection<Thread*> >(lock.getQueuedThreads())->contains(&t2));
+        t1.interrupt();
+        Thread::sleep(SHORT_DELAY_MS);
+        CPPUNIT_ASSERT(!std::auto_ptr<Collection<Thread*> >(lock.getQueuedThreads())->contains(&t1));
+        CPPUNIT_ASSERT(std::auto_ptr<Collection<Thread*> >(lock.getQueuedThreads())->contains(&t2));
+        lock.release();
+        Thread::sleep(SHORT_DELAY_MS);
+        CPPUNIT_ASSERT(std::auto_ptr<Collection<Thread*> >(lock.getQueuedThreads())->isEmpty());
+        t1.join();
+        t2.join();
+    } catch(Exception& e) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SemaphoreTest::testDrainPermits() {
+    Semaphore s(0, false);
+    CPPUNIT_ASSERT_EQUAL(0, s.availablePermits());
+    CPPUNIT_ASSERT_EQUAL(0, s.drainPermits());
+    s.release(10);
+    CPPUNIT_ASSERT_EQUAL(10, s.availablePermits());
+    CPPUNIT_ASSERT_EQUAL(10, s.drainPermits());
+    CPPUNIT_ASSERT_EQUAL(0, s.availablePermits());
+    CPPUNIT_ASSERT_EQUAL(0, s.drainPermits());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SemaphoreTest::testReducePermits() {
+    PublicSemaphore s(10, false);
+    CPPUNIT_ASSERT_EQUAL(10, s.availablePermits());
+    s.reducePermits(1);
+    CPPUNIT_ASSERT_EQUAL(9, s.availablePermits());
+    s.reducePermits(10);
+    CPPUNIT_ASSERT_EQUAL(-1, s.availablePermits());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SemaphoreTest::testConstructorFair() {
+    Semaphore s0(0, true);
+    CPPUNIT_ASSERT_EQUAL(0, s0.availablePermits());
+    CPPUNIT_ASSERT(s0.isFair());
+    Semaphore s1(-1, true);
+    CPPUNIT_ASSERT_EQUAL(-1, s1.availablePermits());
+    Semaphore s2(-1, true);
+    CPPUNIT_ASSERT_EQUAL(-1, s2.availablePermits());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SemaphoreTest::testTryAcquireInSameThreadFair() {
+    Semaphore s(2, true);
+    CPPUNIT_ASSERT_EQUAL(2, s.availablePermits());
+    CPPUNIT_ASSERT(s.tryAcquire());
+    CPPUNIT_ASSERT(s.tryAcquire());
+    CPPUNIT_ASSERT_EQUAL(0, s.availablePermits());
+    CPPUNIT_ASSERT(!s.tryAcquire());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SemaphoreTest::testTryAcquireNInSameThreadFair() {
+    Semaphore s(2, true);
+    CPPUNIT_ASSERT_EQUAL(2, s.availablePermits());
+    CPPUNIT_ASSERT(s.tryAcquire(2));
+    CPPUNIT_ASSERT_EQUAL(0, s.availablePermits());
+    CPPUNIT_ASSERT(!s.tryAcquire());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SemaphoreTest::testAcquireReleaseInSameThreadFair() {
+    Semaphore s(1, true);
+    try {
+        s.acquire();
+        s.release();
+        s.acquire();
+        s.release();
+        s.acquire();
+        s.release();
+        s.acquire();
+        s.release();
+        s.acquire();
+        s.release();
+        CPPUNIT_ASSERT_EQUAL(1, s.availablePermits());
+    } catch(InterruptedException& e) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SemaphoreTest::testAcquireReleaseNInSameThreadFair() {
+    Semaphore s(1, true);
+    try {
+        s.release(1);
+        s.acquire(1);
+        s.release(2);
+        s.acquire(2);
+        s.release(3);
+        s.acquire(3);
+        s.release(4);
+        s.acquire(4);
+        s.release(5);
+        s.acquire(5);
+        CPPUNIT_ASSERT_EQUAL(1, s.availablePermits());
+    } catch(InterruptedException& e) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SemaphoreTest::testAcquireUninterruptiblyReleaseNInSameThreadFair() {
+    // Fair semaphore seeded with one permit: each step releases n permits and takes n
+    // back, so one permit must remain at the end.  Deliberately not wrapped in
+    // try/catch(...): that would swallow the CppUnit assertion failure below.
+    Semaphore s(1, true);
+    s.release(1);
+    s.acquireUninterruptibly(1);
+    s.release(2);
+    s.acquireUninterruptibly(2);
+    s.release(3);
+    s.acquireUninterruptibly(3);
+    s.release(4);
+    s.acquireUninterruptibly(4);
+    s.release(5);
+    s.acquireUninterruptibly(5);
+    CPPUNIT_ASSERT_EQUAL(1, s.availablePermits());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Timed counterpart of the acquire(N) same-thread test: for N = 1..5 releases N
+// permits and asserts that tryAcquire(N, SHORT_DELAY_MS) succeeds, then checks
+// that the single initial permit is still available.
+void SemaphoreTest::testTimedAcquireReleaseNInSameThreadFair() {
+    Semaphore s(1, true);
+    try {
+        for (int permits = 1; permits <= 5; ++permits) {
+            s.release(permits);
+            CPPUNIT_ASSERT(s.tryAcquire(permits, SHORT_DELAY_MS, TimeUnit::MILLISECONDS));
+        }
+        CPPUNIT_ASSERT_EQUAL(1, s.availablePermits());
+    } catch(InterruptedException&) {
+        // The timed tryAcquire is interruptible; no interrupt is expected here.
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Five times in a row: takes the single permit with the timed tryAcquire and
+// gives it back with release(). Finishes by checking that one permit is available.
+void SemaphoreTest::testTimedAcquireReleaseInSameThreadFair() {
+    Semaphore s(1, true);
+    try {
+        for (int round = 0; round < 5; ++round) {
+            CPPUNIT_ASSERT(s.tryAcquire(SHORT_DELAY_MS, TimeUnit::MILLISECONDS));
+            s.release();
+        }
+        CPPUNIT_ASSERT_EQUAL(1, s.availablePermits());
+    } catch(InterruptedException&) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    // Worker body for testAcquireReleaseInDifferentThreadsFair: performs four
+    // acquire() calls on the semaphore it was given. An InterruptedException is
+    // reported to the owning test through threadUnexpectedException().
+    class TestAcquireReleaseInDifferentThreadsFairRunnable : public Runnable {
+    private:
+
+        Semaphore* semaphore;
+        SemaphoreTest* test;
+
+    public:
+
+        TestAcquireReleaseInDifferentThreadsFairRunnable(Semaphore* sem, SemaphoreTest* parent)
+            : Runnable(), semaphore(sem), test(parent) {}
+        virtual ~TestAcquireReleaseInDifferentThreadsFairRunnable() {}
+
+        virtual void run() {
+            try {
+                for (int i = 0; i < 4; ++i) {
+                    semaphore->acquire();
+                }
+            } catch(InterruptedException&) {
+                test->threadUnexpectedException();
+            }
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Starts a worker that acquires four permits from a semaphore created with
+// zero permits, releases six permits from this thread, and after joining the
+// worker expects two permits to be left over.
+void SemaphoreTest::testAcquireReleaseInDifferentThreadsFair() {
+    Semaphore semaphore(0, true);
+    TestAcquireReleaseInDifferentThreadsFairRunnable acquirer(&semaphore, this);
+    Thread worker(&acquirer);
+
+    try {
+        worker.start();
+        // Pause before releasing so the worker has a chance to start waiting.
+        Thread::sleep( SHORT_DELAY_MS);
+        for (int i = 0; i < 6; ++i) {
+            semaphore.release();
+        }
+        worker.join();
+        CPPUNIT_ASSERT_EQUAL(2, semaphore.availablePermits());
+    } catch(InterruptedException&) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    // Worker body for testAcquireReleaseNInDifferentThreadsFair: acquires one
+    // permit, releases two, then acquires one more. An InterruptedException is
+    // reported to the owning test through threadUnexpectedException().
+    class TestAcquireReleaseNInDifferentThreadsFairRunnable : public Runnable {
+    private:
+
+        Semaphore* semaphore;
+        SemaphoreTest* test;
+
+    public:
+
+        TestAcquireReleaseNInDifferentThreadsFairRunnable(Semaphore* sem, SemaphoreTest* parent)
+            : Runnable(), semaphore(sem), test(parent) {}
+        virtual ~TestAcquireReleaseNInDifferentThreadsFairRunnable() {}
+
+        virtual void run() {
+            try {
+                semaphore->acquire();
+                semaphore->release(2);
+                semaphore->acquire();
+            } catch(InterruptedException&) {
+                test->threadUnexpectedException();
+            }
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Exchanges permits with a worker thread using the multi-permit calls: this
+// thread releases two, acquires two and releases one while the worker acquires
+// one, releases two and acquires one. The test passes if both sides complete
+// without an InterruptedException.
+void SemaphoreTest::testAcquireReleaseNInDifferentThreadsFair() {
+    Semaphore semaphore(0, true);
+    TestAcquireReleaseNInDifferentThreadsFairRunnable exchanger(&semaphore, this);
+    Thread worker(&exchanger);
+
+    try {
+        worker.start();
+        Thread::sleep( SHORT_DELAY_MS);
+        semaphore.release(2);
+        semaphore.acquire(2);
+        semaphore.release(1);
+        worker.join();
+    } catch(InterruptedException&) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    // Worker body for testAcquireReleaseNInDifferentThreadsFair2: acquires two
+    // permits twice and then releases four. An InterruptedException is reported
+    // to the owning test through threadUnexpectedException().
+    class TestAcquireReleaseNInDifferentThreadsFair2Runnable : public Runnable {
+    private:
+
+        Semaphore* semaphore;
+        SemaphoreTest* test;
+
+    public:
+
+        TestAcquireReleaseNInDifferentThreadsFair2Runnable(Semaphore* sem, SemaphoreTest* parent)
+            : Runnable(), semaphore(sem), test(parent) {}
+        virtual ~TestAcquireReleaseNInDifferentThreadsFair2Runnable() {}
+
+        virtual void run() {
+            try {
+                for (int i = 0; i < 2; ++i) {
+                    semaphore->acquire(2);
+                }
+                semaphore->release(4);
+            } catch(InterruptedException&) {
+                test->threadUnexpectedException();
+            }
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Releases six permits to a semaphore created with zero, then competes with a
+// worker thread for them: this thread acquires two permits twice and releases
+// two, while the worker acquires two twice and releases four. The test passes
+// if both sides complete without an InterruptedException.
+void SemaphoreTest::testAcquireReleaseNInDifferentThreadsFair2() {
+    Semaphore semaphore(0, true);
+    TestAcquireReleaseNInDifferentThreadsFair2Runnable competitor(&semaphore, this);
+    Thread worker(&competitor);
+
+    try {
+        worker.start();
+        Thread::sleep( SHORT_DELAY_MS);
+        semaphore.release(6);
+        for (int i = 0; i < 2; ++i) {
+            semaphore.acquire(2);
+        }
+        semaphore.release(2);
+        worker.join();
+    } catch(InterruptedException&) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    // Worker body for testTimedAcquireReleaseInDifferentThreadsFair: makes five
+    // timed tryAcquire() calls (SHORT_DELAY_MS each) and passes every result to
+    // the owning test's threadAssertTrue(). An InterruptedException is reported
+    // through threadUnexpectedException().
+    class TestTimedAcquireReleaseInDifferentThreadsFairRunnable : public Runnable {
+    private:
+
+        Semaphore* semaphore;
+        SemaphoreTest* test;
+
+    public:
+
+        TestTimedAcquireReleaseInDifferentThreadsFairRunnable(Semaphore* sem, SemaphoreTest* parent)
+            : Runnable(), semaphore(sem), test(parent) {}
+        virtual ~TestTimedAcquireReleaseInDifferentThreadsFairRunnable() {}
+
+        virtual void run() {
+            try {
+                for (int attempt = 0; attempt < 5; ++attempt) {
+                    test->threadAssertTrue(
+                        semaphore->tryAcquire(SemaphoreTest::SHORT_DELAY_MS, TimeUnit::MILLISECONDS));
+                }
+            } catch(InterruptedException&) {
+                test->threadUnexpectedException();
+            }
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Starts a worker that makes five timed tryAcquire() calls against a semaphore
+// created with one permit, releases five more permits from this thread, and
+// waits for the worker to finish.
+void SemaphoreTest::testTimedAcquireReleaseInDifferentThreadsFair() {
+    Semaphore semaphore(1, true);
+    TestTimedAcquireReleaseInDifferentThreadsFairRunnable acquirer(&semaphore, this);
+    Thread worker(&acquirer);
+
+    worker.start();
+    try {
+        for (int i = 0; i < 5; ++i) {
+            semaphore.release();
+        }
+        worker.join();
+    } catch(InterruptedException&) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    // Worker body for testTimedAcquireReleaseNInDifferentThreadsFair: twice over,
+    // takes two permits with the timed tryAcquire (result checked through the
+    // owning test's threadAssertTrue()) and then releases two. An
+    // InterruptedException is reported through threadUnexpectedException().
+    class TestTimedAcquireReleaseNInDifferentThreadsFairRunnable : public Runnable {
+    private:
+
+        Semaphore* semaphore;
+        SemaphoreTest* test;
+
+    public:
+
+        TestTimedAcquireReleaseNInDifferentThreadsFairRunnable(Semaphore* sem, SemaphoreTest* parent)
+            : Runnable(), semaphore(sem), test(parent) {}
+        virtual ~TestTimedAcquireReleaseNInDifferentThreadsFairRunnable() {}
+
+        virtual void run() {
+            try {
+                for (int round = 0; round < 2; ++round) {
+                    test->threadAssertTrue(
+                        semaphore->tryAcquire(2, SemaphoreTest::SHORT_DELAY_MS, TimeUnit::MILLISECONDS));
+                    semaphore->release(2);
+                }
+            } catch(InterruptedException&) {
+                test->threadUnexpectedException();
+            }
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// This thread and a worker each perform two rounds of "timed tryAcquire of two
+// permits, then release two" against a semaphore created with two permits.
+// Every tryAcquire on this thread must succeed within SHORT_DELAY_MS.
+void SemaphoreTest::testTimedAcquireReleaseNInDifferentThreadsFair() {
+    Semaphore s(2, true);
+    TestTimedAcquireReleaseNInDifferentThreadsFairRunnable contender(&s, this);
+    Thread worker(&contender);
+
+    worker.start();
+    try {
+        for (int round = 0; round < 2; ++round) {
+            CPPUNIT_ASSERT(s.tryAcquire(2, SHORT_DELAY_MS, TimeUnit::MILLISECONDS));
+            s.release(2);
+        }
+        worker.join();
+    } catch(InterruptedException&) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    // Worker body for testAcquireInterruptedExceptionFair: calls acquire() and
+    // expects it to end with an InterruptedException. If acquire() returns
+    // normally, the owning test's threadShouldThrow() is invoked instead.
+    class TestAcquireInterruptedExceptionFairRunnable : public Runnable {
+    private:
+
+        Semaphore* semaphore;
+        SemaphoreTest* test;
+
+    public:
+
+        TestAcquireInterruptedExceptionFairRunnable(Semaphore* sem, SemaphoreTest* parent)
+            : Runnable(), semaphore(sem), test(parent) {}
+        virtual ~TestAcquireInterruptedExceptionFairRunnable() {}
+
+        virtual void run() {
+            try {
+                semaphore->acquire();
+                test->threadShouldThrow();
+            } catch(InterruptedException&) {
+                // Expected outcome: the interrupt ended the wait.
+            }
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Starts a worker that calls acquire() on a semaphore created with zero
+// permits, interrupts it after SHORT_DELAY_MS and joins it. The worker itself
+// verifies that acquire() threw InterruptedException.
+void SemaphoreTest::testAcquireInterruptedExceptionFair() {
+    Semaphore semaphore(0, true);
+    TestAcquireInterruptedExceptionFairRunnable blocked(&semaphore, this);
+    Thread worker(&blocked);
+
+    worker.start();
+    try {
+        Thread::sleep( SHORT_DELAY_MS);
+        worker.interrupt();
+        worker.join();
+    } catch(InterruptedException&) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    // Worker body for testAcquireNInterruptedExceptionFair: calls acquire(3) and
+    // expects it to end with an InterruptedException. If the call returns
+    // normally, the owning test's threadShouldThrow() is invoked instead.
+    class TestAcquireNInterruptedExceptionFairRunnable : public Runnable {
+    private:
+
+        Semaphore* semaphore;
+        SemaphoreTest* test;
+
+    public:
+
+        TestAcquireNInterruptedExceptionFairRunnable(Semaphore* sem, SemaphoreTest* parent)
+            : Runnable(), semaphore(sem), test(parent) {}
+        virtual ~TestAcquireNInterruptedExceptionFairRunnable() {}
+
+        virtual void run() {
+            try {
+                semaphore->acquire(3);
+                test->threadShouldThrow();
+            } catch(InterruptedException&) {
+                // Expected outcome: the interrupt ended the wait.
+            }
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Starts a worker that calls acquire(3) on a semaphore created with only two
+// permits, interrupts it after SHORT_DELAY_MS and joins it. The worker itself
+// verifies that acquire(3) threw InterruptedException.
+void SemaphoreTest::testAcquireNInterruptedExceptionFair() {
+    Semaphore semaphore(2, true);
+    TestAcquireNInterruptedExceptionFairRunnable blocked(&semaphore, this);
+    Thread worker(&blocked);
+
+    worker.start();
+    try {
+        Thread::sleep( SHORT_DELAY_MS);
+        worker.interrupt();
+        worker.join();
+    } catch(InterruptedException&) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    // Worker body for testTryAcquireInterruptedExceptionFair: calls the timed
+    // tryAcquire with MEDIUM_DELAY_MS and expects it to end with an
+    // InterruptedException. If the call returns normally, the owning test's
+    // threadShouldThrow() is invoked instead.
+    class TestTryAcquireInterruptedExceptionFairRunnable : public Runnable {
+    private:
+
+        Semaphore* semaphore;
+        SemaphoreTest* test;
+
+    public:
+
+        TestTryAcquireInterruptedExceptionFairRunnable(Semaphore* sem, SemaphoreTest* parent)
+            : Runnable(), semaphore(sem), test(parent) {}
+        virtual ~TestTryAcquireInterruptedExceptionFairRunnable() {}
+
+        virtual void run() {
+            try {
+                semaphore->tryAcquire(SemaphoreTest::MEDIUM_DELAY_MS, TimeUnit::MILLISECONDS);
+                test->threadShouldThrow();
+            } catch(InterruptedException&) {
+                // Expected outcome: the interrupt ended the timed wait.
+            }
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Starts a worker that makes a timed tryAcquire() on a semaphore created with
+// zero permits, interrupts it after SHORT_DELAY_MS and joins it. The worker
+// itself verifies that tryAcquire() threw InterruptedException.
+void SemaphoreTest::testTryAcquireInterruptedExceptionFair() {
+    Semaphore semaphore(0, true);
+    TestTryAcquireInterruptedExceptionFairRunnable blocked(&semaphore, this);
+    Thread worker(&blocked);
+
+    worker.start();
+    try {
+        Thread::sleep( SHORT_DELAY_MS);
+        worker.interrupt();
+        worker.join();
+    } catch(InterruptedException&) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    // Worker body for testTryAcquireNInterruptedExceptionFair: calls the timed
+    // tryAcquire for four permits with MEDIUM_DELAY_MS and expects it to end
+    // with an InterruptedException. If the call returns normally, the owning
+    // test's threadShouldThrow() is invoked instead.
+    class TestTryAcquireNInterruptedExceptionFairRunnable : public Runnable {
+    private:
+
+        Semaphore* semaphore;
+        SemaphoreTest* test;
+
+    public:
+
+        TestTryAcquireNInterruptedExceptionFairRunnable(Semaphore* sem, SemaphoreTest* parent)
+            : Runnable(), semaphore(sem), test(parent) {}
+        virtual ~TestTryAcquireNInterruptedExceptionFairRunnable() {}
+
+        virtual void run() {
+            try {
+                semaphore->tryAcquire(4, SemaphoreTest::MEDIUM_DELAY_MS, TimeUnit::MILLISECONDS);
+                test->threadShouldThrow();
+            } catch(InterruptedException&) {
+                // Expected outcome: the interrupt ended the timed wait.
+            }
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Starts a worker that makes a timed tryAcquire() for four permits on a
+// semaphore created with only one, interrupts it after SHORT_DELAY_MS and joins
+// it. The worker itself verifies that tryAcquire() threw InterruptedException.
+void SemaphoreTest::testTryAcquireNInterruptedExceptionFair() {
+    Semaphore semaphore(1, true);
+    TestTryAcquireNInterruptedExceptionFairRunnable blocked(&semaphore, this);
+    Thread worker(&blocked);
+
+    worker.start();
+    try {
+        Thread::sleep( SHORT_DELAY_MS);
+        worker.interrupt();
+        worker.join();
+    } catch(InterruptedException&) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Tracks getQueueLength() while two helper threads are started against a
+// semaphore whose only permit is held by this thread: 0 before anything waits,
+// 1 and 2 as each thread is started, back to 1 after the first thread is
+// interrupted, and 0 after the permit is released.
+// NOTE(review): the two runnable types are defined elsewhere in this file;
+// presumably both block in an acquire on the semaphore - confirm there.
+void SemaphoreTest::testGetQueueLengthFair() {
+    Semaphore sem(1, true);
+    InterruptedLockRunnable interruptedTask(&sem, this);
+    InterruptibleLockRunnable interruptibleTask(&sem, this);
+    Thread first(&interruptedTask);
+    Thread second(&interruptibleTask);
+
+    try {
+        CPPUNIT_ASSERT_EQUAL(0, sem.getQueueLength());
+        sem.acquireUninterruptibly();
+
+        // Each sleep gives the helper threads time to change the queue
+        // before the length is sampled.
+        first.start();
+        Thread::sleep( SHORT_DELAY_MS);
+        CPPUNIT_ASSERT_EQUAL(1, sem.getQueueLength());
+
+        second.start();
+        Thread::sleep(SHORT_DELAY_MS);
+        CPPUNIT_ASSERT_EQUAL(2, sem.getQueueLength());
+
+        first.interrupt();
+        Thread::sleep(SHORT_DELAY_MS);
+        CPPUNIT_ASSERT_EQUAL(1, sem.getQueueLength());
+
+        sem.release();
+        Thread::sleep(SHORT_DELAY_MS);
+        CPPUNIT_ASSERT_EQUAL(0, sem.getQueueLength());
+
+        first.join();
+        second.join();
+    } catch(Exception&) {
+        unexpectedException();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Checks that Semaphore::toString() reports the current permit count by
+// looking for the text "Permits = N" after zero, one and two release() calls
+// on a semaphore created with zero permits.
+void SemaphoreTest::testToString() {
+
+    Semaphore s(0);
+
+    // std::string::find searches for the whole substring. find_first_of would
+    // match any single character from the argument, so it would accept almost
+    // any non-empty string and the assertion would be meaningless.
+    std::string us = s.toString();
+    CPPUNIT_ASSERT(us.find("Permits = 0") != std::string::npos);
+    s.release();
+    std::string s1 = s.toString();
+    CPPUNIT_ASSERT(s1.find("Permits = 1") != std::string::npos);
+    s.release();
+    std::string s2 = s.toString();
+    CPPUNIT_ASSERT(s2.find("Permits = 2") != std::string::npos);
+}
+

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/SemaphoreTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/SemaphoreTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/SemaphoreTest.h?rev=1128104&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/SemaphoreTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/SemaphoreTest.h Thu May 26 22:14:57 2011
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _DECAF_UTIL_CONCURRENT_SEMAPHORETEST_H_
+#define _DECAF_UTIL_CONCURRENT_SEMAPHORETEST_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+#include <decaf/util/concurrent/ExecutorsTestSupport.h>
+
+namespace decaf {
+namespace util {
+namespace concurrent {
+
+    /**
+     * CppUnit test fixture for the decaf Semaphore class.  It derives from
+     * ExecutorsTestSupport and registers every test method declared below
+     * in the suite built by the CPPUNIT_TEST_SUITE macros.
+     */
+    class SemaphoreTest : public ExecutorsTestSupport {
+
+        // Suite registration: each CPPUNIT_TEST entry names one of the member
+        // functions declared in the public section below.
+        CPPUNIT_TEST_SUITE( SemaphoreTest );
+        CPPUNIT_TEST( testConstructor );
+        CPPUNIT_TEST( testConstructor2 );
+        CPPUNIT_TEST( testTryAcquireInSameThread );
+        CPPUNIT_TEST( testAcquireReleaseInSameThread );
+        CPPUNIT_TEST( testAcquireUninterruptiblyReleaseInSameThread );
+        CPPUNIT_TEST( testTimedAcquireReleaseInSameThread );
+        CPPUNIT_TEST( testAcquireReleaseInDifferentThreads );
+        CPPUNIT_TEST( testUninterruptibleAcquireReleaseInDifferentThreads );
+        CPPUNIT_TEST( testTimedAcquireReleaseInDifferentThreads );
+        CPPUNIT_TEST( testAcquireInterruptedException );
+        CPPUNIT_TEST( testTryAcquireInterruptedException);
+        CPPUNIT_TEST( testHasQueuedThreads );
+        CPPUNIT_TEST( testGetQueueLength );
+        CPPUNIT_TEST( testGetQueuedThreads );
+        CPPUNIT_TEST( testDrainPermits );
+        CPPUNIT_TEST( testReducePermits );
+        CPPUNIT_TEST( testConstructorFair );
+        CPPUNIT_TEST( testTryAcquireInSameThreadFair );
+        CPPUNIT_TEST( testTryAcquireNInSameThreadFair );
+        CPPUNIT_TEST( testAcquireReleaseInSameThreadFair );
+        CPPUNIT_TEST( testAcquireReleaseNInSameThreadFair );
+        CPPUNIT_TEST( testAcquireUninterruptiblyReleaseNInSameThreadFair );
+        CPPUNIT_TEST( testTimedAcquireReleaseNInSameThreadFair );
+        CPPUNIT_TEST( testTimedAcquireReleaseInSameThreadFair );
+        CPPUNIT_TEST( testAcquireReleaseInDifferentThreadsFair );
+        CPPUNIT_TEST( testAcquireReleaseNInDifferentThreadsFair );
+        CPPUNIT_TEST( testAcquireReleaseNInDifferentThreadsFair2 );
+        CPPUNIT_TEST( testTimedAcquireReleaseInDifferentThreadsFair );
+        CPPUNIT_TEST( testTimedAcquireReleaseNInDifferentThreadsFair );
+        CPPUNIT_TEST( testAcquireInterruptedExceptionFair );
+        CPPUNIT_TEST( testAcquireNInterruptedExceptionFair );
+        CPPUNIT_TEST( testTryAcquireInterruptedExceptionFair );
+        CPPUNIT_TEST( testTryAcquireNInterruptedExceptionFair );
+        CPPUNIT_TEST( testGetQueueLengthFair );
+        CPPUNIT_TEST( testToString );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        SemaphoreTest();
+        virtual ~SemaphoreTest();
+
+        // Test methods without the "Fair" suffix.
+        void testConstructor();
+        void testConstructor2();
+        void testTryAcquireInSameThread();
+        void testAcquireReleaseInSameThread();
+
+        void testAcquireUninterruptiblyReleaseInSameThread();
+        void testTimedAcquireReleaseInSameThread();
+        void testAcquireReleaseInDifferentThreads();
+        void testUninterruptibleAcquireReleaseInDifferentThreads();
+        void testTimedAcquireReleaseInDifferentThreads();
+        void testAcquireInterruptedException();
+        void testTryAcquireInterruptedException();
+        void testHasQueuedThreads();
+        void testGetQueueLength();
+        void testGetQueuedThreads();
+        void testDrainPermits();
+        void testReducePermits();
+
+        // "Fair" variants.  NOTE(review): presumably each of these builds its
+        // Semaphore with the second constructor argument set to true - confirm
+        // against SemaphoreTest.cpp.
+        void testConstructorFair();
+        void testTryAcquireInSameThreadFair();
+        void testTryAcquireNInSameThreadFair();
+        void testAcquireReleaseInSameThreadFair();
+        void testAcquireReleaseNInSameThreadFair();
+        void testAcquireUninterruptiblyReleaseNInSameThreadFair();
+        void testTimedAcquireReleaseNInSameThreadFair();
+        void testTimedAcquireReleaseInSameThreadFair();
+        void testAcquireReleaseInDifferentThreadsFair();
+        void testAcquireReleaseNInDifferentThreadsFair();
+        void testAcquireReleaseNInDifferentThreadsFair2();
+        void testTimedAcquireReleaseInDifferentThreadsFair();
+        void testTimedAcquireReleaseNInDifferentThreadsFair();
+        void testAcquireInterruptedExceptionFair();
+        void testAcquireNInterruptedExceptionFair();
+        void testTryAcquireInterruptedExceptionFair();
+        void testTryAcquireNInterruptedExceptionFair();
+        void testGetQueueLengthFair();
+
+        // Checks the text produced by Semaphore::toString().
+        void testToString();
+
+    };
+
+}}}
+
+#endif /* _DECAF_UTIL_CONCURRENT_SEMAPHORETEST_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/SemaphoreTest.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp?rev=1128104&r1=1128103&r2=1128104&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp Thu May 26 22:14:57 2011
@@ -281,6 +281,8 @@ CPPUNIT_TEST_SUITE_REGISTRATION( decaf::
 CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::concurrent::TimeUnitTest );
 #include <decaf/util/concurrent/LinkedBlockingQueueTest.h>
 CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::concurrent::LinkedBlockingQueueTest );
+#include <decaf/util/concurrent/SemaphoreTest.h>
+CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::concurrent::SemaphoreTest );
 
 #include <decaf/util/concurrent/atomic/AtomicBooleanTest.h>
 CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::concurrent::atomic::AtomicBooleanTest );



Mime
View raw message