activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1347798 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core: ./ kernels/
Date Thu, 07 Jun 2012 21:12:58 GMT
Author: tabish
Date: Thu Jun  7 21:12:57 2012
New Revision: 1347798

URL: http://svn.apache.org/viewvc?rev=1347798&view=rev
Log:
Internalize more of the Session implementation and switch to using the CopyOnWriteArrayList
for consumers to allow concurrent read / write of consumers from the list to avoid a deadlock
that can occur during a transaction rollback.  

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1347798&r1=1347797&r2=1347798&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
Thu Jun  7 21:12:57 2012
@@ -44,6 +44,7 @@
 #include <decaf/util/concurrent/CountDownLatch.h>
 #include <decaf/util/concurrent/ThreadPoolExecutor.h>
 #include <decaf/util/concurrent/LinkedBlockingQueue.h>
+#include <decaf/util/concurrent/CopyOnWriteArrayList.h>
 
 #include <activemq/commands/Command.h>
 #include <activemq/commands/ActiveMQMessage.h>

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp?rev=1347798&r1=1347797&r2=1347798&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp
Thu Jun  7 21:12:57 2012
@@ -265,7 +265,7 @@ Pointer<ActiveMQConsumerKernel> ActiveMQ
         this->session->addConsumer(consumer);
         this->session->syncRequest(consumer->getConsumerInfo());
     } catch (Exception& ex) {
-        this->session->removeConsumer(consumer->getConsumerId());
+        this->session->removeConsumer(consumer);
         throw ex;
     }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp?rev=1347798&r1=1347797&r2=1347798&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
