activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r657834 - in /activemq/activemq-cpp/trunk/src/main/activemq/core: ActiveMQSessionExecutor.cpp ActiveMQSessionExecutor.h
Date Mon, 19 May 2008 14:16:13 GMT
Author: tabish
Date: Mon May 19 07:16:13 2008
New Revision: 657834

URL: http://svn.apache.org/viewvc?rev=657834&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQCPP-174

Modified:
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.h

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.cpp?rev=657834&r1=657833&r2=657834&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.cpp Mon May 19 07:16:13 2008
@@ -138,21 +138,18 @@
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionExecutor::stop() {
 
-    synchronized( &mutex ) {
+    // We lock here to make sure that we wait until the thread
+    // is done with an internal dispatch operation, otherwise 
+    // we might return before that and cause the caller to be 
+    // in an inconsistent state.
+    synchronized( &dispatchMutex ) {
 
         if( closed || !started ) {
             return;
         }
 
-        // Set the state to stopped.
-        started = false;
-
-        // Wakeup the thread so that it can acknowledge the stop request.
-        mutex.notifyAll();
 
-        // Wait for the thread to notify us that it has acknowledged
-        // the stop request.
-        mutex.wait();
+        synchronized( &mutex ) { started = false; }
     }
 }
 
@@ -218,13 +215,8 @@
                     return;
                 }
 
-                // When told to stop, the calling thread will wait for a
-                // responding notification, indicating that we have acknowledged
-                // the stop command.
-                if( !started ) {
-                    mutex.notifyAll();
-                }
-
+                // When stopped, or when the queue is empty, we hit this
+                // case and wait; otherwise we skip the wait and carry on.
                 if( messageQueue.empty() || !started ) {
 
                     // Wait for more data or to be woken up.
@@ -248,29 +240,31 @@
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionExecutor::dispatchAll() {
 
-    // Take out all of the dispatch data currently in the array.
-    list<DispatchData> dataList;
-    synchronized( &mutex ) {
+    // Dispatch all currently available messages.  This lock allows the
+    // main thread to wait while we finish with a dispatch cycle; the
+    // stop method, for instance, locks this mutex so that it knows
+    // we've had a chance to read the started flag and detect that we
+    // are stopped.  Otherwise stop might return while we are still
+    // dispatching messages.
+    synchronized( &dispatchMutex ) {
+
+        // Take out all of the dispatch data currently in the array.
+        list<DispatchData> dataList;
+        synchronized( &mutex ) {
+
+            // If stopped or closed we don't want to start dispatching.
+            if( !started || closed ) {
+                return;
+            }
 
-        // When told to stop, the calling thread will wait for a
-        // responding notification, indicating that we have acknowledged
-        // the stop command.
-        if( !started ) {
-            mutex.notifyAll();
+            dataList = messageQueue;
+            messageQueue.clear();
         }
 
-        if( !started || closed ) {
-            return;
+        list<DispatchData>::iterator iter = dataList.begin();
+        while( iter != dataList.end() ) {
+            DispatchData& data = *iter++;
+            dispatch( data );
         }
-
-        dataList = messageQueue;
-        messageQueue.clear();
-    }
-
-    // Dispatch all currently available messages.
-    list<DispatchData>::iterator iter = dataList.begin();
-    while( iter != dataList.end() ) {
-        DispatchData& data = *iter++;
-        dispatch( data );
     }
 }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.h?rev=657834&r1=657833&r2=657834&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.h Mon May 19 07:16:13 2008
@@ -45,6 +45,7 @@
         std::list<DispatchData> messageQueue;
         decaf::lang::Thread* thread;
         decaf::util::concurrent::Mutex mutex;
+        decaf::util::concurrent::Mutex dispatchMutex;
         bool started;
         bool closed;
 



Mime
View raw message