activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1100383 [1/3] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: examples/ examples/producers/ main/ main/activemq/library/ main/activemq/transport/ main/activemq/wireformat/ main/decaf/internal/ main/decaf/internal/net/ssl/openssl/ main/...
Date Fri, 06 May 2011 21:22:41 GMT
Author: tabish
Date: Fri May  6 21:22:39 2011
New Revision: 1100383

URL: http://svn.apache.org/viewvc?rev=1100383&view=rev
Log:
Refactor the threading model to allow for interruption of thread methods that block, along with caching of some threading primitives that are normally created and destroyed very frequently.  Make the Mutex object use lazy initialization of its synchronization resources so that classes that create Mutex instances but don't always use them don't need to allocate those resources.

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/PlatformThread.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Threading.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Threading.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/ThreadingTypes.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/PlatformDefs.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/PlatformThread.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/PlatformDefs.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/PlatformThread.cpp   (with props)
Removed:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/ConditionImpl.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/MutexImpl.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/ConditionHandle.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/ConditionImpl.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/MutexHandle.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/MutexImpl.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/ConditionHandle.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/ConditionImpl.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/MutexHandle.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/MutexImpl.cpp
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/main.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/producers/SimpleProducer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/library/ActiveMQCPP.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportRegistry.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportRegistry.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/WireFormatRegistry.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/WireFormatRegistry.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/DecafRuntime.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/DecafRuntime.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/ssl/openssl/OpenSSLContextSpi.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/System.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/CountDownLatch.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Mutex.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/LockSupport.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantLock.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/logging/LogManager.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/logging/LogManager.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/logging/LogWriter.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/logging/LogWriter.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/logging/SimpleLogger.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-benchmarks/main.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/ExpirationTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/SlowListenerTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/lang/PointerTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/lang/SystemTest.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/lang/ThreadTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/lang/ThreadTest.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/TimerTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/TimerTest.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/main.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/main.cpp?rev=1100383&r1=1100382&r2=1100383&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/main.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/main.cpp Fri May  6 21:22:39 2011
@@ -116,7 +116,7 @@ public:
             producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
 
             // Create the Thread Id String
-            string threadIdStr = Long::toString( Thread::getId() );
+            string threadIdStr = Long::toString( Thread::currentThread()->getId() );
 
             // Create a messages
             string text = (string)"Hello world! from thread " + threadIdStr;
@@ -357,7 +357,7 @@ private:
 int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
 
     activemq::library::ActiveMQCPP::initializeLibrary();
-
+    {
     std::cout << "=====================================================\n";
     std::cout << "Starting the example:" << std::endl;
     std::cout << "-----------------------------------------------------\n";
@@ -446,6 +446,7 @@ int main(int argc AMQCPP_UNUSED, char* a
     std::cout << "Finished with the example." << std::endl;
     std::cout << "=====================================================\n";
 
+    }
     activemq::library::ActiveMQCPP::shutdownLibrary();
 }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/producers/SimpleProducer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/producers/SimpleProducer.cpp?rev=1100383&r1=1100382&r2=1100383&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/producers/SimpleProducer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/producers/SimpleProducer.cpp Fri May  6 21:22:39 2011
@@ -121,7 +121,7 @@ public:
             producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
 
             // Create the Thread Id String
-            string threadIdStr = Long::toString( Thread::getId() );
+            string threadIdStr = Long::toString( Thread::currentThread()->getId() );
 
             // Create a messages
             string text = (string)"Hello world! from thread " + threadIdStr;

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=1100383&r1=1100382&r2=1100383&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Fri May  6 21:22:39 2011
@@ -320,8 +320,8 @@ cc_sources = \
     decaf/internal/util/ResourceLifecycleManager.cpp \
     decaf/internal/util/TimerTaskHeap.cpp \
     decaf/internal/util/concurrent/SynchronizableImpl.cpp \
-    decaf/internal/util/concurrent/unix/ConditionImpl.cpp \
-    decaf/internal/util/concurrent/unix/MutexImpl.cpp \
+    decaf/internal/util/concurrent/Threading.cpp \
+    decaf/internal/util/concurrent/unix/PlatformThread.cpp \
     decaf/internal/util/zip/adler32.c \
     decaf/internal/util/zip/crc32.c \
     decaf/internal/util/zip/deflate.c \
@@ -835,16 +835,15 @@ h_sources = \
     decaf/internal/util/Resource.h \
     decaf/internal/util/ResourceLifecycleManager.h \
     decaf/internal/util/TimerTaskHeap.h \
-    decaf/internal/util/concurrent/ConditionImpl.h \
-    decaf/internal/util/concurrent/MutexImpl.h \
+    decaf/internal/util/concurrent/PlatformThread.h \
     decaf/internal/util/concurrent/SynchronizableImpl.h \
+    decaf/internal/util/concurrent/Threading.h \
+    decaf/internal/util/concurrent/ThreadingTypes.h \
     decaf/internal/util/concurrent/TransferQueue.h \
     decaf/internal/util/concurrent/TransferStack.h \
     decaf/internal/util/concurrent/Transferer.h \
-    decaf/internal/util/concurrent/unix/ConditionHandle.h \
-    decaf/internal/util/concurrent/unix/MutexHandle.h \
-    decaf/internal/util/concurrent/windows/ConditionHandle.h \
-    decaf/internal/util/concurrent/windows/MutexHandle.h \
+    decaf/internal/util/concurrent/unix/PlatformDefs.h \
+    decaf/internal/util/concurrent/windows/PlatformDefs.h \
     decaf/internal/util/zip/crc32.h \
     decaf/internal/util/zip/deflate.h \
     decaf/internal/util/zip/gzguts.h \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/library/ActiveMQCPP.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/library/ActiveMQCPP.cpp?rev=1100383&r1=1100382&r2=1100383&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/library/ActiveMQCPP.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/library/ActiveMQCPP.cpp Fri May  6 21:22:39 2011
