activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1338453 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq: core/ActiveMQConnection.cpp core/ActiveMQConnection.h core/kernels/ActiveMQSessionKernel.cpp util/ServiceSupport.cpp util/ServiceSupport.h
Date Mon, 14 May 2012 22:06:46 GMT
Author: tabish
Date: Mon May 14 22:06:45 2012
New Revision: 1338453

URL: http://svn.apache.org/viewvc?rev=1338453&view=rev
Log:
Remove use of the CopyOnWriteArrayList as it currently has some bugs that can lead to segfaults

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ServiceSupport.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ServiceSupport.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1338453&r1=1338452&r2=1338453&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
Mon May 14 22:06:45 2012
@@ -37,6 +37,7 @@
 #include <decaf/lang/Boolean.h>
 #include <decaf/lang/Integer.h>
 #include <decaf/util/Iterator.h>
+#include <decaf/util/LinkedList.h>
 #include <decaf/util/UUID.h>
 #include <decaf/util/concurrent/Mutex.h>
 #include <decaf/util/concurrent/TimeUnit.h>
@@ -159,8 +160,8 @@ namespace core{
         DispatcherMap dispatchers;
         ProducerMap activeProducers;
 
-        decaf::util::concurrent::CopyOnWriteArrayList< Pointer<ActiveMQSessionKernel>
> activeSessions;
-        decaf::util::concurrent::CopyOnWriteArrayList<transport::TransportListener*>
transportListeners;
+        decaf::util::LinkedList< Pointer<ActiveMQSessionKernel> > activeSessions;
+        decaf::util::LinkedList<transport::TransportListener*> transportListeners;
 
         TempDestinationMap activeTempDestinations;
 
@@ -303,12 +304,14 @@ namespace core{
                 // Clean up the Connection resources.
                 this->connection->cleanup();
 
-                Pointer< Iterator<TransportListener*> > iter( this->config->transportListeners.iterator()
);
+                synchronized(&this->config->transportListeners) {
+                    Pointer< Iterator<TransportListener*> > iter( this->config->transportListeners.iterator()
);
 
-                while( iter->hasNext() ) {
-                    try{
-                        iter->next()->onException(ex);
-                    } catch(...) {}
+                    while( iter->hasNext() ) {
+                        try{
+                            iter->next()->onException(ex);
+                        } catch(...) {}
+                    }
                 }
             } catch(Exception& ex) {}
         }
@@ -528,20 +531,22 @@ void ActiveMQConnection::close() {
             AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
         }
 
-        // Get the complete list of active sessions.
-        std::auto_ptr< Iterator<Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
-
+        // Get the complete list of active sessions and call dispose() which should not trigger
+        // any messages back to the broker.
         long long lastDeliveredSequenceId = 0;
+        synchronized(&this->config->activeSessions) {
+            std::auto_ptr< Iterator<Pointer<ActiveMQSessionKernel> > >
iter(this->config->activeSessions.iterator());
 
-        // Dispose of all the Session resources we know are still open.
-        while (iter->hasNext()) {
-            Pointer<ActiveMQSessionKernel> session = iter->next();
-            try{
-                session->dispose();
-                lastDeliveredSequenceId =
-                    Math::max(lastDeliveredSequenceId, session->getLastDeliveredSequenceId());
-            } catch( cms::CMSException& ex ){
-                /* Absorb */
+            // Dispose of all the Session resources we know are still open.
+            while (iter->hasNext()) {
+                Pointer<ActiveMQSessionKernel> session = iter->next();
+                try{
+                    session->dispose();
+                    lastDeliveredSequenceId =
+                        Math::max(lastDeliveredSequenceId, session->getLastDeliveredSequenceId());
+                } catch( cms::CMSException& ex ){
+                    /* Absorb */
+                }
             }
         }
 
@@ -578,16 +583,18 @@ void ActiveMQConnection::cleanup() {
 
     try {
 
-        // Get the complete list of active sessions.
-        std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter(
this->config->activeSessions.iterator() );
+        synchronized(&this->config->activeSessions) {
+            // Get the complete list of active sessions.
+            std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > >
iter( this->config->activeSessions.iterator() );
 
-        // Dispose of all the Session resources we know are still open.
-        while (iter->hasNext()) {
-            Pointer<ActiveMQSessionKernel> session = iter->next();
-            try{
-                session->dispose();
-            } catch( cms::CMSException& ex ){
-                /* Absorb */
+            // Dispose of all the Session resources we know are still open.
+            while (iter->hasNext()) {
+                Pointer<ActiveMQSessionKernel> session = iter->next();
+                try{
+                    session->dispose();
+                } catch( cms::CMSException& ex ){
+                    /* Absorb */
+                }
             }
         }
 
@@ -870,12 +877,13 @@ void ActiveMQConnection::onCommand(const
             this->onConsumerControl(command);
         }
 
-        Pointer< Iterator<TransportListener*> > iter(this->config->transportListeners.iterator());
-
-        while (iter->hasNext()) {
-            try{
-                iter->next()->onCommand(command);
-            } catch(...) {}
+        synchronized(&this->config->transportListeners) {
+            Pointer< Iterator<TransportListener*> > iter(this->config->transportListeners.iterator());
+            while (iter->hasNext()) {
+                try{
+                    iter->next()->onCommand(command);
+                } catch(...) {}
+            }
         }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -898,15 +906,16 @@ void ActiveMQConnection::onConsumerContr
 
     Pointer<ConsumerControl> consumerControl = command.dynamicCast<ConsumerControl>();
 
-    // Get the complete list of active sessions.
-    std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter( this->config->activeSessions.iterator()
);
+    synchronized(&this->config->activeSessions) {
+        std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
 
-    while (iter->hasNext()) {
-        Pointer<ActiveMQSessionKernel> session = iter->next();
-        if (consumerControl->isClose()) {
-            session->close(consumerControl->getConsumerId());
-        } else {
-            session->setPrefetchSize(consumerControl->getConsumerId(), consumerControl->getPrefetch());
+        while (iter->hasNext()) {
+            Pointer<ActiveMQSessionKernel> session = iter->next();
+            if (consumerControl->isClose()) {
+                session->close(consumerControl->getConsumerId());
+            } else {
+                session->setPrefetchSize(consumerControl->getConsumerId(), consumerControl->getPrefetch());
+            }
         }
     }
 }
@@ -957,30 +966,33 @@ void ActiveMQConnection::transportInterr
     this->config->transportInterruptionProcessingComplete.reset(
         new CountDownLatch( (int)this->config->dispatchers.size() ) );
 
-    std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > sessions(this->config->activeSessions.iterator());
-
-    while (sessions->hasNext()) {
-        sessions->next()->clearMessagesInProgress();
+    synchronized(&this->config->activeSessions) {
+        std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > sessions(this->config->activeSessions.iterator());
+        while (sessions->hasNext()) {
+            sessions->next()->clearMessagesInProgress();
+        }
     }
 
-    Pointer< Iterator<TransportListener*> > listeners(this->config->transportListeners.iterator());
-
-    while (listeners->hasNext()) {
-        try{
-            listeners->next()->transportInterrupted();
-        } catch(...) {}
+    synchronized(&this->config->transportListeners) {
+        Pointer< Iterator<TransportListener*> > listeners(this->config->transportListeners.iterator());
+        while (listeners->hasNext()) {
+            try{
+                listeners->next()->transportInterrupted();
+            } catch(...) {}
+        }
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::transportResumed() {
 
-    Pointer< Iterator<TransportListener*> > iter( this->config->transportListeners.iterator()
);
-
-    while( iter->hasNext() ) {
-        try{
-            iter->next()->transportResumed();
-        } catch(...) {}
+    synchronized(&this->config->transportListeners) {
+        Pointer< Iterator<TransportListener*> > iter( this->config->transportListeners.iterator()
);
+        while( iter->hasNext() ) {
+            try{
+                iter->next()->transportResumed();
+            } catch(...) {}
+        }
     }
 }
 
@@ -1120,7 +1132,9 @@ void ActiveMQConnection::addTransportLis
     }
 
     // Add this listener from the set of active TransportListeners
-    this->config->transportListeners.add(transportListener);
+    synchronized(&this->config->transportListeners) {
+        this->config->transportListeners.add(transportListener);
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1131,7 +1145,9 @@ void ActiveMQConnection::removeTransport
     }
 
     // Remove this listener from the set of active TransportListeners
-    this->config->transportListeners.remove(transportListener);
+    synchronized(&this->config->transportListeners) {
+        this->config->transportListeners.remove(transportListener);
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1438,11 +1454,13 @@ void ActiveMQConnection::deleteTempDesti
         checkClosedOrFailed();
         ensureConnectionInfoSent();
 
-        Pointer< Iterator< Pointer<ActiveMQSessionKernel> > > iterator(this->config->activeSessions.iterator());
-        while (iterator->hasNext()) {
-            Pointer<ActiveMQSessionKernel> session = iterator->next();
-            if (session->isInUse(destination)) {
-                throw ActiveMQException(__FILE__, __LINE__, "A consumer is consuming from
the temporary destination");
+        synchronized(&this->config->activeSessions) {
+            Pointer< Iterator< Pointer<ActiveMQSessionKernel> > > iterator(this->config->activeSessions.iterator());
+            while (iterator->hasNext()) {
+                Pointer<ActiveMQSessionKernel> session = iterator->next();
+                if (session->isInUse(destination)) {
+                    throw ActiveMQException(__FILE__, __LINE__, "A consumer is consuming
from the temporary destination");
+                }
             }
         }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=1338453&r1=1338452&r2=1338453&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Mon
May 14 22:06:45 2012
@@ -34,7 +34,6 @@
 #include <activemq/core/kernels/ActiveMQSessionKernel.h>
 #include <decaf/util/Properties.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
-#include <decaf/util/concurrent/CopyOnWriteArrayList.h>
 #include <decaf/util/concurrent/ExecutorService.h>
 #include <decaf/lang/exceptions/UnsupportedOperationException.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
@@ -47,7 +46,6 @@ namespace activemq{
 namespace core{
 
     using decaf::lang::Pointer;
-    using decaf::util::concurrent::atomic::AtomicBoolean;
 
     class ActiveMQSession;
     class ConnectionConfig;
@@ -74,24 +72,24 @@ namespace core{
         /**
          * Indicates if this Connection is started
          */
-        AtomicBoolean started;
+        decaf::util::concurrent::atomic::AtomicBoolean started;
 
         /**
          * Indicates that this connection has been closed, it is no longer
          * usable after this becomes true
          */
-        AtomicBoolean closed;
+        decaf::util::concurrent::atomic::AtomicBoolean closed;
 
         /**
          * Indicates that this connection has been closed, it is no longer
          * usable after this becomes true
          */
-        AtomicBoolean closing;
+        decaf::util::concurrent::atomic::AtomicBoolean closing;
 
         /**
          * Indicates that this connection's Transport has failed.
          */
-        AtomicBoolean transportFailed;
+        decaf::util::concurrent::atomic::AtomicBoolean transportFailed;
 
     private:
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp?rev=1338453&r1=1338452&r2=1338453&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
Mon May 14 22:06:45 2012
@@ -56,6 +56,7 @@
 #include <decaf/lang/Long.h>
 #include <decaf/lang/Math.h>
 #include <decaf/util/Queue.h>
+#include <decaf/util/LinkedList.h>
 #include <decaf/util/concurrent/Mutex.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/lang/exceptions/InvalidStateException.h>
@@ -97,7 +98,7 @@ namespace kernels{
     public:
 
         AtomicBoolean synchronizationRegistered;
-        decaf::util::concurrent::CopyOnWriteArrayList< Pointer<ActiveMQProducerKernel>
> producers;
+        decaf::util::LinkedList< Pointer<ActiveMQProducerKernel> > producers;
         Pointer<Scheduler> scheduler;
         Pointer<CloseSynhcronization> closeSync;
         ConsumersMap consumers;
@@ -1183,7 +1184,9 @@ void ActiveMQSessionKernel::addProducer(
 
     try {
         this->checkClosed();
-        this->config->producers.add(producer);
+        synchronized(&this->config->producers) {
+            this->config->producers.add(producer);
+        }
         this->connection->addProducer(producer);
     }
     AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
@@ -1196,7 +1199,9 @@ void ActiveMQSessionKernel::removeProduc
 
     try {
         this->connection->removeProducer(producer->getProducerId());
-        this->config->producers.remove(producer);
+        synchronized(&this->config->producers) {
+            this->config->producers.remove(producer);
+        }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -1206,12 +1211,13 @@ void ActiveMQSessionKernel::removeProduc
 ////////////////////////////////////////////////////////////////////////////////
 Pointer<ActiveMQProducerKernel> ActiveMQSessionKernel::lookupProducerKernel(Pointer<ProducerId>
id) {
 
-    std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator());
-
-    while (producerIter->hasNext()) {
-        Pointer<ActiveMQProducerKernel> producer = producerIter->next();
-        if (producer->getProducerId()->equals(*id)) {
-            return producer;
+    synchronized(&this->config->producers) {
+        std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator());
+        while (producerIter->hasNext()) {
+            Pointer<ActiveMQProducerKernel> producer = producerIter->next();
+            if (producer->getProducerId()->equals(*id)) {
+                return producer;
+            }
         }
     }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ServiceSupport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ServiceSupport.cpp?rev=1338453&r1=1338452&r2=1338453&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ServiceSupport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ServiceSupport.cpp Mon
May 14 22:06:45 2012
@@ -30,7 +30,7 @@ using namespace decaf::lang;
 using namespace decaf::util;
 
 ////////////////////////////////////////////////////////////////////////////////
-ServiceSupport::ServiceSupport() : Service(), started(), stopping(), stopped(true), listemers()
{
+ServiceSupport::ServiceSupport() : Service(), started(), stopping(), stopped(true), listeners()
{
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -62,9 +62,11 @@ void ServiceSupport::start() {
         this->stopping.set(false);
         this->stopped.set(!success);
 
-        std::auto_ptr< Iterator<ServiceListener*> > iter(this->listemers.iterator());
-        while(iter->hasNext()) {
-            iter->next()->started(this);
+        synchronized(&this->listeners) {
+            std::auto_ptr< Iterator<ServiceListener*> > iter(this->listeners.iterator());
+            while(iter->hasNext()) {
+                iter->next()->started(this);
+            }
         }
     }
 }
@@ -84,9 +86,11 @@ void ServiceSupport::stop() {
         this->started.set(false);
         this->stopping.set(false);
 
-        std::auto_ptr< Iterator<ServiceListener*> > iter(this->listemers.iterator());
-        while(iter->hasNext()) {
-            iter->next()->stopped(this);
+        synchronized(&this->listeners) {
+            std::auto_ptr< Iterator<ServiceListener*> > iter(this->listeners.iterator());
+            while(iter->hasNext()) {
+                iter->next()->stopped(this);
+            }
         }
 
         stopper.throwFirstException();
@@ -111,13 +115,17 @@ bool ServiceSupport::isStopped() const {
 ////////////////////////////////////////////////////////////////////////////////
 void ServiceSupport::addServiceListener(ServiceListener* listener) {
     if(listener != NULL) {
-        this->listemers.add(listener);
+        synchronized(&this->listeners) {
+            this->listeners.add(listener);
+        }
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ServiceSupport::removeServiceListener(ServiceListener* listener) {
     if(listener != NULL) {
-        this->listemers.remove(listener);
+        synchronized(&this->listeners) {
+            this->listeners.remove(listener);
+        }
     }
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ServiceSupport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ServiceSupport.h?rev=1338453&r1=1338452&r2=1338453&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ServiceSupport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ServiceSupport.h Mon May
14 22:06:45 2012
@@ -22,7 +22,7 @@
 #include <activemq/util/Service.h>
 
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
-#include <decaf/util/concurrent/CopyOnWriteArrayList.h>
+#include <decaf/util/ArrayList.h>
 
 namespace activemq {
 namespace util {
@@ -41,7 +41,7 @@ namespace util {
         decaf::util::concurrent::atomic::AtomicBoolean started;
         decaf::util::concurrent::atomic::AtomicBoolean stopping;
         decaf::util::concurrent::atomic::AtomicBoolean stopped;
-        decaf::util::concurrent::CopyOnWriteArrayList<ServiceListener*> listemers;
+        decaf::util::ArrayList<ServiceListener*> listeners;
 
     public:
 



Mime
View raw message