activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1372538 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core: ActiveMQConnection.cpp kernels/ActiveMQSessionKernel.cpp
Date Mon, 13 Aug 2012 18:30:03 GMT
Author: tabish
Date: Mon Aug 13 18:30:02 2012
New Revision: 1372538

URL: http://svn.apache.org/viewvc?rev=1372538&view=rev
Log:
Switch to LinkedList and read / write lock for better stability.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1372538&r1=1372537&r2=1372538&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Mon Aug 13 18:30:02 2012
@@ -163,7 +163,8 @@ namespace core{
         DispatcherMap dispatchers;
         ProducerMap activeProducers;
 
-        decaf::util::concurrent::CopyOnWriteArrayList< Pointer<ActiveMQSessionKernel> > activeSessions;
+        decaf::util::concurrent::locks::ReentrantReadWriteLock sessionsLock;
+        decaf::util::LinkedList< Pointer<ActiveMQSessionKernel> > activeSessions;
         decaf::util::LinkedList<transport::TransportListener*> transportListeners;
 
         TempDestinationMap activeTempDestinations;
@@ -205,6 +206,7 @@ namespace core{
                              firstFailureError(),
                              dispatchers(),
                              activeProducers(),
+                             sessionsLock(),
                              activeSessions(),
                              transportListeners(),
                              activeTempDestinations() {
@@ -430,8 +432,13 @@ Pointer<SessionId> ActiveMQConnection::g
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::addSession(Pointer<ActiveMQSessionKernel> session) {
     try {
-        synchronized(&this->config->activeSessions) {
+        this->config->sessionsLock.writeLock().lock();
+        try {
             this->config->activeSessions.add(session);
+            this->config->sessionsLock.writeLock().unlock();
+        } catch (Exception& ex) {
+            this->config->sessionsLock.writeLock().unlock();
+            throw;
         }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -440,8 +447,13 @@ void ActiveMQConnection::addSession(Poin
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::removeSession(Pointer<ActiveMQSessionKernel> session) {
     try {
-        synchronized(&this->config->activeSessions) {
+        this->config->sessionsLock.writeLock().lock();
+        try {
             this->config->activeSessions.remove(session);
+            this->config->sessionsLock.writeLock().unlock();
+        } catch (Exception& ex) {
+            this->config->sessionsLock.writeLock().unlock();
+            throw;
         }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -629,17 +641,24 @@ void ActiveMQConnection::cleanup() {
 
     try {
 
-        // Get the complete list of active sessions.
-        std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter( this->config->activeSessions.iterator() );
+        this->config->sessionsLock.readLock().lock();
+        try {
+            // Get the complete list of active sessions.
+            std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter( this->config->activeSessions.iterator() );
 
-        // Dispose of all the Session resources we know are still open.
-        while (iter->hasNext()) {
-            Pointer<ActiveMQSessionKernel> session = iter->next();
-            try{
-                session->dispose();
-            } catch( cms::CMSException& ex ){
-                /* Absorb */
+            // Dispose of all the Session resources we know are still open.
+            while (iter->hasNext()) {
+                Pointer<ActiveMQSessionKernel> session = iter->next();
+                try{
+                    session->dispose();
+                } catch( cms::CMSException& ex ){
+                    /* Absorb */
+                }
             }
+            this->config->sessionsLock.readLock().unlock();
+        } catch (Exception& ex) {
+            this->config->sessionsLock.readLock().unlock();
+            throw;
         }
 
         if (this->config->isConnectionInfoSentToBroker) {
@@ -668,19 +687,24 @@ void ActiveMQConnection::start() {
         checkClosedOrFailed();
         ensureConnectionInfoSent();
 
-        // This starts or restarts the delivery of all incoming messages
-        // messages delivered while this connection is stopped are dropped
-        // and not acknowledged.
-        if (this->started.compareAndSet(false, true)) {
-
-            synchronized(&this->config->activeSessions) {
+        try {
+            // This starts or restarts the delivery of all incoming messages
+            // messages delivered while this connection is stopped are dropped
+            // and not acknowledged.
+            if (this->started.compareAndSet(false, true)) {
+                this->config->sessionsLock.readLock().lock();
 
                 // Start all the sessions.
                 std::auto_ptr<Iterator< Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
                 while (iter->hasNext()) {
                     iter->next()->start();
                 }
+
+                this->config->sessionsLock.readLock().unlock();
             }
+        } catch (Exception& ex) {
+            this->config->sessionsLock.readLock().unlock();
+            throw;
         }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -693,16 +717,21 @@ void ActiveMQConnection::stop() {
 
         checkClosedOrFailed();
 
-        // Once current deliveries are done this stops the delivery of any
-        // new messages.
-        if (this->started.compareAndSet(true, false)) {
-            synchronized(&this->config->activeSessions) {
+        try {
+            // Once current deliveries are done this stops the delivery of any
+            // new messages.
+            if (this->started.compareAndSet(true, false)) {
+                this->config->sessionsLock.readLock().lock();
                 std::auto_ptr<Iterator< Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
 
                 while (iter->hasNext()) {
                     iter->next()->stop();
                 }
+                this->config->sessionsLock.readLock().unlock();
             }
+        } catch (Exception& ex) {
+            this->config->sessionsLock.readLock().unlock();
+            throw;
         }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -942,16 +971,23 @@ void ActiveMQConnection::onConsumerContr
 
     Pointer<ConsumerControl> consumerControl = command.dynamicCast<ConsumerControl>();
 
-    // Get the complete list of active sessions.
-    std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter( this->config->activeSessions.iterator() );
+    this->config->sessionsLock.readLock().lock();
+    try {
+        // Get the complete list of active sessions.
+        std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter( this->config->activeSessions.iterator() );
 
-    while (iter->hasNext()) {
-        Pointer<ActiveMQSessionKernel> session = iter->next();
-        if (consumerControl->isClose()) {
-            session->close(consumerControl->getConsumerId());
-        } else {
-            session->setPrefetchSize(consumerControl->getConsumerId(), consumerControl->getPrefetch());
+        while (iter->hasNext()) {
+            Pointer<ActiveMQSessionKernel> session = iter->next();
+            if (consumerControl->isClose()) {
+                session->close(consumerControl->getConsumerId());
+            } else {
+                session->setPrefetchSize(consumerControl->getConsumerId(), consumerControl->getPrefetch());
+            }
         }
+        this->config->sessionsLock.readLock().unlock();
+    } catch (Exception& ex) {
+        this->config->sessionsLock.readLock().unlock();
+        throw;
     }
 }
 
@@ -1001,11 +1037,16 @@ void ActiveMQConnection::transportInterr
     this->config->transportInterruptionProcessingComplete.reset(
         new CountDownLatch( (int)this->config->dispatchers.size() ) );
 
-    synchronized(&this->config->activeSessions) {
+    this->config->sessionsLock.readLock().lock();
+    try {
         std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > sessions(this->config->activeSessions.iterator());
         while (sessions->hasNext()) {
             sessions->next()->clearMessagesInProgress();
         }
+        this->config->sessionsLock.readLock().unlock();
+    } catch (Exception& ex) {
+        this->config->sessionsLock.readLock().unlock();
+        throw;
     }
 
     synchronized(&this->config->transportListeners) {
@@ -1490,12 +1531,20 @@ void ActiveMQConnection::deleteTempDesti
         checkClosedOrFailed();
         ensureConnectionInfoSent();
 
-        Pointer< Iterator< Pointer<ActiveMQSessionKernel> > > iterator(this->config->activeSessions.iterator());
-        while (iterator->hasNext()) {
-            Pointer<ActiveMQSessionKernel> session = iterator->next();
-            if (session->isInUse(destination)) {
-                throw ActiveMQException(__FILE__, __LINE__, "A consumer is consuming from the temporary destination");
+        this->config->sessionsLock.readLock().lock();
+        try {
+            Pointer< Iterator< Pointer<ActiveMQSessionKernel> > > iterator(this->config->activeSessions.iterator());
+            while (iterator->hasNext()) {
+                Pointer<ActiveMQSessionKernel> session = iterator->next();
+                if (session->isInUse(destination)) {
+                    this->config->sessionsLock.readLock().unlock();
+                    throw ActiveMQException(__FILE__, __LINE__, "A consumer is consuming from the temporary destination");
+                }
             }
+            this->config->sessionsLock.readLock().unlock();
+        } catch (Exception& ex) {
+            this->config->sessionsLock.readLock().unlock();
+            throw;
         }
 
         this->config->activeTempDestinations.remove(destination);

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp?rev=1372538&r1=1372537&r2=1372538&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp Mon Aug 13 18:30:02 2012
@@ -56,9 +56,10 @@
 #include <decaf/lang/Long.h>
 #include <decaf/lang/Math.h>
 #include <decaf/util/Queue.h>
+#include <decaf/util/LinkedList.h>
 #include <decaf/util/concurrent/Mutex.h>
-#include <decaf/util/concurrent/CopyOnWriteArrayList.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
+#include <decaf/util/concurrent/locks/ReentrantReadWriteLock.h>
 #include <decaf/lang/exceptions/InvalidStateException.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
 
@@ -92,8 +93,10 @@ namespace kernels{
     public:
 
         AtomicBoolean synchronizationRegistered;
-        decaf::util::concurrent::CopyOnWriteArrayList< Pointer<ActiveMQProducerKernel> > producers;
-        decaf::util::concurrent::CopyOnWriteArrayList< Pointer<ActiveMQConsumerKernel> > consumers;
+        decaf::util::concurrent::locks::ReentrantReadWriteLock producerLock;
+        decaf::util::LinkedList< Pointer<ActiveMQProducerKernel> > producers;
+        decaf::util::concurrent::locks::ReentrantReadWriteLock consumerLock;
+        decaf::util::LinkedList< Pointer<ActiveMQConsumerKernel> > consumers;
         Pointer<Scheduler> scheduler;
         Pointer<CloseSynhcronization> closeSync;
         Mutex sendMutex;
@@ -102,8 +105,8 @@ namespace kernels{
     public:
 
         SessionConfig() : synchronizationRegistered(false),
-                          producers(), consumers(), scheduler(), closeSync(),
-                          sendMutex(), transformer(NULL) {}
+                          producerLock(), producers(), consumerLock(), consumers(),
+                          scheduler(), closeSync(), sendMutex(), transformer(NULL) {}
         ~SessionConfig() {}
     };
 
@@ -339,31 +342,45 @@ void ActiveMQSessionKernel::dispose() {
         }
 
         // Dispose of all Consumers, the dispose method skips the RemoveInfo command.
-        Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > consumerIter(this->config->consumers.iterator());
-        while (consumerIter->hasNext()) {
-            try{
-                Pointer<ActiveMQConsumerKernel> consumer = consumerIter->next();
-                consumer->setFailureError(this->connection->getFirstFailureError());
-                consumer->dispose();
-                this->lastDeliveredSequenceId =
-                    Math::max(this->lastDeliveredSequenceId, consumer->getLastDeliveredSequenceId());
-            } catch (cms::CMSException& ex) {
-                /* Absorb */
+        this->config->consumerLock.writeLock().lock();
+        try {
+            Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > consumerIter(this->config->consumers.iterator());
+            while (consumerIter->hasNext()) {
+                try{
+                    Pointer<ActiveMQConsumerKernel> consumer = consumerIter->next();
+                    consumer->setFailureError(this->connection->getFirstFailureError());
+                    consumer->dispose();
+                    this->lastDeliveredSequenceId =
+                        Math::max(this->lastDeliveredSequenceId, consumer->getLastDeliveredSequenceId());
+                } catch (cms::CMSException& ex) {
+                    /* Absorb */
+                }
             }
+            this->config->consumers.clear();
+            this->config->consumerLock.writeLock().unlock();
+        } catch (Exception& ex) {
+            this->config->consumerLock.writeLock().unlock();
+            throw;
         }
-        this->config->consumers.clear();
 
-        // Dispose of all Producers, the dispose method skips the RemoveInfo command.
-        std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator());
+        this->config->producerLock.writeLock().lock();
+        try {
+            // Dispose of all Producers, the dispose method skips the RemoveInfo command.
+            std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator());
 
-        while (producerIter->hasNext()) {
-            try{
-                producerIter->next()->dispose();
-            } catch (cms::CMSException& ex) {
-                /* Absorb */
+            while (producerIter->hasNext()) {
+                try{
+                    producerIter->next()->dispose();
+                } catch (cms::CMSException& ex) {
+                    /* Absorb */
+                }
             }
+            this->config->producers.clear();
+            this->config->producerLock.writeLock().unlock();
+        } catch (Exception& ex) {
+            this->config->producerLock.writeLock().unlock();
+            throw;
         }
-        this->config->producers.clear();
     }
     AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
@@ -417,10 +434,17 @@ void ActiveMQSessionKernel::recover() {
             throw cms::IllegalStateException("This session is transacted");
         }
 
-        Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
-        while (iter->hasNext()) {
-            Pointer<ActiveMQConsumerKernel> consumer = iter->next();
-            consumer->rollback();
+        this->config->consumerLock.readLock().lock();
+        try {
+            Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
+            while (iter->hasNext()) {
+                Pointer<ActiveMQConsumerKernel> consumer = iter->next();
+                consumer->rollback();
+            }
+            this->config->consumerLock.readLock().unlock();
+        } catch (Exception& ex) {
+            this->config->consumerLock.readLock().unlock();
+            throw;
         }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -433,32 +457,53 @@ void ActiveMQSessionKernel::clearMessage
         this->executor->clearMessagesInProgress();
     }
 
-    Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
-    while (iter->hasNext()) {
-        Pointer<ActiveMQConsumerKernel> consumer = iter->next();
-        consumer->inProgressClearRequired();
-        this->connection->getScheduler()->executeAfterDelay(
-            new ClearConsumerTask(consumer), 0LL);
+    this->config->consumerLock.readLock().lock();
+    try {
+        Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
+        while (iter->hasNext()) {
+            Pointer<ActiveMQConsumerKernel> consumer = iter->next();
+            consumer->inProgressClearRequired();
+            this->connection->getScheduler()->executeAfterDelay(
+                new ClearConsumerTask(consumer), 0LL);
+        }
+        this->config->consumerLock.readLock().unlock();
+    } catch (Exception& ex) {
+        this->config->consumerLock.readLock().unlock();
+        throw;
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionKernel::acknowledge() {
 
-    Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
-    while (iter->hasNext()) {
-        Pointer<ActiveMQConsumerKernel> consumer = iter->next();
-        consumer->acknowledge();
+    this->config->consumerLock.readLock().lock();
+    try {
+        Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
+        while (iter->hasNext()) {
+            Pointer<ActiveMQConsumerKernel> consumer = iter->next();
+            consumer->acknowledge();
+        }
+        this->config->consumerLock.readLock().unlock();
+    } catch (Exception& ex) {
+        this->config->consumerLock.readLock().unlock();
+        throw;
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionKernel::deliverAcks() {
 
-    Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
-    while (iter->hasNext()) {
-        Pointer<ActiveMQConsumerKernel> consumer = iter->next();
-        consumer->deliverAcks();
+    this->config->consumerLock.readLock().lock();
+    try {
+        Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
+        while (iter->hasNext()) {
+            Pointer<ActiveMQConsumerKernel> consumer = iter->next();
+            consumer->deliverAcks();
+        }
+        this->config->consumerLock.readLock().unlock();
+    } catch (Exception& ex) {
+        this->config->consumerLock.readLock().unlock();
+        throw;
     }
 }
 
@@ -988,11 +1033,18 @@ void ActiveMQSessionKernel::redispatch(M
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionKernel::start() {
 
-    Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
+    this->config->consumerLock.readLock().lock();
+    try {
+        Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
 
-    while (iter->hasNext()) {
-        Pointer<ActiveMQConsumerKernel> consumer = iter->next();
-        consumer->start();
+        while (iter->hasNext()) {
+            Pointer<ActiveMQConsumerKernel> consumer = iter->next();
+            consumer->start();
+        }
+        this->config->consumerLock.readLock().unlock();
+    } catch (Exception& ex) {
+        this->config->consumerLock.readLock().unlock();
+        throw;
     }
 
     if (this->executor.get() != NULL) {
@@ -1043,13 +1095,21 @@ void ActiveMQSessionKernel::createTempor
 ////////////////////////////////////////////////////////////////////////////////
 bool ActiveMQSessionKernel::isInUse(Pointer<ActiveMQDestination> destination) {
 
-    Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
+    this->config->consumerLock.readLock().lock();
+    try {
+        Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
 
-    while (iter->hasNext()) {
-        Pointer<ActiveMQConsumerKernel> consumer = iter->next();
-        if (consumer->isInUse(destination)) {
-            return true;
+        while (iter->hasNext()) {
+            Pointer<ActiveMQConsumerKernel> consumer = iter->next();
+            if (consumer->isInUse(destination)) {
+                this->config->consumerLock.readLock().unlock();
+                return true;
+            }
         }
+        this->config->consumerLock.readLock().unlock();
+    } catch (Exception& ex) {
+        this->config->consumerLock.readLock().unlock();
+        throw;
     }
 
     return false;
@@ -1123,7 +1183,15 @@ void ActiveMQSessionKernel::addConsumer(
     try {
 
         this->checkClosed();
-        this->config->consumers.add(consumer);
+
+        this->config->consumerLock.writeLock().lock();
+        try {
+            this->config->consumers.add(consumer);
+            this->config->consumerLock.writeLock().unlock();
+        } catch (Exception& ex) {
+            this->config->consumerLock.writeLock().unlock();
+            throw;
+        }
 
         // Register this as a message dispatcher for the consumer.
         this->connection->addDispatcher(consumer->getConsumerInfo()->getConsumerId(), this);
@@ -1138,7 +1206,14 @@ void ActiveMQSessionKernel::removeConsum
 
     try {
         this->connection->removeDispatcher(consumer->getConsumerId());
-        this->config->consumers.remove(consumer);
+        this->config->consumerLock.writeLock().lock();
+        try {
+            this->config->consumers.remove(consumer);
+            this->config->consumerLock.writeLock().unlock();
+        } catch (Exception& ex) {
+            this->config->consumerLock.writeLock().unlock();
+            throw;
+        }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -1150,7 +1225,16 @@ void ActiveMQSessionKernel::addProducer(
 
     try {
         this->checkClosed();
-        this->config->producers.add(producer);
+
+        this->config->producerLock.writeLock().lock();
+        try {
+            this->config->producers.add(producer);
+            this->config->producerLock.writeLock().unlock();
+        } catch(Exception& ex) {
+            this->config->producerLock.writeLock().unlock();
+            throw;
+        }
+
         this->connection->addProducer(producer);
     }
     AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
@@ -1163,7 +1247,14 @@ void ActiveMQSessionKernel::removeProduc
 
     try {
         this->connection->removeProducer(producer->getProducerId());
-        this->config->producers.remove(producer);
+        this->config->producerLock.writeLock().lock();
+        try {
+            this->config->producers.remove(producer);
+            this->config->producerLock.writeLock().unlock();
+        } catch(Exception& ex) {
+            this->config->producerLock.writeLock().unlock();
+            throw;
+        }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -1173,13 +1264,23 @@ void ActiveMQSessionKernel::removeProduc
 ////////////////////////////////////////////////////////////////////////////////
 Pointer<ActiveMQProducerKernel> ActiveMQSessionKernel::lookupProducerKernel(Pointer<ProducerId> id) {
 
-    std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator());
+    this->config->producerLock.readLock().lock();
+    try {
+
+        std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator());
 
-    while (producerIter->hasNext()) {
-        Pointer<ActiveMQProducerKernel> producer = producerIter->next();
-        if (producer->getProducerId()->equals(*id)) {
-            return producer;
+        while (producerIter->hasNext()) {
+            Pointer<ActiveMQProducerKernel> producer = producerIter->next();
+            if (producer->getProducerId()->equals(*id)) {
+                this->config->producerLock.readLock().unlock();
+                return producer;
+            }
         }
+
+        this->config->producerLock.readLock().unlock();
+    } catch(Exception& ex) {
+        this->config->producerLock.readLock().unlock();
+        throw;
     }
 
     return Pointer<ActiveMQProducerKernel>();
@@ -1188,13 +1289,21 @@ Pointer<ActiveMQProducerKernel> ActiveMQ
 ////////////////////////////////////////////////////////////////////////////////
 Pointer<ActiveMQConsumerKernel> ActiveMQSessionKernel::lookupConsumerKernel(Pointer<ConsumerId> id) {
 
-    Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
+    this->config->consumerLock.readLock().lock();
+    try {
+        Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
 
-    while (iter->hasNext()) {
-        Pointer<ActiveMQConsumerKernel> consumer = iter->next();
-        if (consumer->getConsumerId()->equals(*id)) {
-            return consumer;
+        while (iter->hasNext()) {
+            Pointer<ActiveMQConsumerKernel> consumer = iter->next();
+            if (consumer->getConsumerId()->equals(*id)) {
+                this->config->consumerLock.readLock().unlock();
+                return consumer;
+            }
         }
+        this->config->consumerLock.readLock().unlock();
+    } catch (Exception& ex) {
+        this->config->consumerLock.readLock().unlock();
+        throw;
     }
 
     return Pointer<ActiveMQConsumerKernel>();
@@ -1203,13 +1312,21 @@ Pointer<ActiveMQConsumerKernel> ActiveMQ
 ////////////////////////////////////////////////////////////////////////////////
 bool ActiveMQSessionKernel::iterateConsumers() {
 
-    Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
+    this->config->consumerLock.readLock().lock();
+    try {
+        Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
 
-    while (iter->hasNext()) {
-        Pointer<ActiveMQConsumerKernel> consumer = iter->next();
-        if (consumer->iterate()) {
-            return true;
+        while (iter->hasNext()) {
+            Pointer<ActiveMQConsumerKernel> consumer = iter->next();
+            if (consumer->iterate()) {
+                this->config->consumerLock.readLock().unlock();
+                return true;
+            }
         }
+        this->config->consumerLock.readLock().unlock();
+    } catch (Exception& ex) {
+        this->config->consumerLock.readLock().unlock();
+        throw;
     }
 
     return false;
@@ -1218,29 +1335,43 @@ bool ActiveMQSessionKernel::iterateConsu
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionKernel::setPrefetchSize(Pointer<ConsumerId> id, int prefetch) {
 
-    Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
+    this->config->consumerLock.readLock().lock();
+    try {
+        Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
 
-    while (iter->hasNext()) {
-        Pointer<ActiveMQConsumerKernel> consumer = iter->next();
-        if (consumer->getConsumerId()->equals(*id)) {
-            consumer->setPrefetchSize(prefetch);
+        while (iter->hasNext()) {
+            Pointer<ActiveMQConsumerKernel> consumer = iter->next();
+            if (consumer->getConsumerId()->equals(*id)) {
+                consumer->setPrefetchSize(prefetch);
+            }
         }
+        this->config->consumerLock.readLock().unlock();
+    } catch (Exception& ex) {
+        this->config->consumerLock.readLock().unlock();
+        throw;
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionKernel::close(Pointer<ConsumerId> id) {
 
-    Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
+    this->config->consumerLock.readLock().lock();
+    try {
+        Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
 
-    while (iter->hasNext()) {
-        Pointer<ActiveMQConsumerKernel> consumer = iter->next();
-        if (consumer->getConsumerId()->equals(*id)) {
-            try {
-                consumer->close();
-            } catch (cms::CMSException& e) {
+        while (iter->hasNext()) {
+            Pointer<ActiveMQConsumerKernel> consumer = iter->next();
+            if (consumer->getConsumerId()->equals(*id)) {
+                try {
+                    consumer->close();
+                } catch (cms::CMSException& e) {
+                }
             }
         }
+        this->config->consumerLock.readLock().unlock();
+    } catch (Exception& ex) {
+        this->config->consumerLock.readLock().unlock();
+        throw;
     }
 }
 



Mime
View raw message