@@ -67,8 +67,8 @@ void ActiveMQCPP::shutdownLibrary() {
     // Shutdown the IdGenerator Kernel
     IdGenerator::shutdown();
 
-    WireFormatRegistry::getInstance().unregisterAllFactories();
-    TransportRegistry::getInstance().unregisterAllFactories();
+    WireFormatRegistry::shutdown();
+    TransportRegistry::shutdown();
 
     // Now it should be safe to shutdown Decaf.
     decaf::lang::Runtime::shutdownRuntime();
@@ -79,6 +79,7 @@ void ActiveMQCPP::registerWireFormats() 
 
     // Each of the internally implemented WireFormat's is registered here
     // with the WireFormat Registry
+    WireFormatRegistry::initialize();
 
     WireFormatRegistry::getInstance().registerFactory(
         "openwire", new wireformat::openwire::OpenWireFormatFactory() );
@@ -89,8 +90,9 @@ void ActiveMQCPP::registerWireFormats() 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQCPP::registerTransports() {
 
-    // Each of the internally implemented WireFormat's is registered here
-    // with the WireFormat Registry
+    // Each of the internally implemented Transports is registered here
+    // with the Transport Registry
+    TransportRegistry::initialize();
 
     TransportRegistry::getInstance().registerFactory(
         "tcp", new TcpTransportFactory() );

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportRegistry.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportRegistry.cpp?rev=1100383&r1=1100382&r2=1100383&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportRegistry.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportRegistry.cpp Fri May  6 21:22:39 2011
@@ -26,6 +26,11 @@ using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
+namespace {
+    TransportRegistry* theOnlyInstance;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 TransportRegistry::TransportRegistry() : registry() {
 }
 
@@ -92,6 +97,17 @@ std::vector<std::string> TransportRegist
 
 ////////////////////////////////////////////////////////////////////////////////
 TransportRegistry& TransportRegistry::getInstance() {
-    static TransportRegistry registry;
-    return registry;
+    return *theOnlyInstance;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TransportRegistry::initialize() {
+    theOnlyInstance = new TransportRegistry();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TransportRegistry::shutdown() {
+    theOnlyInstance->unregisterAllFactories();
+    delete theOnlyInstance;
+    theOnlyInstance = NULL;
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportRegistry.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportRegistry.h?rev=1100383&r1=1100382&r2=1100383&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportRegistry.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportRegistry.h Fri May  6 21:22:39 2011
@@ -30,6 +30,9 @@
 #include <decaf/lang/exceptions/IllegalArgumentException.h>
 
 namespace activemq {
+namespace library {
+    class ActiveMQCPP;
+}
 namespace transport {
 
     /**
@@ -119,6 +122,13 @@ namespace transport {
          */
         static TransportRegistry& getInstance();
 
+    private:
+
+        static void initialize();
+        static void shutdown();
+
+        friend class activemq::library::ActiveMQCPP;
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/WireFormatRegistry.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/WireFormatRegistry.cpp?rev=1100383&r1=1100382&r2=1100383&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/WireFormatRegistry.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/WireFormatRegistry.cpp Fri May  6 21:22:39 2011
@@ -26,6 +26,11 @@ using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
+namespace {
+    WireFormatRegistry* theOnlyInstance;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 WireFormatRegistry::WireFormatRegistry() : registry() {
 }
 
@@ -92,6 +97,17 @@ std::vector<std::string> WireFormatRegis
 
 ////////////////////////////////////////////////////////////////////////////////
 WireFormatRegistry& WireFormatRegistry::getInstance() {
-    static WireFormatRegistry registry;
-    return registry;
+    return *theOnlyInstance;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void WireFormatRegistry::initialize() {
+    theOnlyInstance = new WireFormatRegistry();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void WireFormatRegistry::shutdown() {
+    theOnlyInstance->unregisterAllFactories();
+    delete theOnlyInstance;
+    theOnlyInstance = NULL;
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/WireFormatRegistry.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/WireFormatRegistry.h?rev=1100383&r1=1100382&r2=1100383&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/WireFormatRegistry.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/WireFormatRegistry.h Fri May  6 21:22:39 2011
@@ -30,6 +30,9 @@
 #include <decaf/lang/exceptions/IllegalArgumentException.h>
 
 namespace activemq {
+namespace library {
+    class ActiveMQCPP;
+}
 namespace wireformat {
 
     /**
@@ -117,6 +120,13 @@ namespace wireformat {
          */
         static WireFormatRegistry& getInstance();
 
+    private:
+
+        static void initialize();
+        static void shutdown();
+
+        friend class activemq::library::ActiveMQCPP;
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/DecafRuntime.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/DecafRuntime.cpp?rev=1100383&r1=1100382&r2=1100383&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/DecafRuntime.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/DecafRuntime.cpp Fri May  6 21:22:39 2011
@@ -24,93 +24,59 @@
 #include <decaf/lang/System.h>
 #include <decaf/lang/Thread.h>
 #include <decaf/internal/net/Network.h>
+#include <decaf/internal/util/concurrent/Threading.h>
 
 using namespace decaf;
 using namespace decaf::internal;
 using namespace decaf::internal::net;
+using namespace decaf::internal::util::concurrent;
 using namespace decaf::lang;
 using namespace decaf::util::concurrent;
 
 ////////////////////////////////////////////////////////////////////////////////
-namespace decaf{
-namespace internal{
-
-    class RuntimeData {
-    public:
-
-        mutable apr_pool_t* aprPool;
-        Mutex* lock;
-
-    public:
-
-        RuntimeData() : aprPool(NULL), lock(NULL) {
-        }
-
-    };
-
-}}
+namespace {
+    apr_pool_t* aprPool;
+    Mutex* globalLock;
+}
 
 ////////////////////////////////////////////////////////////////////////////////
-DecafRuntime::DecafRuntime() : runtimeData(new RuntimeData()) {
-
-    // Initializes the APR Runtime from within a library.
-    apr_initialize();
-
-    // Create a Global Pool for Threads to use
-    apr_pool_create_ex( &runtimeData->aprPool, NULL, NULL, NULL );
-
-    // Create the global Lock object now that the memory pool exists.
-    this->runtimeData->lock = new Mutex;
+DecafRuntime::DecafRuntime() : decaf::lang::Runtime() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 DecafRuntime::~DecafRuntime() {
-
-    try{
-
-        // Destory the Global Lock before we deallocate the memory pool.
-        delete this->runtimeData->lock;
-
-        // Destroy the Global Thread Memory Pool
-        apr_pool_destroy( this->runtimeData->aprPool );
-
-        // Cleans up APR data structures.
-        apr_terminate();
-
-        // Destroy the Runtime Data
-        delete this->runtimeData;
-    }
-    DECAF_CATCH_NOTHROW( Exception )
-    DECAF_CATCHALL_NOTHROW()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 apr_pool_t* DecafRuntime::getGlobalPool() const {
-    return this->runtimeData->aprPool;
+    return aprPool;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 Mutex* DecafRuntime::getGlobalLock() {
-    return this->runtimeData->lock;
+    return globalLock;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 Runtime* Runtime::getRuntime() {
-
     static DecafRuntime runtime;
-
     return &runtime;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void Runtime::initializeRuntime( int argc, char **argv ) {
 
-    // Do this for now, once we remove APR we can do this in a way that
-    // makes more sense.
+    // Initializes the APR Runtime from within a library.
+    apr_initialize();
+    apr_pool_create_ex( &aprPool, NULL, NULL, NULL );
+
     Runtime::getRuntime();
 
     // Initialize any Platform specific Threading primitives
-    Thread::initThreading();
+    Threading::initialize();
+
+    // Create the global Lock object now that Threading is started.
+    globalLock = new Mutex;
 
     // Initialize the System Class to make things like Properties available.
     System::initSystem( argc, argv );
@@ -135,7 +101,14 @@ void Runtime::shutdownRuntime() {
     // cleaned up.
     System::shutdownSystem();
 
+    // This must go away before Threading is shutdown.
+    delete globalLock;
+
     // Threading is the last to by shutdown since most other parts of the Runtime
     // need to make use of Thread primitives.
-    Thread::shutdownThreading();
+    Threading::shutdown();
+
+    // Cleans up APR data structures.
+    apr_pool_destroy( aprPool );
+    apr_terminate();
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/DecafRuntime.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/DecafRuntime.h?rev=1100383&r1=1100382&r2=1100383&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/DecafRuntime.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/DecafRuntime.h Fri May  6 21:22:39 2011
@@ -27,18 +27,12 @@
 namespace decaf {
 namespace internal {
 
-    class RuntimeData;
-
     /**
      * Handles APR initialization and termination.
      */
     class DECAF_API DecafRuntime : public decaf::lang::Runtime {
     private:
 
-       RuntimeData* runtimeData;
-
-    private:
-
        DecafRuntime( const DecafRuntime& );
        DecafRuntime& operator= ( const DecafRuntime& );
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/ssl/openssl/OpenSSLContextSpi.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/ssl/openssl/OpenSSLContextSpi.cpp?rev=1100383&r1=1100382&r2=1100383&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/ssl/openssl/OpenSSLContextSpi.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/ssl/openssl/OpenSSLContextSpi.cpp Fri May  6 21:22:39 2011
@@ -139,7 +139,7 @@ namespace openssl {
         }
 
         static unsigned long getThreadId() {
-            return (unsigned long)Thread::getId();
+            return (unsigned long)Thread::currentThread()->getId();
         }
 
     };

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/PlatformThread.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/PlatformThread.h?rev=1100383&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/PlatformThread.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/PlatformThread.h Fri May  6 21:22:39 2011
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _DECAF_INTERNAL_UTIL_CONCURRENT_PLATFORMTHREAD_H_
+#define _DECAF_INTERNAL_UTIL_CONCURRENT_PLATFORMTHREAD_H_
+
+#include <decaf/util/Config.h>
+
+#include <decaf/internal/util/concurrent/ThreadingTypes.h>
+
+#include <vector>
+
+namespace decaf {
+namespace internal {
+namespace util {
+namespace concurrent {
+
+    struct ThreadHandle;
+
+    class DECAF_API PlatformThread {
+    private:
+
+        PlatformThread();
+        PlatformThread(const PlatformThread&);
+        PlatformThread& operator= (const PlatformThread&);
+
+    public:
+
+        static void createMutex(decaf_mutex_t* mutex);
+
+        static void lockMutex(decaf_mutex_t mutex);
+
+        static bool tryLockMutex(decaf_mutex_t mutex);
+
+        static void unlockMutex(decaf_mutex_t mutex);
+
+        static void destroyMutex(decaf_mutex_t mutex);
+
+    public:  // Condition processing methods
+
+        static void createCondition(decaf_condition_t* condition);
+
+        static void notify(decaf_condition_t condition);
+
+        static void notifyAll(decaf_condition_t condition);
+
+        static void waitOnCondition(decaf_condition_t condition, decaf_mutex_t mutex);
+
+        /**
+         * @return true if the condition wait met the timeout parameters.
+         */
+        static bool waitOnCondition(decaf_condition_t condition, decaf_mutex_t mutex,
+                                    long long mills, int nanos);
+
+        static void interruptibleWaitOnCondition(decaf_condition_t condition,
+                                                 decaf_mutex_t mutex,
+                                                 CompletionCondition& complete);
+
+        /**
+         * @return true if the condition wait met the timeout parameters without being signaled.
+         */
+        static bool interruptibleWaitOnCondition(decaf_condition_t condition, decaf_mutex_t mutex,
+                                                 long long mills, int nanos, CompletionCondition& complete);
+
+        static void destroyCondition(decaf_condition_t condition);
+
+    public:  // Thread Methods.
+
+        /**
+         * Given the Threading libraries max thread priority value, create a mapping to OS level thread
+         * priorities and place them in the provided vector.   This mapping will be used by the Threading
+         * library to map its values to the OS level values on calls to other methods like createNewThread
+         * and setPriority, etc.
+         *
+         * @param maxPriority
+         *      The maximum value that the Threading library uses for its priority range.
+         * @param mapping
+         *      A vector of int values that will be sized to maxPriority and maps the OS
+         *      priority values to the Threading libs range of priority values.
+         */
+        static void initPriorityMapping(int maxPriority, std::vector<int>& mapping);
+
+        static void createNewThread(decaf_thread_t* handle, threadMainMethod, void* threadArg,
+                                    int priority, long long stackSize, long long* threadId);
+
+        static void detachThread(decaf_thread_t handle);
+
+        static void joinThread(decaf_thread_t handle);
+
+        static void exitThread();
+
+        static decaf_thread_t getCurrentThread();
+
+        static long long getCurrentThreadId();
+
+        static int getPriority(decaf_thread_t thread);
+
+        static void setPriority(decaf_thread_t thread, int priority);
+
+        static long long getStackSize(decaf_thread_t thread);
+
+        static void setStackSize(decaf_thread_t thread, long long stackSize);
+
+        /**
+         * Pause the current thread allowing another thread to be scheduled for
+         * execution, no guarantee that this will happen.
+         */
+        static void yeild();
+
+    public:  // Thread Local Methods
+
+        static void createTlsKey(decaf_tls_key* key);
+
+        static void destroyTlsKey(decaf_tls_key key);
+
+        static void* getTlsValue(decaf_tls_key tlsKey);
+
+        static void setTlsValue(decaf_tls_key tlsKey, void* value);
+
+    };
+
+}}}}
+
+#endif /* _DECAF_INTERNAL_UTIL_CONCURRENT_PLATFORMTHREAD_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/PlatformThread.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Threading.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Threading.cpp?rev=1100383&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Threading.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Threading.cpp Fri May  6 21:22:39 2011
@@ -0,0 +1,1451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Threading.h"
+
+#include <decaf/lang/Thread.h>
+#include <decaf/lang/System.h>
+#include <decaf/lang/exceptions/NullPointerException.h>
+#include <decaf/util/concurrent/Executors.h>
+
+#include <decaf/internal/util/concurrent/ThreadingTypes.h>
+#include <decaf/internal/util/concurrent/PlatformThread.h>
+#include <decaf/util/concurrent/atomic/AtomicInteger.h>
+
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
+using namespace decaf::internal;
+using namespace decaf::internal::util;
+using namespace decaf::internal::util::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    /**
+     * Completion predicate that reports "done" once the watched thread's
+     * suspended flag is no longer set.
+     */
+    class SuspendedCompletionCondition : public CompletionCondition {
+    private:
+
+        ThreadHandle* target;
+
+    public:
+
+        SuspendedCompletionCondition(ThreadHandle* thread) : target(thread) {}
+
+        bool operator()() {
+            if (target->suspended) {
+                return false;
+            }
+            return true;
+        }
+    };
+
+    // Completion predicate used while a thread waits on a monitor.  Every
+    // evaluation first locks handle->mutex.  When the wait is complete (the
+    // thread was notified, or it is interruptible and has been interrupted) the
+    // predicate returns true WITHOUT unlocking, so handle->mutex is still held
+    // when it returns.
+    class MonitorWaitCompletionCondition : public CompletionCondition {
+    private:
+
+        ThreadHandle* handle;
+
+    public:
+
+        MonitorWaitCompletionCondition(ThreadHandle* handle) : handle(handle) {
+        }
+
+        // Timed variant.  Returns false when the wait is not complete; in that
+        // case handle->mutex is released only if timedOut is false, so after a
+        // timeout the mutex is still locked on return.
+        virtual bool operator()(bool timedOut) {
+
+            PlatformThread::lockMutex(handle->mutex);
+
+            if (handle->notified || (handle->interruptible && handle->interrupted)) {
+                return true;
+            }
+
+            if (!timedOut) {
+                // Not timed out and not complete so unlock the thread so others can
+                // notify or interrupt it.
+                PlatformThread::unlockMutex(handle->mutex);
+            }
+
+            return false;
+        }
+
+        // Untimed variant.  Returns false with handle->mutex released when the
+        // wait is not yet complete.
+        virtual bool operator()() {
+
+            PlatformThread::lockMutex(handle->mutex);
+
+            if (handle->notified || (handle->interruptible && handle->interrupted)) {
+                return true;
+            }
+
+            PlatformThread::unlockMutex(handle->mutex);
+            return false;
+        }
+    };
+
+    // Pool of MonitorHandle objects kept as a singly linked chain.
+    struct MonitorPool {
+        // First handle of the chain; handles are linked through MonitorHandle::next.
+        MonitorHandle* head;
+        // Set to MONITOR_POOL_BLOCK_SIZE by Threading::initialize().
+        unsigned int count;
+    };
+
+    // Process-wide state of the threading library, reached through the
+    // file-local 'library' pointer.
+    struct ThreadingLibrary {
+        // TLS slot holding the Thread* that fronts the calling thread.
+        decaf_tls_key threadKey;
+        // TLS slot holding the calling thread's ThreadHandle*.
+        decaf_tls_key selfKey;
+        // Lock taken by Threading::lockThreadsLib() and by the exit / interrupt paths.
+        decaf_mutex_t globalLock;
+        // Created in initialize() and destroyed in shutdown().
+        decaf_mutex_t tlsLock;
+        // Thread facades created by attachToCurrentThread(); deleted in shutdown().
+        std::vector<Thread*> osThreads;
+        // Handle of the thread that called Threading::initialize().
+        decaf_thread_t mainThread;
+        // Filled by PlatformThread::initPriorityMapping(); indexed with a Thread
+        // priority value when a new thread is created.
+        std::vector<int> priorityMapping;
+        // Counter used to build the "OS-Thread<N>" names of attached threads.
+        AtomicInteger osThreadId;
+        // Pool of monitor handles allocated in initialize().
+        MonitorPool* monitors;
+    };
+
+    // Number of MonitorHandle objects created by one batchAllocateMonitors() call.
+    #define MONITOR_POOL_BLOCK_SIZE 64
+
+    // Library-wide state; allocated in Threading::initialize() and deleted in
+    // Threading::shutdown().
+    ThreadingLibrary* library = NULL;
+
+    // ------------------------ Forward Declare All Utility Methods ----------------------- //
+    // Declared up front because several helpers below call each other before
+    // their definitions appear.
+    void createThreadInstance(ThreadHandle* thread, long long stackSize, int priority,
+                              bool suspended, threadingTask threadMain, void* threadArg);
+    ThreadHandle* initThreadHandle(ThreadHandle* thread);
+    MonitorHandle* initMonitorHandle(MonitorHandle* monitor);
+    bool interruptWaitingThread(ThreadHandle* self, ThreadHandle* target);
+    void unblockThreads(ThreadHandle* queueHead);
+    void enqueueThread(ThreadHandle** queue, ThreadHandle* thread);
+    void dequeueThread(ThreadHandle** queue, ThreadHandle* thread);
+    unsigned int getNumberOfWaiters(MonitorHandle* monitor);
+    void purgeMonitorsPool(MonitorPool* pool);
+    MonitorHandle* batchAllocateMonitors();
+    void doMonitorExit(MonitorHandle* monitor, ThreadHandle* thread);
+    void doMonitorEnter(MonitorHandle* monitor, ThreadHandle* thread);
+    void doNotifyWaiters(MonitorHandle* monitor, bool notifyAll);
+    void doNotifyThread(ThreadHandle* thread, bool markAsNotified);
+    bool doWaitOnMonitor(MonitorHandle* monitor, ThreadHandle* thread, long long mills, int nanos, bool interruptible);
+    // ------------------------ Forward Declare All Utility Methods ----------------------- //
+
+    // Final teardown for the thread described by 'self'.  Under the global lock
+    // and the thread's own mutex it marks the thread TERMINATED, cancels a
+    // pending interrupting thread, wakes anything waiting on self->condition and
+    // every queued joiner, and clears both TLS slots.  It then detaches and exits
+    // the platform thread.
+    //
+    // When 'destroy' is true the handle's name, mutex and condition are released
+    // and the handle itself is deleted before the platform thread exits.
+    void threadExit(ThreadHandle* self, bool destroy = false) {
+
+        PlatformThread::lockMutex(library->globalLock);
+        PlatformThread::lockMutex(self->mutex);
+
+        self->state = Thread::TERMINATED;
+
+        // Must ensure that any interrupting threads get released
+        if (self->interruptingThread) {
+            PlatformThread::lockMutex(self->interruptingThread->mutex);
+            self->interruptingThread->canceled = true;
+            PlatformThread::unlockMutex(self->interruptingThread->mutex);
+            self->interruptingThread = NULL;
+        }
+
+        PlatformThread::notifyAll(self->condition);
+
+        unblockThreads(self->joiners);
+
+        PlatformThread::setTlsValue(library->threadKey, NULL);
+        PlatformThread::setTlsValue(library->selfKey, NULL);
+
+        // TODO tls_finalize (self);
+
+        PlatformThread::unlockMutex(self->mutex);
+        PlatformThread::unlockMutex(library->globalLock);
+
+        // Copy the platform handle out first: 'self' may be deleted just below but
+        // the handle is still needed for detachThread().
+        decaf_thread_t handle = self->handle;
+
+        if (destroy == true) {
+            free(self->name);
+            PlatformThread::destroyMutex(self->mutex);
+            PlatformThread::destroyCondition(self->condition);
+            delete self;
+        }
+
+        PlatformThread::detachThread(handle);
+        // NOTE(review): presumably exitThread() does not return to the caller -
+        // confirm against the platform implementations.
+        PlatformThread::exitThread();
+    }
+
+    // Platform-level entry point handed to PlatformThread::createNewThread() by
+    // createThreadInstance(); 'arg' is the ThreadHandle of the new thread.
+    // Publishes the parent Thread and the handle in TLS, parks while the handle
+    // is flagged suspended, then either exits immediately (handle canceled) or
+    // runs threadMain(threadArg) and exits.
+    PLATFORM_THREAD_CALLBACK_TYPE PLATFORM_CALLING_CONV threadEntryMethod(PLATFORM_THREAD_ENTRY_ARG arg) {
+
+        ThreadHandle* thread = (ThreadHandle*)arg;
+
+        PlatformThread::setTlsValue(library->threadKey, thread->parent);
+        PlatformThread::setTlsValue(library->selfKey, thread);
+
+        PlatformThread::lockMutex(thread->mutex);
+
+        // A thread created in the suspended state waits here until the suspended
+        // flag is cleared (Threading::start() clears it and notifies the condition).
+        if (thread->suspended == true) {
+            SuspendedCompletionCondition completion(thread);
+            PlatformThread::interruptibleWaitOnCondition(thread->condition, thread->mutex, completion);
+        }
+
+        PlatformThread::unlockMutex(thread->mutex);
+
+        // Canceled before it ever ran: skip threadMain entirely.
+        if (thread->canceled == true) {
+            threadExit(thread);
+            PLATFORM_THREAD_RETURN()
+        }
+
+        thread->state = Thread::RUNNABLE;
+
+        thread->threadMain(thread->threadArg);
+
+        threadExit(thread);
+        PLATFORM_THREAD_RETURN()
+    }
+
+    /**
+     * Hands an error that escaped Thread::run() to the thread's own uncaught
+     * exception handler when one is set, otherwise to the default handler when
+     * one is set.  If neither is set the error is dropped.
+     */
+    void dispatchUncaughtException(Thread* parent, const Throwable& error) {
+
+        if (parent->getUncaughtExceptionHandler() != NULL) {
+            parent->getUncaughtExceptionHandler()->uncaughtException(parent, error);
+        } else if (parent->getDefaultUncaughtExceptionHandler() != NULL) {
+            parent->getDefaultUncaughtExceptionHandler()->uncaughtException(parent, error);
+        }
+    }
+
+    /**
+     * Thread main used for Thread objects: 'arg' is the ThreadHandle whose
+     * parent Thread's run() method is invoked.  Nothing is allowed to propagate
+     * out of run(); every exception is routed to an uncaught exception handler.
+     */
+    void runCallback(void* arg) {
+
+        ThreadHandle* thread = (ThreadHandle*)arg;
+
+        // Invoke run on the task.
+        try{
+            thread->parent->run();
+        } catch( decaf::lang::Throwable& error ){
+
+            dispatchUncaughtException(thread->parent, error);
+
+        } catch(std::exception& stdEx) {
+
+            // Wrap foreign exceptions so the handlers always see a Throwable.
+            const RuntimeException error(__FILE__, __LINE__, stdEx.what());
+            dispatchUncaughtException(thread->parent, error);
+
+        } catch(...) {
+
+            const RuntimeException error(__FILE__, __LINE__,
+                                         "Uncaught exception bubbled up to Thread::run, Thread Terminating.");
+            dispatchUncaughtException(thread->parent, error);
+
+        }
+    }
+
+    // Thread main of the helper thread spawned by interruptWaitingThread().
+    // 'arg' is the ThreadHandle of the target thread that is waiting on a
+    // monitor.  The helper enters that monitor, and if it is still the target's
+    // registered interrupting thread and the target is still waiting, wakes the
+    // target without marking it as notified.  Every path ends in
+    // threadExit(self, true), which destroys this helper's handle.
+    //
+    // NOTE(review): the early-exit branches have no return after threadExit(), so
+    // they appear to rely on threadExit() never returning (it ends in
+    // PlatformThread::exitThread()) - confirm.
+    void interruptionThread(void *arg) {
+
+        ThreadHandle* self = Threading::getCurrentThreadHandle();
+        ThreadHandle* target = (ThreadHandle*)arg;
+        MonitorHandle* monitor = NULL;
+
+        PlatformThread::lockMutex(library->globalLock);
+
+        // If this interrupting thread was already canceled then we are done.
+        if (self->canceled == true) {
+            PlatformThread::unlockMutex(library->globalLock);
+            threadExit(self, true);
+        }
+
+        PlatformThread::lockMutex(target->mutex);
+
+        // The target no longer refers to this helper, so there is nothing to do.
+        if (target->interruptingThread != self) {
+            PlatformThread::unlockMutex(target->mutex);
+            PlatformThread::unlockMutex(library->globalLock);
+            threadExit(self, true);
+        }
+
+        // This is the monitor the target is waiting on.
+        monitor = target->monitor;
+
+        PlatformThread::unlockMutex(target->mutex);
+        PlatformThread::unlockMutex(library->globalLock);
+
+        // try to take the monitor so that we can notify the thread to interrupt.
+        doMonitorEnter(monitor, self);
+
+        PlatformThread::lockMutex(library->globalLock);
+
+        // If the target was interrupted already it will cancel this thread.
+        if (self->canceled == true) {
+            PlatformThread::unlockMutex(library->globalLock);
+            threadExit(self, true);
+        }
+
+        PlatformThread::lockMutex(target->mutex);
+        // Wake the target without setting its notified flag.
+        if (target->interruptingThread == self && target->waiting == true) {
+            doNotifyThread(target, false);
+        }
+
+        target->interruptingThread = NULL;
+
+        PlatformThread::unlockMutex(target->mutex);
+        PlatformThread::unlockMutex(library->globalLock);
+
+        Threading::exitMonitor(monitor);
+
+        threadExit(self, true);
+    }
+
+    /**
+     * Records the launch settings on the given handle and asks the platform
+     * layer to start a thread that enters through threadEntryMethod.
+     *
+     * A stack size of zero or less selects PLATFORM_DEFAULT_STACK_SIZE.  The
+     * priority is translated through library->priorityMapping before it is
+     * passed to the platform layer.
+     */
+    void createThreadInstance(ThreadHandle* thread, long long stackSize, int priority,
+                              bool suspended, threadingTask threadMain, void* threadArg) {
+
+        long long effectiveStackSize = stackSize;
+        if (effectiveStackSize <= 0) {
+            effectiveStackSize = PLATFORM_DEFAULT_STACK_SIZE;
+        }
+
+        thread->threadArg = threadArg;
+        thread->threadMain = threadMain;
+        thread->suspended = suspended;
+        thread->priority = priority;
+        thread->stackSize = effectiveStackSize;
+
+        PlatformThread::createNewThread(&thread->handle, threadEntryMethod, thread,
+                                        library->priorityMapping[priority], effectiveStackSize,
+                                        &thread->threadId);
+    }
+
+    /**
+     * Puts a freshly allocated ThreadHandle into its initial state and creates
+     * its mutex and condition.
+     *
+     * The platform handle is provisionally set to the calling thread's handle.
+     * If creating the condition fails, the mutex that was already created is
+     * destroyed again before the exception is propagated.
+     *
+     * @param thread the handle to initialize.
+     * @return the same pointer that was passed in.
+     * @throws RuntimeException if the mutex or condition cannot be created.
+     */
+    ThreadHandle* initThreadHandle(ThreadHandle* thread) {
+
+        thread->parent = NULL;
+        thread->name = NULL;
+        thread->interruptible = false;
+        thread->interrupted = false;
+        thread->parked = false;
+        thread->priority = Thread::NORM_PRIORITY;
+        thread->stackSize = -1;
+        thread->state = Thread::NEW;
+        thread->unparked = false;
+        thread->numAttached = 0;
+        thread->interruptingThread = NULL;
+        thread->osThread = false;
+        thread->handle = PlatformThread::getCurrentThread();
+        thread->threadId = 0;
+        thread->next = NULL;
+        thread->joiners = NULL;
+        thread->monitor = NULL;
+
+        ::memset(thread->tls, 0, sizeof(thread->tls));
+
+        try{
+            PlatformThread::createMutex(&thread->mutex);
+        }
+        DECAF_CATCH_RETHROW( RuntimeException );
+
+        try{
+            PlatformThread::createCondition(&thread->condition);
+        } catch(RuntimeException&) {
+            PlatformThread::destroyMutex(thread->mutex);
+            // Rethrow the original object; 'throw ex' would copy it as a plain
+            // RuntimeException and slice off any derived exception type.
+            throw;
+        }
+
+        return thread;
+    }
+
+    /**
+     * Resets the bookkeeping fields of a monitor handle: no owner, zero entry
+     * count, empty blocking and waiting queues, and no successor in the pool
+     * chain.  Returns the handle that was passed in.
+     */
+    MonitorHandle* initMonitorHandle(MonitorHandle* monitor) {
+
+        // Ownership state.
+        monitor->count = 0;
+        monitor->owner = NULL;
+
+        // Thread queues and pool linkage.
+        monitor->waiting = NULL;
+        monitor->blocking = NULL;
+        monitor->next = NULL;
+
+        return monitor;
+    }
+
+    // Starts the interruption of 'target', a thread waiting on a monitor, by
+    // spawning a helper thread that runs interruptionThread(target).  The helper's
+    // handle is stored in target->interruptingThread.
+    //
+    // As written the function always returns false: 'result' is only set to true
+    // in the disabled synchronous path kept below as commented-out code.
+    bool interruptWaitingThread(ThreadHandle* self DECAF_UNUSED, ThreadHandle* target) {
+
+        bool result = false;
+        //MonitorHandle* monitor;
+
+        // TODO - Currently gets into a deadlock.
+        // If this thread owns the target thread's monitor lock then there's no
+        // reason to spawn an interruption thread, otherwise it has to be done
+        // asynchronously.
+        //monitor = target->monitor;
+        //    if (monitorTryEnterUsingThreadId(monitor, self) == true) {
+        //        PlatformThread::notifyAll(target->condition);
+        //        monitorExitUsingThreadId(monitor, self);
+        //        result = true;
+        //    } else {
+
+        // Spawn the thread so that we don't deadlock on the monitor.
+        target->interruptingThread = initThreadHandle(new ThreadHandle());
+        createThreadInstance(target->interruptingThread, 0, Thread::NORM_PRIORITY, false, interruptionThread, target);
+        //    }
+
+        return result;
+    }
+
+    /**
+     * Signals the condition of every thread in the list that starts at
+     * queueHead.  The list itself is left untouched.
+     */
+    void unblockThreads(ThreadHandle* queueHead) {
+
+        ThreadHandle* cursor = queueHead;
+
+        while (cursor != NULL) {
+            // Fetch the successor before signalling the thread.
+            ThreadHandle* sleeper = cursor;
+            cursor = sleeper->next;
+
+            PlatformThread::notifyAll(sleeper->condition);
+        }
+    }
+
+    /**
+     * Appends a thread to the tail of the singly linked list rooted at *queue.
+     *
+     * @throws RuntimeException if the thread's next pointer is already set.
+     */
+    void enqueueThread(ThreadHandle** queue, ThreadHandle* thread)
+    {
+        if (thread->next != NULL) {
+            throw RuntimeException(__FILE__, __LINE__, "Thread was on a monitor queue already");
+        }
+
+        // Walk the chain of 'next' links until the empty slot at the end is found.
+        ThreadHandle** slot = queue;
+        while (*slot != NULL) {
+            slot = &((*slot)->next);
+        }
+
+        *slot = thread;
+    }
+
+    /**
+     * Unlinks a thread from the singly linked list rooted at *queue and clears
+     * its next pointer.  Does nothing when the thread is not on the list.
+     */
+    void dequeueThread(ThreadHandle** queue, ThreadHandle* thread) {
+
+        // 'slot' always addresses the link that points at the node under
+        // inspection, so the head and interior cases are handled the same way.
+        for (ThreadHandle** slot = queue; *slot != NULL; slot = &((*slot)->next)) {
+            if (*slot == thread) {
+                *slot = thread->next;
+                thread->next = NULL;
+                return;
+            }
+        }
+    }
+
+    /**
+     * Counts the threads on the monitor's waiting list while holding the
+     * monitor's mutex.
+     */
+    unsigned int getNumberOfWaiters(MonitorHandle* monitor) {
+
+        unsigned int waiters = 0;
+
+        PlatformThread::lockMutex(monitor->mutex);
+
+        for (ThreadHandle* node = monitor->waiting; node != NULL; node = node->next) {
+            ++waiters;
+        }
+
+        PlatformThread::unlockMutex(monitor->mutex);
+
+        return waiters;
+    }
+
+    /**
+     * Deletes every MonitorHandle chained from pool->head.  The two platform
+     * mutexes of a handle are destroyed first, but only for handles flagged as
+     * initialized.  The pool object itself is not modified or freed.
+     */
+    void purgeMonitorsPool(MonitorPool* pool) {
+
+        MonitorHandle* cursor = pool->head;
+
+        while (cursor != NULL) {
+            MonitorHandle* doomed = cursor;
+            cursor = doomed->next;
+
+            // Cleanup the OS level resources.
+            if (doomed->initialized == true) {
+                PlatformThread::destroyMutex(doomed->mutex);
+                PlatformThread::destroyMutex(doomed->lock);
+            }
+
+            delete doomed;
+        }
+    }
+
+    /**
+     * Allocates MONITOR_POOL_BLOCK_SIZE monitor handles, links them into one
+     * chain and returns the head of that chain.  Each handle is flagged as not
+     * initialized.
+     */
+    MonitorHandle* batchAllocateMonitors() {
+
+        MonitorHandle* chain = NULL;
+
+        int remaining = MONITOR_POOL_BLOCK_SIZE;
+        while (remaining-- > 0) {
+            MonitorHandle* fresh = initMonitorHandle(new MonitorHandle);
+
+            // Push the new handle onto the front of the chain.
+            fresh->next = chain;
+            fresh->initialized = false;
+            chain = fresh;
+        }
+
+        return chain;
+    }
+
+    /**
+     * Moves a thread from the waiting state to the blocked state and signals
+     * its condition.  The notified flag is set only when markAsNotified is
+     * true; it is never cleared here.
+     */
+    void doNotifyThread(ThreadHandle* thread, bool markAsNotified) {
+
+        if (markAsNotified) {
+            thread->notified = true;
+        }
+
+        thread->blocked = true;
+        thread->waiting = false;
+
+        PlatformThread::notifyAll(thread->condition);
+    }
+
+    // Wakes threads on the monitor's waiting list.  The calling thread must be
+    // the monitor's owner, otherwise IllegalMonitorStateException is thrown.
+    // The list is walked under monitor->mutex; each thread is examined under its
+    // own mutex and only threads whose waiting flag is set are notified.  Unless
+    // notifyAll is true the walk stops after the first thread that was notified.
+    void doNotifyWaiters(MonitorHandle* monitor, bool notifyAll) {
+
+        ThreadHandle* self = Threading::getCurrentThreadHandle();
+        ThreadHandle* current = NULL;
+        ThreadHandle* next = NULL;
+        bool signalled = false;
+
+        if (self != monitor->owner) {
+            throw IllegalMonitorStateException(__FILE__, __LINE__, "Current Thread is not the lock holder.");
+        }
+
+        PlatformThread::lockMutex(monitor->mutex);
+
+        next = monitor->waiting;
+
+        while(next != NULL) {
+            current = next;
+            next = current->next;
+
+            PlatformThread::lockMutex(current->mutex);
+
+            // Entries whose waiting flag is already cleared are skipped.
+            if (current->waiting == true) {
+                doNotifyThread(current, true);
+                signalled = true;
+            }
+
+            PlatformThread::unlockMutex(current->mutex);
+
+            if (signalled && !notifyAll) {
+                break;
+            }
+        }
+
+        PlatformThread::unlockMutex(monitor->mutex);
+    }
+
+    // Acquires monitor->lock on behalf of 'thread', blocking until it is
+    // obtained.  On success the monitor's owner is set to 'thread' and its entry
+    // count to 1.  Re-entrant acquisition is not handled here: the count is
+    // always reset to 1.
+    void doMonitorEnter(MonitorHandle* monitor, ThreadHandle* thread) {
+
+        while(true) {
+
+            // Fast path: uncontended acquire without touching monitor->mutex.
+            if (PlatformThread::tryLockMutex(monitor->lock) == true) {
+                monitor->owner = thread;
+                monitor->count = 1;
+                break;
+            }
+
+            PlatformThread::lockMutex(monitor->mutex);
+
+            // Retry while holding monitor->mutex; doMonitorExit() releases
+            // monitor->lock while holding that same mutex.
+            if (PlatformThread::tryLockMutex(monitor->lock) == true) {
+                PlatformThread::unlockMutex(monitor->mutex);
+                monitor->owner = thread;
+                monitor->count = 1;
+                break;
+            }
+
+            PlatformThread::lockMutex(thread->mutex);
+
+            thread->blocked = true;
+            thread->state = Thread::BLOCKED;
+            thread->monitor = monitor;
+
+            PlatformThread::unlockMutex(thread->mutex);
+
+            // Queue up as a blocked thread and sleep until signalled, then loop
+            // around and try to take the lock again.
+            enqueueThread(&monitor->blocking, thread);
+
+            PlatformThread::waitOnCondition(thread->condition, monitor->mutex);
+
+            dequeueThread(&monitor->blocking, thread);
+
+            PlatformThread::unlockMutex(monitor->mutex);
+        }
+
+        // Monitor is now owned by this thread, lets clean up the state in case
+        // the lock was acquired after blocking.
+        if (thread->monitor != NULL) {
+            PlatformThread::lockMutex(thread->mutex);
+            thread->blocked = false;
+            thread->state = Thread::RUNNABLE;
+            thread->monitor = NULL;
+            PlatformThread::unlockMutex(thread->mutex);
+        }
+    }
+
+    // Drops one level of ownership of the monitor.  When the entry count reaches
+    // zero the owner is cleared, every thread on the blocking queue is signalled
+    // and monitor->lock is released.  The 'thread' argument is not used and no
+    // ownership check is made here.
+    void doMonitorExit(MonitorHandle* monitor, ThreadHandle* thread DECAF_UNUSED) {
+
+        monitor->count--;
+
+        if (monitor->count == 0) {
+            monitor->owner = NULL;
+
+            // Wake any blocked threads so they can attempt to enter the monitor.
+            PlatformThread::lockMutex(monitor->mutex);
+            unblockThreads(monitor->blocking);
+
+            // since we are signaling waiting threads we unlock this under lock so that they
+            // don't go back to sleep before we are done
+            PlatformThread::unlockMutex(monitor->lock);
+
+            PlatformThread::unlockMutex(monitor->mutex);
+        }
+    }
+
+    // Makes 'thread', which must own 'monitor', give up the monitor and wait on
+    // it until it is notified, interrupted (when 'interruptible' is true) or the
+    // timeout given by mills / nanos elapses; a timeout of zero for both means
+    // wait without a time limit.  The monitor is re-acquired and its previous
+    // entry count restored before the function returns or throws after waiting.
+    //
+    // Returns false when the thread was notified and true when the wait timed
+    // out.  Throws IllegalMonitorStateException when 'thread' is not the owner,
+    // InterruptedException when the thread was interrupted (either before the
+    // wait started or during it without also being notified), and
+    // RuntimeException when the wait ended for none of the known reasons.
+    bool doWaitOnMonitor(MonitorHandle* monitor, ThreadHandle* thread,
+                         long long mills, int nanos, bool interruptible) {
+
+        int count = -1;
+        bool interrupted = false;
+        bool notified = false;
+        bool timedOut = false;
+
+        if (monitor->owner != thread) {
+            throw IllegalMonitorStateException(__FILE__, __LINE__, "Current Thread is not the lock holder.");
+        }
+
+        // Remember the entry count so it can be restored after the wait.
+        count = monitor->count;
+
+        PlatformThread::lockMutex(thread->mutex);
+
+        // Before we wait, check if we've already been interrupted.
+        if (interruptible && thread->interrupted) {
+            thread->interrupted = false;
+            PlatformThread::unlockMutex(thread->mutex);
+            throw InterruptedException(__FILE__, __LINE__, "Thread interrupted");
+        }
+
+        thread->waiting = true;
+        thread->interruptible = interruptible;
+
+        if (mills || nanos) {
+            thread->timerSet = true;
+            thread->state = Thread::TIMED_WAITING;
+        } else {
+            thread->state = Thread::WAITING;
+        }
+
+        thread->monitor = monitor;
+        PlatformThread::unlockMutex(thread->mutex);
+
+        monitor->owner = NULL;
+        monitor->count = 0;
+
+        PlatformThread::lockMutex(monitor->mutex);
+
+        // Release the lock and wake up any blocked threads.
+        PlatformThread::unlockMutex(monitor->lock);
+        unblockThreads(monitor->blocking);
+
+        // This thread now enters the wait queue.
+        enqueueThread(&monitor->waiting, thread);
+
+        MonitorWaitCompletionCondition completion(thread);
+
+        if (mills || nanos) {
+            timedOut = PlatformThread::interruptibleWaitOnCondition(thread->condition, monitor->mutex, mills, nanos, completion);
+        } else {
+            PlatformThread::interruptibleWaitOnCondition(thread->condition, monitor->mutex, completion);
+        }
+
+        dequeueThread(&monitor->waiting, thread);
+
+        PlatformThread::unlockMutex(monitor->mutex);
+
+        // We should own the Thread's mutex from the CompletionCondition locking it.
+
+        // Snapshot the outcome flags before they are reset below.
+        interrupted = thread->interrupted;
+        notified = thread->notified;
+
+        thread->waiting = false;
+        thread->notified = false;
+        thread->timerSet = false;
+        thread->interruptible = false;
+        thread->state = Thread::RUNNABLE;
+
+        // The interrupt is consumed only when it is what will be reported; a
+        // thread that was also notified keeps its interrupted flag set.
+        if (interrupted && !notified) {
+            thread->interrupted = false;
+        }
+
+        // Cancel an interrupting helper thread that is still registered.
+        PlatformThread::lockMutex(library->globalLock);
+        if (thread->interruptingThread) {
+            PlatformThread::lockMutex(thread->interruptingThread->mutex);
+            thread->interruptingThread->canceled = true;
+            PlatformThread::unlockMutex(thread->interruptingThread->mutex);
+            thread->interruptingThread = NULL;
+        }
+        PlatformThread::unlockMutex(library->globalLock);
+
+        PlatformThread::unlockMutex(thread->mutex);
+
+        // Re-acquire the lock now and restore its old state.
+        doMonitorEnter(monitor, thread);
+
+        monitor->count = count;
+
+        if (notified) {
+            return false;
+        }
+
+        if (interrupted) {
+            throw InterruptedException(__FILE__, __LINE__, "Thread interrupted");
+        }
+
+        if (!timedOut) {
+            throw RuntimeException(__FILE__, __LINE__, "Invalid state detected at end of Monitor Wait");
+        }
+
+        return true;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Nothing to set up here; the library state is created in initialize().
+Threading::Threading() {}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates the library-wide ThreadingLibrary state: the priority mapping, the
+// two TLS keys, the global and TLS locks and the first batch of pooled monitor
+// handles.  The calling thread is recorded as the main thread, and the
+// Executors static data is initialized last.
+void Threading::initialize() {
+
+    library = new ThreadingLibrary();
+
+    // Figure out what the OS level thread priority mappings are for the Thread
+    // classes generic priority value range.
+    PlatformThread::initPriorityMapping(Thread::MAX_PRIORITY + 1, library->priorityMapping);
+
+    PlatformThread::createTlsKey(&(library->threadKey));
+    PlatformThread::createTlsKey(&(library->selfKey));
+    PlatformThread::createMutex(&(library->globalLock));
+    PlatformThread::createMutex(&(library->tlsLock));
+
+    library->monitors = new MonitorPool;
+    library->monitors->head = batchAllocateMonitors();
+    library->monitors->count = MONITOR_POOL_BLOCK_SIZE;
+
+    // We mark the thread where Decaf's Init routine is called from as our Main Thread.
+    library->mainThread = PlatformThread::getCurrentThread();
+
+    // Initialize the Executors static data for use in ExecutorService classes.
+    Executors::initialize();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Tears down everything created by initialize(): the Executors static data
+// first, then the Thread facades of attached OS threads, the TLS keys, the
+// locks, the monitor pool and finally the library state object itself.
+//
+// NOTE(review): 'library' is not reset to NULL after the delete, so it is left
+// pointing at freed memory until initialize() is called again - confirm that
+// nothing uses it after shutdown.
+void Threading::shutdown() {
+
+    // First shutdown the Executors static data to remove dependencies on Threading.
+    Executors::shutdown();
+
+    // Destroy any Foreign Thread Facades that were created during runtime.
+    std::vector<Thread*>::iterator iter = library->osThreads.begin();
+    for( ; iter != library->osThreads.end(); ++iter ) {
+        delete *iter;
+    }
+    library->osThreads.clear();
+
+    PlatformThread::destroyTlsKey(library->threadKey);
+    PlatformThread::destroyTlsKey(library->selfKey);
+    PlatformThread::destroyMutex(library->globalLock);
+    PlatformThread::destroyMutex(library->tlsLock);
+
+    purgeMonitorsPool(library->monitors);
+    delete library->monitors;
+
+    delete library;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Locks the library-wide global lock; paired with unlockThreadsLib().
+void Threading::lockThreadsLib() {
+    PlatformThread::lockMutex(library->globalLock);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Unlocks the library-wide global lock taken by lockThreadsLib().
+void Threading::unlockThreadsLib() {
+    PlatformThread::unlockMutex(library->globalLock);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates the ThreadHandle for a Thread object and launches its platform
+// thread in the suspended state, with runCallback as the thread main and the
+// new handle as its argument.  The name is copied with strdup().
+//
+// Throws NullPointerException when parent or name is NULL.  Returns the new
+// handle; ownership passes to the caller.
+ThreadHandle* Threading::createNewThread(Thread* parent, const char* name, long long stackSize) {
+
+    if(parent == NULL || name == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "One or more arguments was NULL");
+    }
+
+    Pointer<ThreadHandle> thread(initThreadHandle(new ThreadHandle()));
+
+    thread->parent = parent;
+    thread->name = ::strdup(name);
+
+    // 'true' creates the thread suspended; Threading::start() releases it.
+    createThreadInstance(thread.get(), stackSize, Thread::NORM_PRIORITY, true, runCallback, thread.get());
+
+    return thread.release();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Releases a ThreadHandle: frees its name, joins the thread (without a
+// timeout) unless the handle wraps an attached OS thread, destroys its mutex
+// and condition and deletes the handle.
+//
+// NOTE(review): the name is freed before the join, i.e. possibly while the
+// thread is still running - confirm nothing reads it during that window.
+void Threading::destroyThread(ThreadHandle* thread) {
+
+    free(thread->name);
+
+    if (!thread->osThread) {
+        Threading::join(thread, 0, 0);
+    }
+    PlatformThread::destroyMutex(thread->mutex);
+    PlatformThread::destroyCondition(thread->condition);
+
+    delete thread;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Builds a ThreadHandle and a proxy Thread object for the calling thread,
+// which was not created through this library.  The handle is marked RUNNABLE
+// and flagged as an OS thread, given a generated "OS-Thread<N>" name, and both
+// objects are stored in the calling thread's TLS slots.  The proxy Thread is
+// kept in library->osThreads so that shutdown() can delete it.
+//
+// Returns the new handle.
+ThreadHandle* Threading::attachToCurrentThread() {
+
+    Pointer<ThreadHandle> thread(initThreadHandle(new ThreadHandle()));
+
+    thread->handle = PlatformThread::getCurrentThread();
+    thread->state = Thread::RUNNABLE;
+    thread->stackSize = PlatformThread::getStackSize(thread->handle);
+    thread->name = ::strdup(
+        std::string(std::string("OS-Thread") + Integer::toString(library->osThreadId.getAndIncrement())).c_str());
+    thread->threadId = PlatformThread::getCurrentThreadId();
+
+    // Now create a Decaf Thread as a proxy to the OS thread.
+    Pointer<Thread> osThread(new Thread(thread.get()));
+    thread->parent = osThread.get();
+    thread->osThread = true;
+
+    PlatformThread::setTlsValue(library->threadKey, osThread.get());
+    PlatformThread::setTlsValue(library->selfKey, thread.get());
+
+    // Store the Thread that wraps this OS thread for later deletion since
+    // no other owners exist.
+    library->osThreads.push_back(osThread.release());
+
+    return thread.release();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Starts a thread that was created suspended: marks it RUNNABLE, clears the
+// suspended flag and signals the thread's condition so that its entry method
+// can continue.
+//
+// Throws IllegalThreadStateException when the thread's state is already past
+// NEW.  That check is made before the thread's mutex is taken.
+// NOTE(review): the trailing DECAF_CATCH_* macros presumably rethrow or convert
+// any other exception into a RuntimeException - confirm against their
+// definitions.
+void Threading::start(ThreadHandle* thread) {
+
+    try {
+
+        if (thread->state > Thread::NEW) {
+            throw IllegalThreadStateException(
+                __FILE__, __LINE__, "Thread already started");
+        }
+
+        PlatformThread::lockMutex(thread->mutex);
+
+        thread->state = Thread::RUNNABLE;
+
+        if (thread->suspended == true) {
+            thread->suspended = false;
+            PlatformThread::notifyAll(thread->condition);
+        }
+
+        PlatformThread::unlockMutex(thread->mutex);
+     }
+     DECAF_CATCH_RETHROW( IllegalThreadStateException )
+     DECAF_CATCH_RETHROW( RuntimeException )
+     DECAF_CATCH_EXCEPTION_CONVERT( NullPointerException, RuntimeException )
+     DECAF_CATCHALL_THROW( RuntimeException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Forwards to PlatformThread::yeild().  The misspelled name is part of the
+// existing interface.
+void Threading::yeild() {
+    PlatformThread::yeild();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sets the interrupted flag of 'thread' and, when the thread is currently in
+// an interruptible state, wakes it.  A sleeping or parked thread is signalled
+// directly through its condition; a thread waiting on a monitor is handed to
+// interruptWaitingThread().  Does nothing when the thread is already flagged
+// as interrupted.  Runs entirely under the global lock and the thread's mutex.
+void Threading::interrupt(ThreadHandle* thread) {
+
+    ThreadHandle* self = Threading::getCurrentThreadHandle();
+
+    PlatformThread::lockMutex(library->globalLock);
+    PlatformThread::lockMutex(thread->mutex);
+
+    if (thread->interrupted == true) {
+        PlatformThread::unlockMutex(thread->mutex);
+        PlatformThread::unlockMutex(library->globalLock);
+        return;
+    }
+
+    if (thread->interruptible == true) {
+
+        if (thread->sleeping || thread->parked) {
+            PlatformThread::notifyAll(thread->condition);
+        } else if(thread->waiting == true) {
+            // interruptWaitingThread() currently always returns false, so the
+            // blocked flag is never set from here.
+            if (interruptWaitingThread(self, thread)) {
+                thread->blocked = true;
+            }
+        }
+    }
+
+    thread->interrupted = true;
+
+    PlatformThread::unlockMutex(thread->mutex);
+    PlatformThread::unlockMutex(library->globalLock);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Reports whether the calling thread has been interrupted and clears its
+// interrupted flag in the same call.
+bool Threading::interrupted() {
+    return Threading::isInterrupted(Threading::getCurrentThreadHandle(), true);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the interrupted flag of the given thread.  When 'reset' is true the
+// flag is read and cleared under the thread's mutex; otherwise it is read
+// without taking the mutex and left unchanged.
+bool Threading::isInterrupted(ThreadHandle* handle, bool reset) {
+
+    if (reset != true) {
+        return handle->interrupted;
+    }
+
+    PlatformThread::lockMutex(handle->mutex);
+    const bool wasInterrupted = handle->interrupted;
+    handle->interrupted = false;
+    PlatformThread::unlockMutex(handle->mutex);
+
+    return wasInterrupted;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    // Completion predicate used by Threading::join().  'self' is the joining
+    // thread and 'target' is the thread being joined, or NULL when a thread
+    // joins itself.
+    class JoinCompletionCondition : public CompletionCondition {
+    private:
+
+        ThreadHandle* self;
+        ThreadHandle* target;
+
+    public:
+
+        JoinCompletionCondition(ThreadHandle* self, ThreadHandle* target) : self(self), target(target) {
+        }
+
+        // With a target: complete once the target is TERMINATED or 'self' has
+        // been interrupted; the interrupted flag is read under self->mutex.
+        // Without a target: complete only once 'self' has been interrupted; the
+        // flag is read without locking here.
+        // NOTE(review): presumably the self-join caller already holds
+        // self->mutex at that point - confirm.
+        virtual bool operator()() {
+
+            if (target != NULL) {
+                if (target->state == Thread::TERMINATED) {
+                    return true;
+                }
+
+                PlatformThread::lockMutex(self->mutex);
+
+                if (self->interrupted == true) {
+                    PlatformThread::unlockMutex(self->mutex);
+                    return true;
+                }
+
+                PlatformThread::unlockMutex(self->mutex);
+            } else if (self->interrupted == true) {
+                return true;
+            }
+
+            return false;
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Blocks the calling thread until the target thread terminates, the timeout
+// expires, or the calling thread is interrupted.
+//
+// @param thread  handle of the thread to join; may be the caller's own handle.
+// @param mills   milliseconds to wait, must be >= 0.
+// @param nanos   additional nanoseconds to wait, must be in [0, 999999].
+//                When both mills and nanos are zero the wait has no timer.
+//
+// @returns the value reported by the timed wait (false when no timed wait ran).
+//
+// @throws IllegalArgumentException if mills or nanos is out of range.
+// @throws InterruptedException if the caller was interrupted before or during
+//         the wait; the caller's interrupted flag is cleared before throwing,
+//         matching sleep() and park().
+bool Threading::join(ThreadHandle* thread, long long mills, int nanos) {
+
+    if ((mills < 0) || (nanos < 0) || (nanos >= 1000000)) {
+        throw IllegalArgumentException(__FILE__, __LINE__, "Timeout arguments out of range.");
+    }
+
+    bool timedOut = false;
+    bool interrupted = false;
+
+    ThreadHandle* self = getCurrentThreadHandle();
+
+    PlatformThread::lockMutex(self->mutex);
+
+    if (self->interrupted == true) {
+        // An interrupt was already pending: consume it here so the flag does
+        // not stay set after the InterruptedException is thrown.
+        interrupted = true;
+        self->interrupted = false;
+    } else if (self == thread && self->state != Thread::TERMINATED) {
+
+        // When blocking on ourself, we just enter a wait and hope there's
+        // either a timeout, or that we get interrupted.
+
+        JoinCompletionCondition completion(self, NULL);
+
+        self->sleeping = true;
+        self->interruptible = true;
+        self->state = Thread::SLEEPING;
+
+        if (mills > 0 || nanos > 0) {
+            self->timerSet = true;
+            timedOut = PlatformThread::interruptibleWaitOnCondition(self->condition, self->mutex,
+                                                                    mills, nanos, completion);
+        } else {
+            PlatformThread::interruptibleWaitOnCondition(self->condition, self->mutex, completion);
+        }
+
+        // Restore the handle to its running state; previously this branch
+        // left the thread marked as SLEEPING / interruptible after the wait
+        // and silently dropped any interrupt that ended it.
+        self->timerSet = false;
+        self->state = Thread::RUNNABLE;
+        self->sleeping = false;
+        self->interruptible = false;
+
+        if (self->interrupted == true) {
+            interrupted = true;
+            self->interrupted = false;
+        }
+
+    } else {
+
+        PlatformThread::lockMutex(thread->mutex);
+
+        if (thread->state >= Thread::RUNNABLE && thread->state != Thread::TERMINATED) {
+
+            // Register as a joiner of the target while both mutexes are held.
+            enqueueThread(&thread->joiners, self);
+
+            self->sleeping = true;
+            self->interruptible = true;
+            self->state = Thread::SLEEPING;
+
+            JoinCompletionCondition completion(self, thread);
+
+            // Our own mutex is released before waiting; the wait itself is
+            // performed against the target thread's mutex.
+            if (mills > 0 || nanos > 0) {
+                self->timerSet = true;
+
+                PlatformThread::unlockMutex(self->mutex);
+                timedOut = PlatformThread::interruptibleWaitOnCondition(self->condition, thread->mutex,
+                                                                        mills, nanos, completion);
+            } else {
+                PlatformThread::unlockMutex(self->mutex);
+                PlatformThread::interruptibleWaitOnCondition(self->condition, thread->mutex, completion);
+            }
+
+            dequeueThread(&thread->joiners, self);
+            PlatformThread::unlockMutex(thread->mutex);
+
+            // Re-take our own mutex before touching our own state again.
+            PlatformThread::lockMutex(self->mutex);
+
+            self->timerSet = false;
+            self->state = Thread::RUNNABLE;
+            self->sleeping = false;
+            self->interruptible = false;
+
+            if (self->interrupted == true) {
+                interrupted = true;
+                self->interrupted = false;
+            }
+        } else {
+            // Target never started or has already finished: nothing to wait for.
+            PlatformThread::unlockMutex(thread->mutex);
+        }
+    }
+
+    PlatformThread::unlockMutex(self->mutex);
+
+    if (interrupted) {
+        throw InterruptedException(__FILE__, __LINE__, "Sleeping Thread interrupted");
+    }
+
+    return timedOut;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    // Completion predicate that Threading::sleep() passes to the timed
+    // interruptible wait: it reports true once the sleeping thread's
+    // interrupted flag is set.
+    class SleepCompletionCondition : public CompletionCondition {
+    private:
+
+        ThreadHandle* handle;
+
+    public:
+
+        SleepCompletionCondition(ThreadHandle* handle) : handle(handle) {
+        }
+
+        bool operator()() {
+            return handle->interrupted ? true : false;
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Puts the calling thread into a timed, interruptible wait on its own
+// condition / mutex pair.
+//
+// @param mills  milliseconds to sleep, must be >= 0.
+// @param nanos  additional nanoseconds, must be in [0, 999999].
+//
+// @returns the value reported by the timed wait (false when the wait was
+//          skipped because an interrupt was already pending).
+//
+// @throws IllegalArgumentException if mills or nanos is out of range.
+// @throws InterruptedException if the thread was interrupted before or
+//         during the sleep; the interrupted flag is cleared before throwing.
+bool Threading::sleep(long long mills, int nanos) {
+
+    const bool outOfRange = (mills < 0) || (nanos < 0) || (nanos >= 1000000);
+    if (outOfRange) {
+        throw IllegalArgumentException(__FILE__, __LINE__, "Timeout arguments out of range.");
+    }
+
+    bool expired = false;
+    bool wasInterrupted = false;
+
+    ThreadHandle* current = getCurrentThreadHandle();
+
+    PlatformThread::lockMutex(current->mutex);
+
+    if (current->interrupted == true) {
+        // Interrupt already pending: do not wait at all.
+        wasInterrupted = true;
+    } else {
+
+        current->state = Thread::SLEEPING;
+        current->sleeping = true;
+        current->interruptible = true;
+        current->timerSet = true;
+
+        SleepCompletionCondition wakeCheck(current);
+
+        expired = PlatformThread::interruptibleWaitOnCondition(
+            current->condition, current->mutex, mills, nanos, wakeCheck);
+    }
+
+    // Back to the running state, whichever path was taken above.
+    current->state = Thread::RUNNABLE;
+    current->sleeping = false;
+    current->interruptible = false;
+    current->timerSet = false;
+
+    // Consume the interrupt so that it is reported exactly once.
+    if (current->interrupted == true) {
+        wasInterrupted = true;
+        current->interrupted = false;
+    }
+
+    PlatformThread::unlockMutex(current->mutex);
+
+    if (wasInterrupted) {
+        throw InterruptedException(__FILE__, __LINE__, "Sleeping Thread interrupted");
+    }
+
+    return expired;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns true when the thread's state is at or past Thread::RUNNABLE and is
+// not Thread::TERMINATED.
+//
+// The handle is dereferenced here, so the DECAF_UNUSED annotation that used
+// to be on the parameter was misleading and has been dropped.
+bool Threading::isThreadAlive(ThreadHandle* handle) {
+    return handle->state >= Thread::RUNNABLE && handle->state != Thread::TERMINATED;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the threadId value stored in the given handle.
+long long Threading::getThreadId(ThreadHandle* handle) {
+    const long long id = handle->threadId;
+    return id;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the priority value cached in the handle (the value last stored by
+// setThreadPriority, not a value queried from the platform).
+int Threading::getThreadPriority(ThreadHandle* handle) {
+    const int level = handle->priority;
+    return level;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Applies a new priority to the thread and records it in the handle.
+//
+// The priority is translated through library->priorityMapping before being
+// handed to the platform layer.
+// NOTE(review): priority is used as an array index without a range check;
+// confirm that callers validate it against the size of priorityMapping.
+void Threading::setThreadPriority(ThreadHandle* handle, int priority) {
+
+    PlatformThread::setPriority(
+        handle->handle, library->priorityMapping[priority]);
+
+    // Only recorded once the platform call has returned.
+    handle->priority = priority;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the name pointer stored in the handle.  The string stays owned by
+// the handle; setThreadName() frees and replaces it.
+const char* Threading::getThreadName(ThreadHandle* handle) {
+    const char* current = handle->name;
+    return current;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Replaces the name stored in the thread handle with a private copy of the
+// given string.
+//
+// @param thread  handle whose name is replaced.
+// @param name    new name; must not be NULL.  It may alias the handle's
+//                current name.
+//
+// @throws NullPointerException if name is NULL (strdup on NULL is undefined).
+void Threading::setThreadName(ThreadHandle* thread, const char* name) {
+
+    if (name == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "Null Thread name Passed.");
+    }
+
+    // Duplicate before releasing the old buffer so that passing the handle's
+    // own current name does not read freed memory.
+    char* copy = ::strdup(name);
+
+    if (copy == NULL) {
+        // Allocation failed: keep the existing name rather than leaving the
+        // handle without one.
+        return;
+    }
+
+    // free(NULL) is a no-op, so no guard is needed.
+    free(thread->name);
+    thread->name = copy;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the state value stored in the handle, converted to Thread::State.
+Thread::State Threading::getThreadState(ThreadHandle* handle) {
+    return static_cast<Thread::State>(handle->state);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the Thread object recorded as the parent of the calling thread's
+// handle.
+Thread* Threading::getCurrentThread() {
+    ThreadHandle* current = getCurrentThreadHandle();
+    return current->parent;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the handle stored in the thread-local slot library->selfKey.  When
+// the slot is empty the result of attachToCurrentThread() is returned
+// instead.
+ThreadHandle* Threading::getCurrentThreadHandle() {
+
+    ThreadHandle* current = (ThreadHandle*)PlatformThread::getTlsValue(library->selfKey);
+
+    return (current != NULL) ? current : attachToCurrentThread();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Parks the given thread with no timeout by forwarding to the timed overload
+// with zero milliseconds and zero nanoseconds.
+//
+// @throws NullPointerException if thread is NULL.
+void Threading::park(Thread* thread) {
+
+    if (thread == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "Null Thread Pointer Passed.");
+    }
+
+    // The timed overload's return value is not needed here.
+    Threading::park(thread, 0LL, 0);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    // Completion predicate that Threading::park() passes to the interruptible
+    // wait.  It reports true when an unpark token is present (consuming the
+    // token) or when the thread's interrupted flag is set.
+    class ParkCompletionCondition : public CompletionCondition {
+    private:
+
+        ThreadHandle* handle;
+
+    public:
+
+        ParkCompletionCondition(ThreadHandle* handle) : handle(handle) {
+        }
+
+        bool operator()() {
+
+            // The unpark token is checked first and is consumed when found.
+            if (handle->unparked == true) {
+                handle->unparked = false;
+                return true;
+            }
+
+            return handle->interrupted ? true : false;
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Parks on the given thread's handle until it is unparked, interrupted, or
+// the optional timeout elapses.
+//
+// A pending unpark token is consumed and the call returns without waiting.
+// When both mills and nanos are <= 0 the wait is performed without a timer.
+// NOTE(review): unlike sleep() and join(), mills / nanos are not range
+// checked here - confirm whether that is intentional.
+//
+// @returns the value reported by the timed wait (false when no timed wait ran).
+//
+// @throws NullPointerException if thread is NULL.
+// @throws InterruptedException if the handle's interrupted flag was set
+//         before or during the wait; the flag is cleared before throwing.
+bool Threading::park( Thread* thread, long long mills, int nanos) {
+
+    if (thread == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "Null Thread Pointer Passed.");
+    }
+
+    bool expired = false;
+    bool wasInterrupted = false;
+
+    ThreadHandle* target = thread->getHandle();
+
+    PlatformThread::lockMutex(target->mutex);
+
+    if (target->unparked == true) {
+        // A token is already available: consume it and skip the wait.
+        target->unparked = false;
+    } else if (target->interrupted == true) {
+        wasInterrupted = true;
+    } else {
+
+        target->parked = true;
+        target->interruptible = true;
+        target->state = Thread::BLOCKED;
+
+        ParkCompletionCondition wakeCheck(target);
+
+        const bool timed = (mills > 0 || nanos > 0);
+
+        if (timed) {
+            target->timerSet = true;
+            expired = PlatformThread::interruptibleWaitOnCondition(
+                target->condition, target->mutex, mills, nanos, wakeCheck);
+        } else {
+            PlatformThread::interruptibleWaitOnCondition(
+                target->condition, target->mutex, wakeCheck);
+        }
+    }
+
+    // Back to the running state, whichever path was taken above.
+    target->state = Thread::RUNNABLE;
+    target->parked = false;
+    target->interruptible = false;
+    target->timerSet = false;
+
+    // Consume the interrupt so that it is reported exactly once.
+    if (target->interrupted == true) {
+        wasInterrupted = true;
+        target->interrupted = false;
+    }
+
+    PlatformThread::unlockMutex(target->mutex);
+
+    if (wasInterrupted) {
+        throw InterruptedException(__FILE__, __LINE__, "Parked Thread interrupted");
+    }
+
+    return expired;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Makes an unpark token available to the given thread and, if that thread is
+// currently parked, signals its condition so that it resumes.
+//
+// @throws NullPointerException if thread is NULL.
+void Threading::unpark(Thread* thread) {
+
+    if (thread == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "Null Thread Pointer Passed.");
+    }
+
+    ThreadHandle* target = thread->getHandle();
+
+    PlatformThread::lockMutex(target->mutex);
+
+    // Leave the token behind unconditionally: a parked thread consumes it
+    // when it wakes, otherwise the next park() call consumes it and returns
+    // without waiting.
+    target->unparked = true;
+
+    // Only a thread that is actually parked needs to be signalled.
+    if (target->parked) {
+        PlatformThread::notifyAll(target->condition);
+    }
+
+    PlatformThread::unlockMutex(target->mutex);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Removes a monitor from the head of the library's monitor pool and returns
+// it, refilling the pool through batchAllocateMonitors() when it is empty.
+//
+// @param alreadyLocked  when true the library's global lock is neither taken
+//                       nor released by this call.
+//
+// @returns the monitor taken from the pool; its mutex and lock are created
+//          on first use if the monitor was not yet initialized.
+MonitorHandle* Threading::takeMonitor(bool alreadyLocked) {
+
+    const bool manageLock = !alreadyLocked;
+
+    if (manageLock) {
+        PlatformThread::lockMutex(library->globalLock);
+    }
+
+    // Refill the pool with a fresh batch when nothing is left.
+    if (library->monitors->head == NULL) {
+        library->monitors->head = batchAllocateMonitors();
+        library->monitors->count = MONITOR_POOL_BLOCK_SIZE;
+    }
+
+    // Unlink the first pooled monitor.
+    MonitorHandle* taken = library->monitors->head;
+    library->monitors->head = taken->next;
+    library->monitors->count--;
+    taken->next = NULL;
+
+    // Platform resources are only created the first time a monitor is used.
+    if (!taken->initialized) {
+        PlatformThread::createMutex(&taken->mutex);
+        PlatformThread::createMutex(&taken->lock);
+        taken->initialized = true;
+    }
+
+    if (manageLock) {
+        PlatformThread::unlockMutex(library->globalLock);
+    }
+
+    return taken;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Hands a monitor back to the library's monitor pool.
+//
+// @param monitor        the monitor being returned; must not be NULL.
+// @param alreadyLocked  when true the library's global lock is neither taken
+//                       nor released by this call.
+//
+// @throws RuntimeException if monitor is NULL.
+// @throws IllegalMonitorStateException if another thread owns the monitor or
+//         if the monitor's waiting field is set.
+void Threading::returnMonitor(MonitorHandle* monitor, bool alreadyLocked) {
+
+    if (monitor == NULL) {
+        throw RuntimeException(__FILE__, __LINE__, "Monitor pointer was null");
+    }
+
+    // The owner may return the monitor while still holding it; in that case
+    // it is exited on the caller's behalf below.  A monitor held by some
+    // other thread, or one that still has waiters, must be rejected since
+    // recycling it could otherwise lead to a segfault.
+    const bool heldByOther = monitor->owner && monitor->owner != getCurrentThreadHandle();
+
+    if (heldByOther || monitor->waiting) {
+        throw IllegalMonitorStateException(__FILE__, __LINE__, "Monitor is still in use!");
+    }
+
+    if (monitor->owner) {
+        Threading::exitMonitor(monitor);
+    }
+
+    const bool manageLock = !alreadyLocked;
+
+    if (manageLock) {
+        PlatformThread::lockMutex(library->globalLock);
+    }
+
+    // Reset the monitor and push it onto the front of the pool.
+    initMonitorHandle(monitor);
+    monitor->next = library->monitors->head;
+    library->monitors->head = monitor;
+    library->monitors->count++;
+
+    if (manageLock) {
+        PlatformThread::unlockMutex(library->globalLock);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Enters the monitor on behalf of the calling thread.  If the caller already
+// owns the monitor its entry count is incremented, otherwise the enter is
+// carried out by doMonitorEnter(); both cases are handled by the shared
+// thread-handle based helper.
+void Threading::enterMonitor(MonitorHandle* monitor) {
+    monitorEnterUsingThreadId(monitor, getCurrentThreadHandle());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Attempts to enter the monitor on behalf of the calling thread without
+// blocking; returns whatever monitorTryEnterUsingThreadId() reports.
+bool Threading::tryEnterMonitor(MonitorHandle* monitor) {
+    return monitorTryEnterUsingThreadId(monitor, getCurrentThreadHandle());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Exits the monitor on behalf of the calling thread.
+//
+// @throws IllegalMonitorStateException if the calling thread is not the
+//         monitor's owner.
+void Threading::exitMonitor(MonitorHandle* monitor) {
+
+    ThreadHandle* caller = getCurrentThreadHandle();
+
+    if (monitor->owner == caller) {
+        doMonitorExit(monitor, caller);
+        return;
+    }
+
+    throw IllegalMonitorStateException(__FILE__, __LINE__, "Thread is not the owner of this monitor");
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Waits on the monitor on behalf of the calling thread and returns whatever
+// doWaitOnMonitor() reports.
+//
+// NOTE(review): the comment previously here said the wait is performed in a
+// non-interruptible state, yet the last argument passed is true - confirm
+// the meaning of that flag against doWaitOnMonitor().
+bool Threading::waitOnMonitor(MonitorHandle* monitor, long long mills, int nanos) {
+
+    ThreadHandle* caller = getCurrentThreadHandle();
+
+    return doWaitOnMonitor(monitor, caller, mills, nanos, true);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Forwards to doNotifyWaiters() with its second argument set to false (the
+// single-waiter form; notifyAllWaiters() passes true).
+void Threading::notifyWaiter(MonitorHandle* monitor) {
+    const bool everyone = false;
+    doNotifyWaiters(monitor, everyone);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Forwards to doNotifyWaiters() with its second argument set to true (the
+// all-waiters form; notifyWaiter() passes false).
+void Threading::notifyAllWaiters(MonitorHandle* monitor) {
+    const bool everyone = true;
+    doNotifyWaiters(monitor, everyone);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Exits the monitor on behalf of the given thread handle.
+//
+// @throws IllegalMonitorStateException if the given thread is not the
+//         monitor's owner.
+void Threading::monitorExitUsingThreadId(MonitorHandle* monitor, ThreadHandle* thread) {
+
+    if (monitor->owner == thread) {
+        doMonitorExit(monitor, thread);
+        return;
+    }
+
+    throw IllegalMonitorStateException(__FILE__, __LINE__, "Specified thread is not the monitor owner.");
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Enters the monitor on behalf of the given thread handle.  A thread that
+// already owns the monitor only has its entry count incremented; any other
+// thread goes through doMonitorEnter().
+void Threading::monitorEnterUsingThreadId(MonitorHandle* monitor, ThreadHandle* thread) {
+
+    if (monitor->owner != thread) {
+        doMonitorEnter(monitor, thread);
+        return;
+    }
+
+    // Re-entrant enter by the current owner.
+    monitor->count++;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Attempts to enter the monitor on behalf of the given thread handle without
+// blocking.
+//
+// @returns true if the thread already owned the monitor (its entry count is
+//          incremented) or if the monitor's lock was acquired by the try-lock
+//          (the thread becomes owner with a count of one); false otherwise.
+bool Threading::monitorTryEnterUsingThreadId(MonitorHandle* monitor, ThreadHandle* thread) {
+
+    if (monitor->owner != thread) {
+
+        if (PlatformThread::tryLockMutex(monitor->lock) != true) {
+            return false;
+        }
+
+        // Lock acquired: record the new owner and its first entry.
+        monitor->owner = thread;
+        monitor->count = 1;
+        return true;
+    }
+
+    // Re-entrant enter by the current owner.
+    monitor->count++;
+    return true;
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Threading.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Threading.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Threading.h?rev=1100383&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Threading.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Threading.h Fri May  6 21:22:39 2011
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _DECAF_INTERNAL_UTIL_CONCURRENT_THREADING_H_
+#define _DECAF_INTERNAL_UTIL_CONCURRENT_THREADING_H_
+
+#include <decaf/util/Config.h>
+
+#include <decaf/lang/Thread.h>
+
+namespace decaf {
+namespace internal {
+namespace util {
+namespace concurrent {
+
+    using decaf::lang::Thread;
+
+    struct ThreadHandle;
+    struct MonitorHandle;
+
+    /**
+     * Static-only facade over the platform threading layer.  It exposes the
+     * library's monitor operations and its thread management operations in
+     * terms of opaque MonitorHandle and ThreadHandle pointers.  The class
+     * cannot be instantiated or copied.
+     */
+    class DECAF_API Threading {
+    private:
+
+        Threading();
+        Threading(const Threading&);
+        Threading& operator= (const Threading&);
+
+    public:
+
+        /**
+         * Called by the Decaf Runtime at startup to allow the Platform Threading
+         * code to initialize any necessary Threading constructs needed to support
+         * the features of this class.
+         */
+        static void initialize();
+
+        /**
+         * Called by the Decaf Runtime at Shutdown to allow the Platform Threading
+         * code to return any resources that were allocated at startup for the
+         * Threading library.
+         */
+        static void shutdown();
+
+        /**
+         * Locks the Threading library allowing an object to perform some operations safely in
+         * a multi-threaded environment.
+         */
+        static void lockThreadsLib();
+
+        /**
+         * Unlocks the Threading library when locked.
+         */
+        static void unlockThreadsLib();
+
+    public:  // Monitors
+
+        /**
+         * Gets a monitor for use as a locking mechanism.  The monitor returned will be
+         * initialized and ready for use.  Each monitor that is taken must be returned before
+         * the Threading library is shutdown.
+         *
+         * @param alreadyLocked
+         *      When true the caller already holds the library's global lock, so this
+         *      method neither acquires nor releases it.
+         *
+         * @returns handle to a Monitor instance that has been initialized.
+         */
+        static MonitorHandle* takeMonitor(bool alreadyLocked = false);
+
+        /**
+         * Returns a given monitor to the Monitor pool after the Monitor is no longer needed.
+         * If the calling thread still owns the monitor it is exited on the caller's behalf.
+         *
+         * @param monitor
+         *      The handle of the Monitor to return to the Monitor pool.
+         * @param alreadyLocked
+         *      When true the caller already holds the library's global lock, so this
+         *      method neither acquires nor releases it.
+         *
+         * @throws RuntimeException if the monitor pointer is NULL.
+         * @throws IllegalMonitorStateException if the monitor is in use when returned.
+         */
+        static void returnMonitor(MonitorHandle* monitor, bool alreadyLocked = false);
+
+        /**
+         * Monitor locking method.  The calling thread blocks until it acquires the
+         * monitor.  A thread can enter the same monitor more than once, but must then
+         * exit the monitor the same number of times.
+         *
+         * @param monitor
+         *      The handle to the monitor that the current thread is attempting to lock.
+         */
+        static void enterMonitor(MonitorHandle* monitor);
+
+        /**
+         * Monitor locking method.  If the calling thread cannot immediately acquire the lock
+         * on the monitor then this method returns false, otherwise the thread gains the lock
+         * on the monitor and the method returns true.  A thread can enter a monitor multiple
+         * times, but must ensure that it exits the monitor the same number of times that it
+         * entered it.
+         *
+         * @param monitor
+         *      The handle to the monitor that the current thread is attempting to lock.
+         *
+         * @returns true if the caller obtains the lock on the Monitor, false otherwise.
+         */
+        static bool tryEnterMonitor(MonitorHandle* monitor);
+
+        /**
+         * Exit the acquired monitor giving up the lock that is held and allowing other threads
+         * to acquire the monitor.  If the calling thread has entered the monitor more than once
+         * then it must exit that monitor the same number of times.
+         *
+         * @param monitor
+         *      Handle to the monitor instance that is to be exited.
+         *
+         * @throws IllegalMonitorStateException if the caller is not the owner of the monitor.
+         */
+        static void exitMonitor(MonitorHandle* monitor);
+
+        /**
+         * Waits on a monitor to be signaled by another thread.  The caller can wait for a
+         * given timeout or pass zero for both mills and nanos to indicate it wants to wait
+         * forever.  If the caller specifies a timeout and that timeout expires before the
+         * monitor is signaled this method returns true.  The calling thread must own the
+         * monitor in order to call this method, otherwise an IllegalMonitorStateException
+         * is thrown.
+         *
+         * @param monitor
+         *      Handle to the monitor that the calling thread is to wait on for a signal.
+         * @param mills
+         *      The time in milliseconds to wait for the monitor to be signaled.
+         * @param nanos
+         *      The time in nanoseconds to wait for the monitor to be signaled.
+         *
+         * @returns true if the timeout given expires before the caller was signaled.
+         *
+         * @throws IllegalMonitorStateException if the caller does not own the monitor.
+         */
+        static bool waitOnMonitor(MonitorHandle* monitor, long long mills, int nanos);
+
+        /**
+         * Notify a single waiter on the given Monitor instance, if there is no thread currently
+         * waiting on the specified monitor then no action is taken.  The calling thread must own
+         * the given monitor otherwise an IllegalMonitorStateException is thrown.
+         *
+         * @param monitor
+         *      The monitor handle that is to have a single waiting thread signaled.
+         *
+         * @throws IllegalMonitorStateException if the caller does not own the monitor.
+         */
+        static void notifyWaiter(MonitorHandle* monitor);
+
+        /**
+         * Notifies all waiting threads for the given Monitor.  If there are no threads currently
+         * waiting on the given monitor instance then no action is taken.  The calling thread must
+         * own the given monitor otherwise an IllegalMonitorStateException is thrown.
+         *
+         * @param monitor
+         *      The monitor handle that is to have all of its waiting threads signaled.
+         *
+         * @throws IllegalMonitorStateException if the caller does not own the monitor.
+         */
+        static void notifyAllWaiters(MonitorHandle* monitor);
+
+    public:  // Threads
+
+        /**
+         * Creates a new thread instance with the given Thread object as its
+         * parent, assigning it the given name and stack size.  The Thread class
+         * provides its own main Runnable for executing task.
+         *
+         * @param parent
+         *      The parent Thread object that the new thread is owned by.
+         * @param name
+         *      Name given to the new Thread, used for debugging purposes.
+         * @param stackSize
+         *      The size to allocate for the new thread's stack.
+         *
+         * @returns a new ThreadHandle that identifies the thread and allows the parent
+         *          to interact with it.
+         */
+        static ThreadHandle* createNewThread(Thread* parent, const char* name,
+                                             long long stackSize);
+
+        /**
+         * Starts the given thread running, if the thread is already running then this
+         * method has no effect.
+         *
+         * @param thread
+         *      The thread instance to start.
+         */
+        static void start(ThreadHandle* thread);
+
+        /**
+         * Joins the given thread instance and waits for it to either terminate or for the
+         * given timeout period to expire.  If the value of the timeout is zero then this
+         * method waits forever.
+         *
+         * @param thread
+         *      The target thread to join.
+         * @param mills
+         *      The number of milliseconds to wait.
+         * @param nanos
+         *      The number of nanoseconds to wait [0-999999].
+         *
+         * @returns true if the timeout period expired, false otherwise.
+         *
+         * @throws InterruptedException if the Join was interrupted.
+         * @throws IllegalArgumentException if the value of mills or nanos is invalid.
+         */
+        static bool join(ThreadHandle* thread, long long mills, int nanos);
+
+        /**
+         * Marks the given thread as interrupted.
+         *
+         * @param thread
+         *      The thread instance to interrupt.
+         */
+        static void interrupt(ThreadHandle* thread);
+
+        /**
+         * Returns the interrupted flag of the calling thread and clears it.
+         *
+         * @returns true if the calling thread's interrupted flag was set.
+         */
+        static bool interrupted();
+
+        /**
+         * Returns the interrupted flag of the given thread.
+         *
+         * @param thread
+         *      The thread instance to query.
+         * @param reset
+         *      When true the flag is cleared after its value has been read.
+         *
+         * @returns true if the thread's interrupted flag was set.
+         */
+        static bool isInterrupted(ThreadHandle* thread, bool reset);
+
+        // NOTE: the spelling "yeild" is part of the public interface and is
+        // kept as is so that existing callers continue to build.
+        static void yeild();
+
+        /**
+         * Causes the calling thread to sleep for the given time, the sleep ends early
+         * if the thread is interrupted.
+         *
+         * @param mills
+         *      The number of milliseconds to sleep.
+         * @param nanos
+         *      The number of nanoseconds to sleep [0-999999].
+         *
+         * @returns the result of the underlying timed wait.
+         *
+         * @throws InterruptedException if the sleeping thread was interrupted.
+         * @throws IllegalArgumentException if the value of mills or nanos is invalid.
+         */
+        static bool sleep(long long mills, int nanos);
+
+        // Returns the id stored in the given thread's handle.
+        static long long getThreadId(ThreadHandle* thread);
+
+        // Returns the priority value stored in the given thread's handle.
+        static int getThreadPriority(ThreadHandle* thread);
+
+        // Applies the given priority to the thread and stores it in the handle.
+        static void setThreadPriority(ThreadHandle* thread, int priority);
+
+        // Returns the name stored in the handle, the handle keeps ownership
+        // of the returned string.
+        static const char* getThreadName(ThreadHandle* thread);
+
+        // Replaces the thread's name with a copy of the given string.
+        static void setThreadName(ThreadHandle* thread, const char* name);
+
+        // Returns the state value stored in the given thread's handle.
+        static Thread::State getThreadState(ThreadHandle* thread);
+
+        // Returns true if the thread's state is at or past RUNNABLE and the
+        // thread has not yet reached TERMINATED.
+        static bool isThreadAlive(ThreadHandle* thread);
+
+        static void destroyThread(ThreadHandle* thread);
+
+        static ThreadHandle* createThreadWrapper(decaf::lang::Thread* parent, const char* name);
+
+        // Returns the Thread object that is the parent of the calling
+        // thread's handle.
+        static Thread* getCurrentThread();
+
+        // Returns the handle of the calling thread, if the calling thread has
+        // no handle yet one is obtained by attaching to the current thread.
+        static ThreadHandle* getCurrentThreadHandle();
+
+        // Remove the given thread from scheduling, throws a
+        // NullPointerException if the given thread is NULL.
+        static void park(Thread* thread);
+
+        // Remove the given thread from scheduling for the given time, throws
+        // a NullPointerException if the given thread is NULL and an
+        // InterruptedException if the parked thread is interrupted.
+        static bool park(Thread* thread, long long mills, int nanos);
+
+        // Makes the given thread available for scheduling once again, throws
+        // a NullPointerException if the given thread is NULL.
+        static void unpark(Thread* thread);
+
+    private:
+
+        static ThreadHandle* attachToCurrentThread();
+
+        // Variants of enter / tryEnter / exit that act on behalf of an
+        // explicitly supplied thread handle instead of the calling thread.
+        static void monitorEnterUsingThreadId(MonitorHandle* monitor, ThreadHandle* thread);
+        static bool monitorTryEnterUsingThreadId(MonitorHandle* monitor, ThreadHandle* thread);
+        static void monitorExitUsingThreadId(MonitorHandle* monitor, ThreadHandle* thread);
+
+    };
+
+}}}}
+
+#endif /* _DECAF_INTERNAL_UTIL_CONCURRENT_THREADING_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Threading.h
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message