Thu Jun  7 21:12:57 2012
@@ -124,13 +124,8 @@ void ActiveMQSessionExecutor::dispatch(c
 
     try {
 
-        Pointer<ActiveMQConsumerKernel> consumer;
-
-        synchronized(&( this->session->consumers)) {
-            if (this->session->consumers.containsKey(dispatch->getConsumerId()))
{
-                consumer = this->session->consumers.get(dispatch->getConsumerId());
-            }
-        }
+        Pointer<ActiveMQConsumerKernel> consumer =
+            this->session->lookupConsumerKernel(dispatch->getConsumerId());
 
         // If the consumer is not available, just ignore the message.
         // Otherwise, dispatch the message to the consumer.
@@ -152,16 +147,8 @@ bool ActiveMQSessionExecutor::iterate() 
 
     try {
 
-        synchronized(&(this->session->consumers)) {
-            std::vector<Pointer<ActiveMQConsumerKernel> > consumers = this->session->consumers.values();
-            std::vector<Pointer<ActiveMQConsumerKernel> >::iterator iter = consumers.begin();
-
-            // Deliver any messages queued on the consumer to their listeners.
-            for (; iter != consumers.end(); ++iter) {
-                if ((*iter)->iterate()) {
-                    return true;
-                }
-            }
+        if (this->session->iterateConsumers()) {
+            return true;
         }
 
         // No messages left queued on the listeners.. so now dispatch messages

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1347798&r1=1347797&r2=1347798&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
Thu Jun  7 21:12:57 2012
@@ -505,7 +505,14 @@ void ActiveMQConsumerKernel::dispose() {
             }
 
             // Remove this Consumer from the Connections set of Dispatchers
-            this->session->removeConsumer(this->consumerInfo->getConsumerId());
+            Pointer<ActiveMQConsumerKernel> consumer(this);
+            try {
+                this->session->removeConsumer(consumer);
+            } catch(Exception& e) {
+                consumer.release();
+                throw;
+            }
+            consumer.release();
 
             // If we encountered an error, propagate it.
             if (haveException) {
@@ -1336,13 +1343,11 @@ cms::MessageTransformer* ActiveMQConsume
 
 ////////////////////////////////////////////////////////////////////////////////
 const Pointer<commands::ConsumerInfo>& ActiveMQConsumerKernel::getConsumerInfo()
const {
-    this->checkClosed();
     return this->consumerInfo;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 const Pointer<commands::ConsumerId>& ActiveMQConsumerKernel::getConsumerId() const
{
-    this->checkClosed();
     return this->consumerInfo->getConsumerId();
 }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp?rev=1347798&r1=1347797&r2=1347798&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
Thu Jun  7 21:12:57 2012
@@ -57,6 +57,7 @@
 #include <decaf/lang/Math.h>
 #include <decaf/util/Queue.h>
 #include <decaf/util/concurrent/Mutex.h>
+#include <decaf/util/concurrent/CopyOnWriteArrayList.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/lang/exceptions/InvalidStateException.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
@@ -83,12 +84,6 @@ namespace kernels{
     class CloseSynhcronization;
 
     class SessionConfig {
-    public:
-
-        typedef decaf::util::StlMap< Pointer<commands::ConsumerId>,
-                                     Pointer<ActiveMQConsumerKernel>,
-                                     commands::ConsumerId::COMPARATOR> ConsumersMap;
-
     private:
 
         SessionConfig(const SessionConfig&);
@@ -98,17 +93,17 @@ namespace kernels{
 
         AtomicBoolean synchronizationRegistered;
         decaf::util::concurrent::CopyOnWriteArrayList< Pointer<ActiveMQProducerKernel>
> producers;
+        decaf::util::concurrent::CopyOnWriteArrayList< Pointer<ActiveMQConsumerKernel>
> consumers;
         Pointer<Scheduler> scheduler;
         Pointer<CloseSynhcronization> closeSync;
-        ConsumersMap consumers;
         Mutex sendMutex;
         cms::MessageTransformer* transformer;
 
     public:
 
         SessionConfig() : synchronizationRegistered(false),
-                          producers(), scheduler(), closeSync(),
-                          consumers(), sendMutex(), transformer(NULL) {}
+                          producers(), consumers(), scheduler(), closeSync(),
+                          sendMutex(), transformer(NULL) {}
         ~SessionConfig() {}
     };
 
@@ -198,7 +193,6 @@ ActiveMQSessionKernel::ActiveMQSessionKe
                                                                              sessionInfo(),
                                                                              transaction(),
                                                                              connection(connection),
-                                                                             consumers(),
                                                                              closed(false),
                                                                              executor(),
                                                                              ackMode(ackMode),
@@ -345,21 +339,19 @@ void ActiveMQSessionKernel::dispose() {
         }
 
         // Dispose of all Consumers, the dispose method skips the RemoveInfo command.
-        synchronized(&this->consumers) {
-
-            std::vector< Pointer<ActiveMQConsumerKernel> > closables = this->consumers.values();
-
-            for (std::size_t i = 0; i < closables.size(); ++i) {
-                try{
-                    closables[i]->setFailureError(this->connection->getFirstFailureError());
-                    closables[i]->dispose();
-                    this->lastDeliveredSequenceId =
-                        Math::max(this->lastDeliveredSequenceId, closables[i]->getLastDeliveredSequenceId());
-                } catch( cms::CMSException& ex ){
-                    /* Absorb */
-                }
+        Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > consumerIter(this->config->consumers.iterator());
+        while (consumerIter->hasNext()) {
+            try{
+                Pointer<ActiveMQConsumerKernel> consumer = consumerIter->next();
+                consumer->setFailureError(this->connection->getFirstFailureError());
+                consumer->dispose();
+                this->lastDeliveredSequenceId =
+                    Math::max(this->lastDeliveredSequenceId, consumer->getLastDeliveredSequenceId());
+            } catch (cms::CMSException& ex) {
+                /* Absorb */
             }
         }
+        this->config->consumers.clear();
 
         // Dispose of all Producers, the dispose method skips the RemoveInfo command.
         std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator());
@@ -371,6 +363,7 @@ void ActiveMQSessionKernel::dispose() {
                 /* Absorb */
             }
         }
+        this->config->producers.clear();
     }
     AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
@@ -424,13 +417,10 @@ void ActiveMQSessionKernel::recover() {
             throw cms::IllegalStateException("This session is transacted");
         }
 
-        synchronized( &this->consumers ) {
-            std::vector< Pointer<ActiveMQConsumerKernel> > consumers = this->consumers.values();
-
-            std::vector< Pointer<ActiveMQConsumerKernel> >::iterator iter = consumers.begin();
-            for( ; iter != consumers.end(); ++iter ) {
-                (*iter)->rollback();
-            }
+        Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
+        while (iter->hasNext()) {
+            Pointer<ActiveMQConsumerKernel> consumer = iter->next();
+            consumer->rollback();
         }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -443,42 +433,32 @@ void ActiveMQSessionKernel::clearMessage
         this->executor->clearMessagesInProgress();
     }
 
-    synchronized(&this->consumers) {
-        std::vector< Pointer<ActiveMQConsumerKernel> > consumers = this->consumers.values();
-
-        std::vector< Pointer<ActiveMQConsumerKernel> >::iterator iter = consumers.begin();
-        for (; iter != consumers.end(); ++iter) {
-            (*iter)->inProgressClearRequired();
-
-            this->connection->getScheduler()->executeAfterDelay(
-                new ClearConsumerTask(*iter), 0LL);
-        }
+    Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
+    while (iter->hasNext()) {
+        Pointer<ActiveMQConsumerKernel> consumer = iter->next();
+        consumer->inProgressClearRequired();
+        this->connection->getScheduler()->executeAfterDelay(
+            new ClearConsumerTask(consumer), 0LL);
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionKernel::acknowledge() {
 
-    synchronized(&this->consumers) {
-        std::vector< Pointer<ActiveMQConsumerKernel> > consumers = this->consumers.values();
-
-        std::vector< Pointer<ActiveMQConsumerKernel> >::iterator iter = consumers.begin();
-        for (; iter != consumers.end(); ++iter) {
-            (*iter)->acknowledge();
-        }
+    Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
+    while (iter->hasNext()) {
+        Pointer<ActiveMQConsumerKernel> consumer = iter->next();
+        consumer->acknowledge();
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionKernel::deliverAcks() {
 
-    synchronized(&this->consumers) {
-        std::vector< Pointer<ActiveMQConsumerKernel> > consumers = this->consumers.values();
-
-        std::vector< Pointer<ActiveMQConsumerKernel> >::iterator iter = consumers.begin();
-        for (; iter != consumers.end(); ++iter) {
-            (*iter)->deliverAcks();
-        }
+    Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
+    while (iter->hasNext()) {
+        Pointer<ActiveMQConsumerKernel> consumer = iter->next();
+        consumer->deliverAcks();
     }
 }
 
@@ -538,7 +518,7 @@ cms::MessageConsumer* ActiveMQSessionKer
             this->addConsumer(consumer);
             this->connection->syncRequest(consumer->getConsumerInfo());
         } catch (Exception& ex) {
-            this->removeConsumer(consumer->getConsumerId());
+            this->removeConsumer(consumer);
             throw ex;
         }
 
@@ -582,7 +562,7 @@ cms::MessageConsumer* ActiveMQSessionKer
             this->addConsumer(consumer);
             this->connection->syncRequest(consumer->getConsumerInfo());
         } catch (Exception& ex) {
-            this->removeConsumer(consumer->getConsumerId());
+            this->removeConsumer(consumer);
             throw ex;
         }
 
@@ -1008,13 +988,11 @@ void ActiveMQSessionKernel::redispatch(M
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionKernel::start() {
 
-    synchronized(&this->consumers) {
-        std::vector< Pointer<ActiveMQConsumerKernel> > consumers = this->consumers.values();
+    Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
 
-        std::vector< Pointer<ActiveMQConsumerKernel> >::iterator iter = consumers.begin();
-        for (; iter != consumers.end(); ++iter) {
-            (*iter)->start();
-        }
+    while (iter->hasNext()) {
+        Pointer<ActiveMQConsumerKernel> consumer = iter->next();
+        consumer->start();
     }
 
     if (this->executor.get() != NULL) {
@@ -1065,14 +1043,12 @@ void ActiveMQSessionKernel::createTempor
 ////////////////////////////////////////////////////////////////////////////////
 bool ActiveMQSessionKernel::isInUse(Pointer<ActiveMQDestination> destination) {
 
-    synchronized(&this->consumers) {
-        std::vector< Pointer<ActiveMQConsumerKernel> > consumers = this->consumers.values();
+    Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
 
-        std::vector< Pointer<ActiveMQConsumerKernel> >::iterator iter = consumers.begin();
-        for (; iter != consumers.end(); ++iter) {
-            if ((*iter)->isInUse(destination)) {
-                return true;
-            }
+    while (iter->hasNext()) {
+        Pointer<ActiveMQConsumerKernel> consumer = iter->next();
+        if (consumer->isInUse(destination)) {
+            return true;
         }
     }
 
@@ -1147,11 +1123,7 @@ void ActiveMQSessionKernel::addConsumer(
     try {
 
         this->checkClosed();
-
-        // Add the consumer to the map.
-        synchronized(&this->consumers) {
-            this->consumers.put(consumer->getConsumerInfo()->getConsumerId(), consumer);
-        }
+        this->config->consumers.add(consumer);
 
         // Register this as a message dispatcher for the consumer.
         this->connection->addDispatcher(consumer->getConsumerInfo()->getConsumerId(),
this);
@@ -1162,18 +1134,11 @@ void ActiveMQSessionKernel::addConsumer(
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionKernel::removeConsumer(Pointer<ConsumerId> consumerId) {
+void ActiveMQSessionKernel::removeConsumer(Pointer<ActiveMQConsumerKernel> consumer)
{
 
     try {
-
-        synchronized(&this->consumers) {
-            if (this->consumers.containsKey(consumerId)) {
-                // Remove this Id both from the Sessions Map of Consumers and from the Connection.
-                // If the kernels parent is destroyed then it will get cleaned up now.
-                this->connection->removeDispatcher(consumerId);
-                this->consumers.remove(consumerId);
-            }
-        }
+        this->connection->removeDispatcher(consumer->getConsumerId());
+        this->config->consumers.remove(consumer);
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -1223,9 +1188,12 @@ Pointer<ActiveMQProducerKernel> ActiveMQ
 ////////////////////////////////////////////////////////////////////////////////
 Pointer<ActiveMQConsumerKernel> ActiveMQSessionKernel::lookupConsumerKernel(Pointer<ConsumerId>
id) {
 
-    synchronized(&this->consumers) {
-        if (this->consumers.containsKey(id)) {
-            return this->consumers.get(id);
+    Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
+
+    while (iter->hasNext()) {
+        Pointer<ActiveMQConsumerKernel> consumer = iter->next();
+        if (consumer->getConsumerId()->equals(*id)) {
+            return consumer;
         }
     }
 
@@ -1233,11 +1201,28 @@ Pointer<ActiveMQConsumerKernel> ActiveMQ
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQSessionKernel::iterateConsumers() {
+
+    Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
+
+    while (iter->hasNext()) {
+        Pointer<ActiveMQConsumerKernel> consumer = iter->next();
+        if (consumer->iterate()) {
+            return true;
+        }
+    }
+
+    return false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionKernel::setPrefetchSize(Pointer<ConsumerId> id, int prefetch) {
 
-    synchronized(&this->consumers) {
-        if (this->consumers.containsKey(id)) {
-            Pointer<ActiveMQConsumerKernel> consumer = this->consumers.get(id);
+    Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
+
+    while (iter->hasNext()) {
+        Pointer<ActiveMQConsumerKernel> consumer = iter->next();
+        if (consumer->getConsumerId()->equals(*id)) {
             consumer->setPrefetchSize(prefetch);
         }
     }
@@ -1246,10 +1231,11 @@ void ActiveMQSessionKernel::setPrefetchS
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionKernel::close(Pointer<ConsumerId> id) {
 
-    synchronized(&this->consumers) {
-        if (this->consumers.containsKey(id)) {
-            Pointer<ActiveMQConsumerKernel> consumer = this->consumers.get(id);
+    Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
 
+    while (iter->hasNext()) {
+        Pointer<ActiveMQConsumerKernel> consumer = iter->next();
+        if (consumer->getConsumerId()->equals(*id)) {
             try {
                 consumer->close();
             } catch (cms::CMSException& e) {

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h?rev=1347798&r1=1347797&r2=1347798&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
Thu Jun  7 21:12:57 2012
@@ -40,10 +40,8 @@
 #include <activemq/threads/Scheduler.h>
 
 #include <decaf/lang/Pointer.h>
-#include <decaf/util/StlMap.h>
 #include <decaf/util/Properties.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
-#include <decaf/util/concurrent/CopyOnWriteArrayList.h>
 
 #include <string>
 #include <memory>
@@ -66,10 +64,6 @@ namespace kernels {
     class AMQCPP_API ActiveMQSessionKernel : public virtual cms::Session, public Dispatcher
{
     private:
 
-        typedef decaf::util::StlMap< Pointer<commands::ConsumerId>,
-                                     Pointer<activemq::core::kernels::ActiveMQConsumerKernel>,
-                                     commands::ConsumerId::COMPARATOR> ConsumersMap;
-
         friend class activemq::core::ActiveMQSessionExecutor;
 
     protected:
@@ -92,11 +86,6 @@ namespace kernels {
         ActiveMQConnection* connection;
 
         /**
-         * Map of consumers.
-         */
-        ConsumersMap consumers;
-
-        /**
          * Indicates that this connection has been closed, it is no longer
          * usable after this becomes true
          */
@@ -398,7 +387,7 @@ namespace kernels {
          * the session is closed.
          *
          * @param consumer
-         *      The ActiveMQConsumer instance to add to this session.
+         *      The ActiveMQConsumerKernel instance to add to this session.
          *
          * @throw ActiveMQException if an internal error occurs.
          */
@@ -408,12 +397,12 @@ namespace kernels {
          * Dispose of a MessageConsumer from this session.  Removes it from the Connection
          * and clean up any resources associated with it.
          *
-         * @param consumerId
-         *      The ConsumerId of the MessageConsumer to remove from this Session.
+         * @param consumer
+         *      The ActiveMQConsumerKernel instance to remove from this session.
          *
          * @throw ActiveMQException if an internal error occurs.
          */
-        void removeConsumer(Pointer<commands::ConsumerId> consumerId);
+        void removeConsumer(Pointer<ActiveMQConsumerKernel> consumer);
 
         /**
          * Adds a MessageProducer to this session registering it with the Connection and
store
@@ -543,6 +532,16 @@ namespace kernels {
          */
         Pointer<ActiveMQConsumerKernel> lookupConsumerKernel(Pointer<commands::ConsumerId>
id);
 
+        /**
+         * Gives each consumer a chance to dispatch messages that have been enqueued by calling
+         * each consumers iterate method.  Returns true if this method needs to be called
again
+         * because a consumer requires further processing time to complete its dispatching.
 Once
+         * all consumers are done this method returns false.
+         *
+         * @returns true if more iterations are needed false otherwise.
+         */
+        bool iterateConsumers();
+
    private:
 
        /**



Mime
View raw message