activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1305601 [3/5] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/activemq/cmsutil/ main/activemq/core/ main/activemq/core/kernels/ main/cms/ test/
Date Mon, 26 Mar 2012 21:11:14 GMT
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp?rev=1305601&r1=1305600&r2=1305601&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp Mon Mar 26 21:11:12 2012
@@ -21,7 +21,7 @@
 #include <activemq/commands/ConsumerId.h>
 #include <activemq/commands/ActiveMQDestination.h>
 #include <activemq/core/ActiveMQConnection.h>
-#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/kernels/ActiveMQConsumerKernel.h>
 #include <activemq/core/ActiveMQSession.h>
 #include <activemq/core/PrefetchPolicy.h>
 #include <activemq/exceptions/ActiveMQException.h>
@@ -32,6 +32,7 @@
 using namespace std;
 using namespace activemq;
 using namespace activemq::core;
+using namespace activemq::core::kernels;
 using namespace activemq::commands;
 using namespace activemq::exceptions;
 using namespace decaf;
@@ -44,39 +45,39 @@ using namespace decaf::util::concurrent:
 namespace activemq{
 namespace core{
 
-    class Browser : public ActiveMQConsumer {
+    class Browser : public ActiveMQConsumerKernel {
     public:
 
         ActiveMQQueueBrowser* parent;
 
     private:
 
-        Browser( const Browser& );
-        Browser& operator= ( const Browser& );
+        Browser(const Browser&);
+        Browser& operator= (const Browser&);
 
     public:
 
-        Browser( ActiveMQQueueBrowser* parent, ActiveMQSession* session,
-                 const Pointer<commands::ConsumerId>& id,
-                 const Pointer<commands::ActiveMQDestination>& destination,
-                 const std::string& name, const std::string& selector,
-                 int prefetch, int maxPendingMessageCount, bool noLocal,
-                 bool browser, bool dispatchAsync,
-                 cms::MessageListener* listener ) :
-            ActiveMQConsumer( session, id, destination, name, selector, prefetch,
-                              maxPendingMessageCount, noLocal, browser, dispatchAsync,
-                              listener ), parent( parent ) {
+        Browser(ActiveMQQueueBrowser* parent, ActiveMQSession* session,
+                const Pointer<commands::ConsumerId>& id,
+                const Pointer<commands::ActiveMQDestination>& destination,
+                const std::string& name, const std::string& selector,
+                int prefetch, int maxPendingMessageCount, bool noLocal,
+                bool browser, bool dispatchAsync,
+                cms::MessageListener* listener ) :
+            ActiveMQConsumerKernel(session, id, destination, name, selector, prefetch,
+                                   maxPendingMessageCount, noLocal, browser, dispatchAsync,
+                                   listener ), parent(parent) {
 
         }
 
-        virtual void dispatch( const Pointer<MessageDispatch>& dispatched ) {
+        virtual void dispatch(const Pointer<MessageDispatch>& dispatched) {
 
             try{
 
-                if( dispatched->getMessage() == NULL ) {
-                    this->parent->browseDone.set( true );
+                if (dispatched->getMessage() == NULL) {
+                    this->parent->browseDone.set(true);
                 } else {
-                    ActiveMQConsumer::dispatch( dispatched );
+                    ActiveMQConsumerKernel::dispatch(dispatched);
                 }
 
                 this->parent->notifyMessageAvailable();
@@ -89,48 +90,39 @@ namespace core{
 }}
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQQueueBrowser::ActiveMQQueueBrowser( ActiveMQSession* session,
-                                            const Pointer<commands::ConsumerId>& consumerId,
-                                            const Pointer<commands::ActiveMQDestination>& destination,
-                                            const std::string& selector,
-                                            bool dispatchAsync ) : cms::QueueBrowser(),
-                                                                   cms::MessageEnumeration(),
-                                                                   session(NULL),
-                                                                   consumerId(),
-                                                                   destination(),
-                                                                   selector(),
-                                                                   dispatchAsync(false),
-                                                                   queue(NULL),
-                                                                   closed(false),
-                                                                   mutex(),
-                                                                   wait(),
-                                                                   browseDone(),
-                                                                   browser(NULL) {
-
-    if( session == NULL ) {
-        throw ActiveMQException(
-            __FILE__, __LINE__, "Session instance provided was NULL." );
-    }
-
-    if( consumerId == NULL ) {
-        throw ActiveMQException(
-            __FILE__, __LINE__, "ConsumerId instance provided was NULL." );
-    }
-
-    if( destination == NULL || !destination->isQueue() ) {
-        throw ActiveMQException(
-            __FILE__, __LINE__, "Destination instance provided was NULL or not a Queue." );
+ActiveMQQueueBrowser::ActiveMQQueueBrowser(ActiveMQSession* session,
+                                           const Pointer<commands::ConsumerId>& consumerId,
+                                           const Pointer<commands::ActiveMQDestination>& destination,
+                                           const std::string& selector,
+                                           bool dispatchAsync ) : cms::QueueBrowser(),
+                                                                  cms::MessageEnumeration(),
+                                                                  session(session),
+                                                                  consumerId(consumerId),
+                                                                  destination(destination),
+                                                                  selector(selector),
+                                                                  dispatchAsync(dispatchAsync),
+                                                                  queue(NULL),
+                                                                  closed(false),
+                                                                  mutex(),
+                                                                  wait(),
+                                                                  browseDone(),
+                                                                  browser(NULL) {
+
+    if (session == NULL) {
+        throw ActiveMQException(__FILE__, __LINE__, "Session instance provided was NULL.");
+    }
+
+    if (consumerId == NULL) {
+        throw ActiveMQException(__FILE__, __LINE__, "ConsumerId instance provided was NULL.");
+    }
+
+    if (destination == NULL || !destination->isQueue()) {
+        throw ActiveMQException(__FILE__, __LINE__, "Destination instance provided was NULL or not a Queue.");
     }
 
     // Cache the Queue instance for faster retrieval.
     this->queue = destination.dynamicCast<cms::Queue>().get();
-    this->consumerId = consumerId;
-    this->selector = selector;
-    this->dispatchAsync = dispatchAsync;
-    this->session = session;
-    this->destination = destination;
     this->closed = false;
-    this->browser = NULL;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -168,11 +160,11 @@ cms::MessageEnumeration* ActiveMQQueueBr
 void ActiveMQQueueBrowser::close() {
     try{
 
-        if( this->closed ) {
+        if (this->closed) {
             return;
         }
 
-        synchronized( &mutex ) {
+        synchronized(&mutex) {
             destroyConsumer();
             this->closed = true;
         }
@@ -185,19 +177,19 @@ bool ActiveMQQueueBrowser::hasMoreMessag
 
     try{
 
-        while( true ) {
+        while (true) {
 
-            synchronized( &mutex ) {
-                if( this->browser == NULL ) {
+            synchronized(&mutex) {
+                if (this->browser == NULL) {
                     return false;
                 }
             }
 
-            if( this->browser->getMessageAvailableCount() > 0 ) {
+            if (this->browser->getMessageAvailableCount() > 0) {
                 return true;
             }
 
-            if( browseDone.get() || !this->session->isStarted() ) {
+            if (browseDone.get() || !this->session->isStarted()) {
                 destroyConsumer();
                 return false;
             }
@@ -213,10 +205,10 @@ cms::Message* ActiveMQQueueBrowser::next
 
     try{
 
-        while( true ) {
+        while (true) {
 
-            synchronized( &mutex ) {
-                if( this->browser == NULL ) {
+            synchronized(&mutex) {
+                if (this->browser == NULL) {
                     return NULL;
                 }
             }
@@ -224,15 +216,15 @@ cms::Message* ActiveMQQueueBrowser::next
             try {
 
                 cms::Message* answer = this->browser->receiveNoWait();
-                if( answer != NULL ) {
+                if (answer != NULL) {
                     return answer;
                 }
 
-            } catch( cms::CMSException& e ) {
+            } catch(cms::CMSException& e) {
                 return NULL;
             }
 
-            if( this->browseDone.get() || !this->session->isStarted() ) {
+            if (this->browseDone.get() || !this->session->isStarted()) {
                 destroyConsumer();
                 return NULL;
             }
@@ -246,68 +238,66 @@ cms::Message* ActiveMQQueueBrowser::next
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQQueueBrowser::notifyMessageAvailable() {
 
-    synchronized( &wait ) {
+    synchronized(&wait) {
         wait.notifyAll();
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQQueueBrowser::waitForMessageAvailable() {
-    synchronized( &wait ) {
-        wait.wait( 2000 );
+    synchronized(&wait) {
+        wait.wait(2000);
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQConsumer* ActiveMQQueueBrowser::createConsumer() {
+Pointer<ActiveMQConsumerKernel> ActiveMQQueueBrowser::createConsumer() {
 
-    this->browseDone.set( false );
+    this->browseDone.set(false);
 
     int prefetch = this->session->getConnection()->getPrefetchPolicy()->getQueueBrowserPrefetch();
 
-    std::auto_ptr<ActiveMQConsumer> consumer(
+    Pointer<ActiveMQConsumerKernel> consumer(
         new Browser( this, session, consumerId, destination, "", selector,
                      prefetch, 0, false, true, dispatchAsync, NULL ) );
 
-    try{
-        this->session->addConsumer( consumer.get() );
-        this->session->syncRequest( consumer->getConsumerInfo() );
-    } catch( Exception& ex ) {
-        this->session->removeConsumer( consumer->getConsumerId() );
+    try {
+        this->session->addConsumer(consumer);
+        this->session->syncRequest(consumer->getConsumerInfo());
+    } catch (Exception& ex) {
+        this->session->removeConsumer(consumer->getConsumerId());
         throw ex;
     }
 
-    if( this->session->getConnection()->isStarted() ) {
+    if (this->session->getConnection()->isStarted()) {
         consumer->start();
     }
 
-    return consumer.release();
+    return consumer;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQQueueBrowser::destroyConsumer() {
 
-    if( this->browser == NULL ) {
+    if (this->browser == NULL) {
         return;
     }
 
     try {
 
-        if( this->session->isTransacted() ) {
+        if (this->session->isTransacted()) {
             session->commit();
         }
 
         this->browser->close();
-        delete this->browser;
-        this->browser = NULL ;
+        this->browser.reset(NULL);
     }
     AMQ_CATCHALL_NOTHROW()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQQueueBrowser::checkClosed() {
-    if( closed ) {
-        throw ActiveMQException(
-            __FILE__, __LINE__, "The QueueBrowser is closed." );
+    if (closed) {
+        throw ActiveMQException(__FILE__, __LINE__, "The QueueBrowser is closed.");
     }
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h?rev=1305601&r1=1305600&r2=1305601&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h Mon Mar 26 21:11:12 2012
@@ -33,8 +33,10 @@
 
 namespace activemq {
 namespace core {
+namespace kernels {
+    class ActiveMQConsumerKernel;
+}
 
-    class ActiveMQConsumer;
     class ActiveMQSession;
     class Browser;
 
@@ -58,20 +60,20 @@ namespace core {
         mutable decaf::util::concurrent::Mutex wait;
         decaf::util::concurrent::atomic::AtomicBoolean browseDone;
 
-        mutable ActiveMQConsumer* browser;
+        mutable Pointer<activemq::core::kernels::ActiveMQConsumerKernel> browser;
 
     private:
 
-        ActiveMQQueueBrowser( const ActiveMQQueueBrowser& );
-        ActiveMQQueueBrowser& operator= ( const ActiveMQQueueBrowser& );
+        ActiveMQQueueBrowser(const ActiveMQQueueBrowser&);
+        ActiveMQQueueBrowser& operator=(const ActiveMQQueueBrowser&);
 
     public:
 
-        ActiveMQQueueBrowser( ActiveMQSession* session,
-                              const Pointer<commands::ConsumerId>& consumerId,
-                              const Pointer<commands::ActiveMQDestination>& destination,
-                              const std::string& selector,
-                              bool dispatchAsync );
+        ActiveMQQueueBrowser(ActiveMQSession* session,
+                             const Pointer<commands::ConsumerId>& consumerId,
+                             const Pointer<commands::ActiveMQDestination>& destination,
+                             const std::string& selector,
+                             bool dispatchAsync);
 
         virtual ~ActiveMQQueueBrowser() throw();
 
@@ -95,7 +97,7 @@ namespace core {
         void notifyMessageAvailable();
         void waitForMessageAvailable();
 
-        ActiveMQConsumer* createConsumer();
+        Pointer<activemq::core::kernels::ActiveMQConsumerKernel> createConsumer();
         void destroyConsumer();
 
     };

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=1305601&r1=1305600&r2=1305601&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Mon Mar 26 21:11:12 2012
@@ -61,6 +61,7 @@ using namespace std;
 using namespace activemq;
 using namespace activemq::util;
 using namespace activemq::core;
+using namespace activemq::core::kernels;
 using namespace activemq::commands;
 using namespace activemq::exceptions;
 using namespace activemq::threads;
@@ -79,23 +80,21 @@ namespace {
     class ClearConsumerTask : public Runnable {
     private:
 
-        ActiveMQConsumer* consumer;
+        Pointer<ActiveMQConsumerKernel> consumer;
 
     private:
 
-        ClearConsumerTask( const ClearConsumerTask& );
-        ClearConsumerTask& operator= ( const ClearConsumerTask& );
+        ClearConsumerTask(const ClearConsumerTask&);
+        ClearConsumerTask& operator=(const ClearConsumerTask&);
 
     public:
 
-        ClearConsumerTask( ActiveMQConsumer* consumer ) : Runnable(), consumer(NULL) {
+        ClearConsumerTask(Pointer<ActiveMQConsumerKernel> consumer) : Runnable(), consumer(consumer) {
 
-            if( consumer == NULL ) {
+            if (consumer == NULL) {
                 throw NullPointerException(
                     __FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
             }
-
-            this->consumer = consumer;
         }
 
         virtual ~ClearConsumerTask() {}
@@ -117,19 +116,17 @@ namespace {
 
     private:
 
-        CloseSynhcronization( const CloseSynhcronization& );
-        CloseSynhcronization& operator= ( const CloseSynhcronization& );
+        CloseSynhcronization(const CloseSynhcronization&);
+        CloseSynhcronization& operator=(const CloseSynhcronization&);
 
     public:
 
-        CloseSynhcronization( ActiveMQSession* session ) : Synchronization(), session(NULL) {
+        CloseSynhcronization(ActiveMQSession* session) : Synchronization(), session(session) {
 
-            if( session == NULL ) {
+            if(session == NULL) {
                 throw NullPointerException(
                     __FILE__, __LINE__, "Synchronization Created with NULL Session.");
             }
-
-            this->session = session;
         }
 
         virtual ~CloseSynhcronization() {}
@@ -144,7 +141,6 @@ namespace {
         virtual void afterRollback() {
             session->doClose();
         }
-
     };
 }
 
@@ -156,79 +152,73 @@ namespace core{
     public:
 
         typedef decaf::util::StlMap< Pointer<commands::ConsumerId>,
-                                     ActiveMQConsumer*,
+                                     Pointer<ActiveMQConsumerKernel>,
                                      commands::ConsumerId::COMPARATOR> ConsumersMap;
 
     private:
 
-        SessionConfig( const SessionConfig& );
-        SessionConfig& operator= ( const SessionConfig& );
+        SessionConfig(const SessionConfig&);
+        SessionConfig& operator=(const SessionConfig&);
 
     public:
 
         bool synchronizationRegistered;
-        decaf::util::concurrent::CopyOnWriteArrayList<ActiveMQProducer*> producers;
+        decaf::util::concurrent::CopyOnWriteArrayList< Pointer<ActiveMQProducerKernel> > producers;
         Pointer<Scheduler> scheduler;
 
     public:
 
-        SessionConfig() : synchronizationRegistered( false ),
-                          producers(),
-                          scheduler()
-        {}
-
+        SessionConfig() : synchronizationRegistered(false), producers(), scheduler() {}
         ~SessionConfig() {}
     };
 
 }}
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQSession::ActiveMQSession( ActiveMQConnection* connection,
-                                  const Pointer<SessionId>& id,
-                                  cms::Session::AcknowledgeMode ackMode,
-                                  const Properties& properties ) : config( new SessionConfig ),
-                                                                   sessionInfo(),
-                                                                   transaction(),
-                                                                   connection(NULL),
-                                                                   consumers(),
-                                                                   closed(false),
-                                                                   executor(),
-                                                                   ackMode(),
-                                                                   producerIds(),
-                                                                   producerSequenceIds(),
-                                                                   consumerIds(),
-                                                                   lastDeliveredSequenceId(0){
+ActiveMQSession::ActiveMQSession(ActiveMQConnection* connection,
+                                 const Pointer<SessionId>& id,
+                                 cms::Session::AcknowledgeMode ackMode,
+                                 const Properties& properties) : config(new SessionConfig),
+                                                                 sessionInfo(),
+                                                                 transaction(),
+                                                                 connection(connection),
+                                                                 consumers(),
+                                                                 closed(false),
+                                                                 executor(),
+                                                                 ackMode(ackMode),
+                                                                 producerIds(),
+                                                                 producerSequenceIds(),
+                                                                 consumerIds(),
+                                                                 lastDeliveredSequenceId(0) {
 
-    if( id == NULL || connection == NULL ) {
+    if (id == NULL || connection == NULL) {
         throw ActiveMQException(
             __FILE__, __LINE__,
             "ActiveMQSession::ActiveMQSession - Constructor called with NULL data");
     }
 
-    this->sessionInfo.reset( new SessionInfo() );
-    this->sessionInfo->setAckMode( ackMode );
-    this->sessionInfo->setSessionId( id );
+    this->sessionInfo.reset(new SessionInfo());
+    this->sessionInfo->setAckMode(ackMode);
+    this->sessionInfo->setSessionId(id);
 
-    connection->oneway( this->sessionInfo );
+    connection->oneway(this->sessionInfo);
 
-    this->connection = connection;
     this->closed = false;
-    this->ackMode = ackMode;
     this->lastDeliveredSequenceId = -1;
 
     // Create a Transaction object
-    this->transaction.reset( new ActiveMQTransactionContext( this, properties ) );
+    this->transaction.reset(new ActiveMQTransactionContext(this, properties));
 
     // Create the session executor object.
-    this->executor.reset( new ActiveMQSessionExecutor( this ) );
+    this->executor.reset(new ActiveMQSessionExecutor(this));
 
-    this->connection->addSession( this );
+    this->connection->addSession(this);
 
     // Use the Connection's Scheduler.
     this->config->scheduler = this->connection->getScheduler();
 
     // If the connection is already started, start the session.
-    if( this->connection->isStarted() ) {
+    if (this->connection->isStarted()) {
         this->start();
     }
 }
@@ -246,9 +236,9 @@ ActiveMQSession::~ActiveMQSession() thro
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::fire( const activemq::exceptions::ActiveMQException& ex ) {
-    if( connection != NULL ) {
-        connection->fire( ex );
+void ActiveMQSession::fire(const activemq::exceptions::ActiveMQException& ex) {
+    if (connection != NULL) {
+        connection->fire(ex);
     }
 }
 
@@ -287,10 +277,10 @@ void ActiveMQSession::doClose() {
         dispose();
 
         // Remove this session from the Broker.
-        Pointer<RemoveInfo> info( new RemoveInfo() );
-        info->setObjectId( this->sessionInfo->getSessionId() );
-        info->setLastDeliveredSequenceId( this->lastDeliveredSequenceId );
-        this->connection->oneway( info );
+        Pointer<RemoveInfo> info(new RemoveInfo());
+        info->setObjectId(this->sessionInfo->getSessionId());
+        info->setLastDeliveredSequenceId(this->lastDeliveredSequenceId);
+        this->connection->oneway(info);
     }
     AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
@@ -332,21 +322,21 @@ void ActiveMQSession::dispose() {
 
         // Roll Back the transaction since we were closed without an explicit call
         // to commit it.
-        if( this->transaction->isInTransaction() ){
+        if (this->transaction->isInTransaction()) {
             this->transaction->rollback();
         }
 
         // Dispose of all Consumers, the dispose method skips the RemoveInfo command.
-        synchronized( &this->consumers ) {
+        synchronized(&this->consumers) {
 
-            std::vector<ActiveMQConsumer*> closables = this->consumers.values();
+            std::vector< Pointer<ActiveMQConsumerKernel> > closables = this->consumers.values();
 
-            for( std::size_t i = 0; i < closables.size(); ++i ) {
+            for (std::size_t i = 0; i < closables.size(); ++i) {
                 try{
                     closables[i]->setFailureError(this->connection->getFirstFailureError());
                     closables[i]->dispose();
                     this->lastDeliveredSequenceId =
-                        Math::max( this->lastDeliveredSequenceId, closables[i]->getLastDeliveredSequenceId() );
+                        Math::max(this->lastDeliveredSequenceId, closables[i]->getLastDeliveredSequenceId());
                 } catch( cms::CMSException& ex ){
                     /* Absorb */
                 }
@@ -354,7 +344,7 @@ void ActiveMQSession::dispose() {
         }
 
         // Dispose of all Producers, the dispose method skips the RemoveInfo command.
-        std::auto_ptr< Iterator<ActiveMQProducer*> > producerIter( this->config->producers.iterator() );
+        std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator());
 
         while( producerIter->hasNext() ) {
             try{
@@ -419,9 +409,9 @@ void ActiveMQSession::recover() {
         }
 
         synchronized( &this->consumers ) {
-            std::vector< ActiveMQConsumer* > consumers = this->consumers.values();
+            std::vector< Pointer<ActiveMQConsumerKernel> > consumers = this->consumers.values();
 
-            std::vector< ActiveMQConsumer* >::iterator iter = consumers.begin();
+            std::vector< Pointer<ActiveMQConsumerKernel> >::iterator iter = consumers.begin();
             for( ; iter != consumers.end(); ++iter ) {
                 (*iter)->rollback();
             }
@@ -438,9 +428,9 @@ void ActiveMQSession::clearMessagesInPro
     }
 
     synchronized( &this->consumers ) {
-        std::vector< ActiveMQConsumer* > consumers = this->consumers.values();
+        std::vector< Pointer<ActiveMQConsumerKernel> > consumers = this->consumers.values();
 
-        std::vector< ActiveMQConsumer* >::iterator iter = consumers.begin();
+        std::vector< Pointer<ActiveMQConsumerKernel> >::iterator iter = consumers.begin();
         for( ; iter != consumers.end(); ++iter ) {
             (*iter)->inProgressClearRequired();
 
@@ -454,9 +444,9 @@ void ActiveMQSession::clearMessagesInPro
 void ActiveMQSession::acknowledge() {
 
     synchronized( &this->consumers ) {
-        std::vector< ActiveMQConsumer* > consumers = this->consumers.values();
+        std::vector< Pointer<ActiveMQConsumerKernel> > consumers = this->consumers.values();
 
-        std::vector< ActiveMQConsumer* >::iterator iter = consumers.begin();
+        std::vector< Pointer<ActiveMQConsumerKernel> >::iterator iter = consumers.begin();
         for( ; iter != consumers.end(); ++iter ) {
             (*iter)->acknowledge();
         }
@@ -467,9 +457,9 @@ void ActiveMQSession::acknowledge() {
 void ActiveMQSession::deliverAcks() {
 
     synchronized( &this->consumers ) {
-        std::vector< ActiveMQConsumer* > consumers = this->consumers.values();
+        std::vector< Pointer<ActiveMQConsumerKernel> > consumers = this->consumers.values();
 
-        std::vector< ActiveMQConsumer* >::iterator iter = consumers.begin();
+        std::vector< Pointer<ActiveMQConsumerKernel> >::iterator iter = consumers.begin();
         for( ; iter != consumers.end(); ++iter ) {
             (*iter)->deliverAcks();
         }
@@ -480,10 +470,8 @@ void ActiveMQSession::deliverAcks() {
 cms::MessageConsumer* ActiveMQSession::createConsumer( const cms::Destination* destination ) {
 
     try{
-
         this->checkClosed();
-
-        return this->createConsumer( destination, "", false );
+        return this->createConsumer(destination, "", false);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -493,10 +481,8 @@ cms::MessageConsumer* ActiveMQSession::c
                                                        const std::string& selector ) {
 
     try{
-
         this->checkClosed();
-
-        return this->createConsumer( destination, selector, false );
+        return this->createConsumer(destination, selector, false);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -531,24 +517,24 @@ cms::MessageConsumer* ActiveMQSession::c
         }
 
         // Create the consumer instance.
-        std::auto_ptr<ActiveMQConsumer> consumer(
-            new ActiveMQConsumer( this, this->getNextConsumerId(),
-                                  dest, "", selector, prefetch, 0, noLocal,
-                                  false, this->connection->isDispatchAsync(), NULL ) );
+        Pointer<ActiveMQConsumerKernel> consumer(
+            new ActiveMQConsumerKernel(this, this->getNextConsumerId(),
+                                       dest, "", selector, prefetch, 0, noLocal,
+                                       false, this->connection->isDispatchAsync(), NULL));
 
         try{
-            this->addConsumer( consumer.get() );
-            this->connection->syncRequest( consumer->getConsumerInfo() );
-        } catch( Exception& ex ) {
-            this->removeConsumer( consumer->getConsumerId() );
+            this->addConsumer(consumer);
+            this->connection->syncRequest(consumer->getConsumerInfo());
+        } catch (Exception& ex) {
+            this->removeConsumer(consumer->getConsumerId());
             throw ex;
         }
 
-        if( this->connection->isStarted() ) {
+        if (this->connection->isStarted()) {
             consumer->start();
         }
 
-        return consumer.release();
+        return new ActiveMQConsumer(consumer);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -577,25 +563,25 @@ cms::MessageConsumer* ActiveMQSession::c
         Pointer<ActiveMQDestination> dest( amqDestination->cloneDataStructure() );
 
         // Create the consumer instance.
-        std::auto_ptr<ActiveMQConsumer> consumer(
-            new ActiveMQConsumer( this, this->getNextConsumerId(),
-                                  dest, name, selector,
-                                  this->connection->getPrefetchPolicy()->getDurableTopicPrefetch(),
-                                  0, noLocal, false, this->connection->isDispatchAsync(), NULL ) );
-
-        try{
-            this->addConsumer( consumer.get() );
-            this->connection->syncRequest( consumer->getConsumerInfo() );
-        } catch( Exception& ex ) {
-            this->removeConsumer( consumer->getConsumerId() );
+        Pointer<ActiveMQConsumerKernel> consumer(
+            new ActiveMQConsumerKernel(this, this->getNextConsumerId(),
+                                       dest, name, selector,
+                                       this->connection->getPrefetchPolicy()->getDurableTopicPrefetch(),
+                                       0, noLocal, false, this->connection->isDispatchAsync(), NULL));
+
+        try {
+            this->addConsumer(consumer);
+            this->connection->syncRequest(consumer->getConsumerInfo());
+        } catch (Exception& ex) {
+            this->removeConsumer(consumer->getConsumerId());
             throw ex;
         }
 
-        if( this->connection->isStarted() ) {
+        if (this->connection->isStarted()) {
             consumer->start();
         }
 
-        return consumer.release();
+        return new ActiveMQConsumer(consumer);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -611,12 +597,12 @@ cms::MessageProducer* ActiveMQSession::c
 
         // Producers are allowed to have NULL destinations.  In this case, the
         // destination is specified by the messages as they are sent.
-        if( destination != NULL ) {
+        if (destination != NULL) {
 
             const ActiveMQDestination* amqDestination =
-                dynamic_cast<const ActiveMQDestination*>( destination );
+                dynamic_cast<const ActiveMQDestination*> (destination);
 
-            if( amqDestination == NULL ) {
+            if (amqDestination == NULL) {
                 throw ActiveMQException(
                     __FILE__, __LINE__,
                     "Destination was either NULL or not created by this CMS Client" );
@@ -624,22 +610,22 @@ cms::MessageProducer* ActiveMQSession::c
 
             // Cast the destination to an OpenWire destination, so we can
             // get all the goodies.
-            dest.reset( amqDestination->cloneDataStructure() );
+            dest.reset(amqDestination->cloneDataStructure());
         }
 
         // Create the producer instance.
-        std::auto_ptr<ActiveMQProducer> producer( new ActiveMQProducer(
+        Pointer<ActiveMQProducerKernel> producer( new ActiveMQProducerKernel(
             this, this->getNextProducerId(), dest, this->connection->getSendTimeout() ) );
 
         try{
-            this->addProducer( producer.get() );
-            this->connection->oneway( producer->getProducerInfo() );
-        } catch( Exception& ex ) {
-            this->removeProducer( producer.get() );
+            this->addProducer(producer);
+            this->connection->oneway(producer->getProducerInfo());
+        } catch (Exception& ex) {
+            this->removeProducer(producer->getProducerId());
             throw ex;
         }
 
-        return producer.release();
+        return new ActiveMQProducer(producer);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -648,14 +634,14 @@ cms::MessageProducer* ActiveMQSession::c
 cms::QueueBrowser* ActiveMQSession::createBrowser( const cms::Queue* queue ) {
 
     try{
-        return ActiveMQSession::createBrowser( queue, "" );
+        return ActiveMQSession::createBrowser(queue, "");
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::QueueBrowser* ActiveMQSession::createBrowser( const cms::Queue* queue,
-                                                   const std::string& selector ) {
+cms::QueueBrowser* ActiveMQSession::createBrowser(const cms::Queue* queue,
+                                                  const std::string& selector) {
 
     try{
 
@@ -664,20 +650,20 @@ cms::QueueBrowser* ActiveMQSession::crea
         // Cast the destination to an OpenWire destination, so we can
         // get all the goodies.
         const ActiveMQDestination* amqDestination =
-            dynamic_cast<const ActiveMQDestination*>( queue );
+            dynamic_cast<const ActiveMQDestination*> (queue);
 
-        if( amqDestination == NULL ) {
+        if (amqDestination == NULL) {
             throw ActiveMQException(
                 __FILE__, __LINE__,
                 "Destination was either NULL or not created by this CMS Client" );
         }
 
-        Pointer<ActiveMQDestination> dest( amqDestination->cloneDataStructure() );
+        Pointer<ActiveMQDestination> dest(amqDestination->cloneDataStructure());
 
         // Create the QueueBrowser instance
         std::auto_ptr<ActiveMQQueueBrowser> browser(
-            new ActiveMQQueueBrowser( this, this->getNextConsumerId(), dest,
-                                      selector, this->connection->isDispatchAsync() ) );
+            new ActiveMQQueueBrowser(this, this->getNextConsumerId(), dest,
+                                     selector, this->connection->isDispatchAsync()));
 
         return browser.release();
     }
@@ -691,12 +677,12 @@ cms::Queue* ActiveMQSession::createQueue
 
         this->checkClosed();
 
-        if( queueName == "" ) {
+        if (queueName == "") {
             throw IllegalArgumentException(
                 __FILE__, __LINE__, "Destination Name cannot be the Empty String." );
         }
 
-        return new commands::ActiveMQQueue( queueName );
+        return new commands::ActiveMQQueue(queueName);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -708,12 +694,12 @@ cms::Topic* ActiveMQSession::createTopic
 
         this->checkClosed();
 
-        if( topicName == "" ) {
+        if (topicName == "") {
             throw IllegalArgumentException(
                 __FILE__, __LINE__, "Destination Name cannot be the Empty String." );
         }
 
-        return new commands::ActiveMQTopic( topicName );
+        return new commands::ActiveMQTopic(topicName);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -725,11 +711,11 @@ cms::TemporaryQueue* ActiveMQSession::cr
 
         this->checkClosed();
 
-        std::auto_ptr<commands::ActiveMQTempQueue> queue( new
-            commands::ActiveMQTempQueue( this->createTemporaryDestinationName() ) );
+        std::auto_ptr<commands::ActiveMQTempQueue> queue(new
+            commands::ActiveMQTempQueue(this->createTemporaryDestinationName()));
 
         // Register it with the Broker
-        this->createTemporaryDestination( queue.get() );
+        this->createTemporaryDestination(queue.get());
 
         return queue.release();
     }
@@ -743,11 +729,11 @@ cms::TemporaryTopic* ActiveMQSession::cr
 
         this->checkClosed();
 
-        std::auto_ptr<commands::ActiveMQTempTopic> topic( new
-            commands::ActiveMQTempTopic( createTemporaryDestinationName() ) );
+        std::auto_ptr<commands::ActiveMQTempTopic> topic(new
+            commands::ActiveMQTempTopic(createTemporaryDestinationName()));
 
         // Register it with the Broker
-        this->createTemporaryDestination( topic.get() );
+        this->createTemporaryDestination(topic.get());
 
         return topic.release();
     }
@@ -761,7 +747,7 @@ cms::Message* ActiveMQSession::createMes
 
         this->checkClosed();
         commands::ActiveMQMessage* message = new commands::ActiveMQMessage();
-        message->setConnection( this->connection );
+        message->setConnection(this->connection);
         return message;
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -774,7 +760,7 @@ cms::BytesMessage* ActiveMQSession::crea
 
         this->checkClosed();
         commands::ActiveMQBytesMessage* message = new commands::ActiveMQBytesMessage();
-        message->setConnection( this->connection );
+        message->setConnection(this->connection);
         return message;
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -787,7 +773,7 @@ cms::BytesMessage* ActiveMQSession::crea
 
         this->checkClosed();
         cms::BytesMessage* msg = createBytesMessage();
-        msg->setBodyBytes( bytes, bytesSize );
+        msg->setBodyBytes(bytes, bytesSize);
         return msg;
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -800,7 +786,7 @@ cms::StreamMessage* ActiveMQSession::cre
 
         this->checkClosed();
         commands::ActiveMQStreamMessage* message = new commands::ActiveMQStreamMessage();
-        message->setConnection( this->connection );
+        message->setConnection(this->connection);
         return message;
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -813,7 +799,7 @@ cms::TextMessage* ActiveMQSession::creat
 
         this->checkClosed();
         commands::ActiveMQTextMessage* message = new commands::ActiveMQTextMessage();
-        message->setConnection( this->connection );
+        message->setConnection(this->connection);
         return message;
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -826,7 +812,7 @@ cms::TextMessage* ActiveMQSession::creat
 
         this->checkClosed();
         cms::TextMessage* msg = createTextMessage();
-        msg->setText( text.c_str() );
+        msg->setText(text.c_str());
         return msg;
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -839,7 +825,7 @@ cms::MapMessage* ActiveMQSession::create
 
         this->checkClosed();
         commands::ActiveMQMapMessage* message = new commands::ActiveMQMapMessage();
-        message->setConnection( this->connection );
+        message->setConnection(this->connection);
         return message;
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -852,43 +838,40 @@ cms::Session::AcknowledgeMode ActiveMQSe
 
 ////////////////////////////////////////////////////////////////////////////////
 bool ActiveMQSession::isTransacted() const {
-    return ( this->ackMode == Session::SESSION_TRANSACTED ) || this->transaction->isInXATransaction();
+    return (this->ackMode == Session::SESSION_TRANSACTED) || this->transaction->isInXATransaction();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::send( cms::Message* message, ActiveMQProducer* producer, util::Usage* usage ) {
+void ActiveMQSession::send(cms::Message* message, ActiveMQProducerKernel* producer, util::Usage* usage) {
 
     try {
 
         this->checkClosed();
 
-        commands::Message* amqMessage =
-            dynamic_cast< commands::Message* >( message );
+        commands::Message* amqMessage = dynamic_cast< commands::Message* >(message);
 
-        if( amqMessage == NULL ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::send - "
-                "Message is not a valid Open Wire type.");
+        if (amqMessage == NULL) {
+            throw ActiveMQException(__FILE__, __LINE__,
+                "ActiveMQSession::send - Message is not a valid Open Wire type.");
         }
 
         // Clear any old data that might be in the message object
-        amqMessage->getMessageId().reset( NULL );
-        amqMessage->getProducerId().reset( NULL );
-        amqMessage->getTransactionId().reset( NULL );
+        amqMessage->getMessageId().reset(NULL);
+        amqMessage->getProducerId().reset(NULL);
+        amqMessage->getTransactionId().reset(NULL);
 
         // Always assign the message ID, regardless of the disable
         // flag.  Not adding a message ID will cause an NPE at the broker.
-        decaf::lang::Pointer<commands::MessageId> id( new commands::MessageId() );
-        id->setProducerId( producer->getProducerInfo()->getProducerId() );
-        id->setProducerSequenceId( this->getNextProducerSequenceId() );
+        decaf::lang::Pointer<commands::MessageId> id(new commands::MessageId());
+        id->setProducerId(producer->getProducerInfo()->getProducerId());
+        id->setProducerSequenceId(this->getNextProducerSequenceId());
 
-        amqMessage->setMessageId( id );
+        amqMessage->setMessageId(id);
 
         // Ensure that a new transaction is started if this is the first message
         // sent since the last commit.
         doStartTransaction();
-        amqMessage->setTransactionId( this->transaction->getTransactionId() );
+        amqMessage->setTransactionId(this->transaction->getTransactionId());
 
         // NOTE:
         // Now we copy the message before sending, this allows the user to reuse the
@@ -897,28 +880,28 @@ void ActiveMQSession::send( cms::Message
         // To not do this implies that the user must never reuse the message object, or
         // know that the configuration of Transports doesn't involve the message hanging
         // around beyond the point that send returns.
-        Pointer<commands::Message> msgCopy( amqMessage->cloneDataStructure() );
+        Pointer<commands::Message> msgCopy(amqMessage->cloneDataStructure());
 
         msgCopy->onSend();
         msgCopy->setProducerId( producer->getProducerInfo()->getProducerId() );
 
-        if( this->connection->getSendTimeout() <= 0 &&
+        if (this->connection->getSendTimeout() <= 0 &&
             !msgCopy->isResponseRequired() &&
             !this->connection->isAlwaysSyncSend() &&
-            ( !msgCopy->isPersistent() || this->connection->isUseAsyncSend() ||
-                msgCopy->getTransactionId() != NULL ) ) {
+            (!msgCopy->isPersistent() || this->connection->isUseAsyncSend() ||
+               msgCopy->getTransactionId() != NULL)) {
 
-            if( usage != NULL ) {
-                usage->enqueueUsage( msgCopy->getSize() );
+            if (usage != NULL) {
+                usage->enqueueUsage(msgCopy->getSize());
             }
 
             // No Response Required.
-            this->connection->oneway( msgCopy );
+            this->connection->oneway(msgCopy);
 
         } else {
 
             // Send the message to the broker.
-            this->connection->syncRequest( msgCopy, this->connection->getSendTimeout() );
+            this->connection->syncRequest(msgCopy, this->connection->getSendTimeout());
         }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -940,39 +923,39 @@ Pointer<Scheduler> ActiveMQSession::getS
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::unsubscribe( const std::string& name ) {
+void ActiveMQSession::unsubscribe(const std::string& name) {
 
     try{
 
         this->checkClosed();
 
-        Pointer<RemoveSubscriptionInfo> rsi( new RemoveSubscriptionInfo() );
+        Pointer<RemoveSubscriptionInfo> rsi(new RemoveSubscriptionInfo());
 
-        rsi->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
-        rsi->setSubcriptionName( name );
-        rsi->setClientId( this->connection->getConnectionInfo().getClientId() );
+        rsi->setConnectionId(this->connection->getConnectionInfo().getConnectionId());
+        rsi->setSubcriptionName(name);
+        rsi->setClientId(this->connection->getConnectionInfo().getClientId());
 
         // Send the message to the broker.
-        this->connection->syncRequest( rsi );
+        this->connection->syncRequest(rsi);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::dispatch( const Pointer<MessageDispatch>& dispatch ) {
+void ActiveMQSession::dispatch(const Pointer<MessageDispatch>& dispatch) {
 
-    if( this->executor.get() != NULL ) {
+    if (this->executor.get() != NULL) {
         this->executor->execute( dispatch );
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::redispatch( MessageDispatchChannel& unconsumedMessages ) {
+void ActiveMQSession::redispatch(MessageDispatchChannel& unconsumedMessages) {
 
     std::vector< Pointer<MessageDispatch> > messages = unconsumedMessages.removeAll();
     std::vector< Pointer<MessageDispatch> >::reverse_iterator iter = messages.rbegin();
 
-    for( ; iter != messages.rend(); ++iter ) {
+    for (; iter != messages.rend(); ++iter) {
         executor->executeFirst( *iter );
     }
 }
@@ -980,16 +963,16 @@ void ActiveMQSession::redispatch( Messag
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::start() {
 
-    synchronized( &this->consumers ) {
-        std::vector< ActiveMQConsumer* > consumers = this->consumers.values();
+    synchronized(&this->consumers) {
+        std::vector< Pointer<ActiveMQConsumerKernel> > consumers = this->consumers.values();
 
-        std::vector< ActiveMQConsumer*>::iterator iter = consumers.begin();
-        for( ; iter != consumers.end(); ++iter ) {
+        std::vector< Pointer<ActiveMQConsumerKernel> >::iterator iter = consumers.begin();
+        for (; iter != consumers.end(); ++iter) {
             (*iter)->start();
         }
     }
 
-    if( this->executor.get() != NULL ) {
+    if (this->executor.get() != NULL) {
         this->executor->start();
     }
 }
@@ -997,7 +980,7 @@ void ActiveMQSession::start() {
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::stop() {
 
-    if( this->executor.get() != NULL ) {
+    if (this->executor.get() != NULL) {
         this->executor->stop();
     }
 }
@@ -1005,7 +988,7 @@ void ActiveMQSession::stop() {
 ////////////////////////////////////////////////////////////////////////////////
 bool ActiveMQSession::isStarted() const {
 
-    if( this->executor.get() == NULL ) {
+    if (this->executor.get() == NULL) {
         return false;
     }
 
@@ -1013,22 +996,20 @@ bool ActiveMQSession::isStarted() const 
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::createTemporaryDestination(
-    commands::ActiveMQTempDestination* tempDestination ) {
+void ActiveMQSession::createTemporaryDestination(commands::ActiveMQTempDestination* tempDestination) {
 
     try {
 
-        Pointer<DestinationInfo> command( new DestinationInfo() );
-        command->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
-        command->setOperationType( ActiveMQConstants::DESTINATION_ADD_OPERATION );
-        command->setDestination(
-            Pointer<ActiveMQTempDestination>( tempDestination->cloneDataStructure() ) );
+        Pointer<DestinationInfo> command(new DestinationInfo());
+        command->setConnectionId(this->connection->getConnectionInfo().getConnectionId());
+        command->setOperationType(ActiveMQConstants::DESTINATION_ADD_OPERATION);
+        command->setDestination(Pointer<ActiveMQTempDestination> (tempDestination->cloneDataStructure()));
 
         // Send the message to the broker.
-        this->syncRequest( command );
+        this->syncRequest(command);
 
         // Now that its setup, link it to this Connection so it can be closed.
-        tempDestination->setConnection( this->connection );
+        tempDestination->setConnection(this->connection);
     }
     AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
@@ -1037,19 +1018,18 @@ void ActiveMQSession::createTemporaryDes
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::destroyTemporaryDestination(
-    commands::ActiveMQTempDestination* tempDestination ) {
+    commands::ActiveMQTempDestination* tempDestination) {
 
     try {
 
-        Pointer<DestinationInfo> command( new DestinationInfo() );
+        Pointer<DestinationInfo> command(new DestinationInfo());
 
-        command->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
-        command->setOperationType( ActiveMQConstants::DESTINATION_REMOVE_OPERATION );
-        command->setDestination(
-            Pointer<ActiveMQTempDestination>( tempDestination->cloneDataStructure() ) );
+        command->setConnectionId(this->connection->getConnectionInfo().getConnectionId());
+        command->setOperationType(ActiveMQConstants::DESTINATION_REMOVE_OPERATION);
+        command->setDestination(Pointer<ActiveMQTempDestination> (tempDestination->cloneDataStructure()));
 
         // Send the message to the broker.
-        this->connection->syncRequest( command );
+        this->connection->syncRequest(command);
     }
     AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
@@ -1061,7 +1041,7 @@ std::string ActiveMQSession::createTempo
 
     try {
         return this->connection->getConnectionId().getValue() + ":" +
-               Long::toString( this->connection->getNextTempDestinationId() );
+               Long::toString(this->connection->getNextTempDestinationId());
     }
     AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
@@ -1069,11 +1049,11 @@ std::string ActiveMQSession::createTempo
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::oneway( Pointer<Command> command ) {
+void ActiveMQSession::oneway(Pointer<Command> command) {
 
     try{
         this->checkClosed();
-        this->connection->oneway( command );
+        this->connection->oneway(command);
     }
     AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
@@ -1081,11 +1061,11 @@ void ActiveMQSession::oneway( Pointer<Co
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Response> ActiveMQSession::syncRequest( Pointer<Command> command, unsigned int timeout ) {
+Pointer<Response> ActiveMQSession::syncRequest(Pointer<Command> command, unsigned int timeout) {
 
     try{
         this->checkClosed();
-        return this->connection->syncRequest( command, timeout );
+        return this->connection->syncRequest(command, timeout);
     }
     AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
@@ -1102,19 +1082,19 @@ void ActiveMQSession::checkClosed() cons
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::addConsumer( ActiveMQConsumer* consumer ) {
+void ActiveMQSession::addConsumer(Pointer<ActiveMQConsumerKernel> consumer) {
 
     try{
 
         this->checkClosed();
 
         // Add the consumer to the map.
-        synchronized( &this->consumers ) {
-            this->consumers.put( consumer->getConsumerInfo()->getConsumerId(), consumer );
+        synchronized(&this->consumers) {
+            this->consumers.put(consumer->getConsumerInfo()->getConsumerId(), consumer);
         }
 
         // Register this as a message dispatcher for the consumer.
-        this->connection->addDispatcher( consumer->getConsumerInfo()->getConsumerId(), this );
+        this->connection->addDispatcher(consumer->getConsumerInfo()->getConsumerId(), this);
     }
     AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
@@ -1122,20 +1102,18 @@ void ActiveMQSession::addConsumer( Activ
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::removeConsumer( const Pointer<ConsumerId>& consumerId ) {
+void ActiveMQSession::removeConsumer(const Pointer<ConsumerId>& consumerId) {
 
     try{
 
         this->checkClosed();
 
-        synchronized( &this->consumers ) {
-
-            if( this->consumers.containsKey( consumerId ) ) {
-
-                // Remove this Id both from the Sessions Map of Consumers and from
-                // the Connection.
-                this->connection->removeDispatcher( consumerId );
-                this->consumers.remove( consumerId );
+        synchronized(&this->consumers) {
+            if (this->consumers.containsKey(consumerId)) {
+                // Remove this Id both from the Session's Map of Consumers and from the Connection.
+                // If the kernel's parent is destroyed then it will get cleaned up now.
+                this->connection->removeDispatcher(consumerId);
+                this->consumers.remove(consumerId);
             }
         }
     }
@@ -1145,16 +1123,16 @@ void ActiveMQSession::removeConsumer( co
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::addProducer( ActiveMQProducer* producer ) {
+void ActiveMQSession::addProducer(Pointer<ActiveMQProducerKernel> producer) {
 
     try{
 
         this->checkClosed();
 
-        this->config->producers.add( producer );
+        this->config->producers.add(producer);
 
         // Add to the Connections list
-        this->connection->addProducer( producer );
+        this->connection->addProducer(producer);
     }
     AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
@@ -1162,14 +1140,28 @@ void ActiveMQSession::addProducer( Activ
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::removeProducer( ActiveMQProducer* producer ) {
+void ActiveMQSession::removeProducer(const Pointer<commands::ProducerId>& producerId) {
 
     try{
 
         this->checkClosed();
 
-        this->connection->removeProducer( producer->getProducerId() );
-        this->config->producers.remove( producer );
+        this->connection->removeProducer(producerId);
+
+        std::auto_ptr<Iterator<Pointer< ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator());
+
+        Pointer<ActiveMQProducerKernel> toRemove;
+        while (producerIter->hasNext()) {
+            Pointer<ActiveMQProducerKernel> temp = producerIter->next();
+            if (temp->getProducerId()->equals(*producerId)) {
+                toRemove = temp;
+                break;
+            }
+        }
+
+        if (toRemove != NULL) {
+            this->config->producers.remove(toRemove);
+        }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -1179,7 +1171,7 @@ void ActiveMQSession::removeProducer( Ac
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::doStartTransaction() {
 
-    if( this->isTransacted() && !this->transaction->isInXATransaction() ) {
+    if (this->isTransacted() && !this->transaction->isInXATransaction()) {
         this->transaction->begin();
     }
 }
@@ -1187,31 +1179,29 @@ void ActiveMQSession::doStartTransaction
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::wakeup() {
 
-    if( this->executor.get() != NULL ) {
+    if (this->executor.get() != NULL) {
         this->executor->wakeup();
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 Pointer<commands::ConsumerId> ActiveMQSession::getNextConsumerId() {
-    Pointer<ConsumerId> consumerId( new commands::ConsumerId() );
+    Pointer<ConsumerId> consumerId(new commands::ConsumerId());
 
-    consumerId->setConnectionId(
-        this->connection->getConnectionId().getValue() );
-    consumerId->setSessionId( this->sessionInfo->getSessionId()->getValue() );
-    consumerId->setValue( this->consumerIds.getNextSequenceId() );
+    consumerId->setConnectionId(this->connection->getConnectionId().getValue());
+    consumerId->setSessionId(this->sessionInfo->getSessionId()->getValue());
+    consumerId->setValue(this->consumerIds.getNextSequenceId());
 
     return consumerId;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 Pointer<commands::ProducerId> ActiveMQSession::getNextProducerId() {
-    Pointer<ProducerId> producerId( new ProducerId() );
+    Pointer<ProducerId> producerId(new ProducerId());
 
-    producerId->setConnectionId(
-        this->connection->getConnectionId().getValue() );
-    producerId->setSessionId( this->sessionInfo->getSessionId()->getValue() );
-    producerId->setValue( this->producerIds.getNextSequenceId() );
+    producerId->setConnectionId(this->connection->getConnectionId().getValue());
+    producerId->setSessionId(this->sessionInfo->getSessionId()->getValue());
+    producerId->setValue(this->producerIds.getNextSequenceId());
 
     return producerId;
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?rev=1305601&r1=1305600&r2=1305601&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Mon Mar 26 21:11:12 2012
@@ -24,6 +24,8 @@
 #include <activemq/util/Usage.h>
 #include <activemq/exceptions/ActiveMQException.h>
 #include <activemq/core/ActiveMQTransactionContext.h>
+#include <activemq/core/kernels/ActiveMQConsumerKernel.h>
+#include <activemq/core/kernels/ActiveMQProducerKernel.h>
 #include <activemq/commands/ActiveMQTempDestination.h>
 #include <activemq/commands/Response.h>
 #include <activemq/commands/SessionInfo.h>
@@ -63,7 +65,7 @@ namespace core{
     private:
 
         typedef decaf::util::StlMap< Pointer<commands::ConsumerId>,
-                                     ActiveMQConsumer*,
+                                     Pointer<activemq::core::kernels::ActiveMQConsumerKernel>,
                                      commands::ConsumerId::COMPARATOR> ConsumersMap;
 
         friend class ActiveMQSessionExecutor;
@@ -135,10 +137,10 @@ namespace core{
 
     public:
 
-        ActiveMQSession( ActiveMQConnection* connection,
-                         const Pointer<commands::SessionId>& id,
-                         cms::Session::AcknowledgeMode ackMode,
-                         const decaf::util::Properties& properties );
+        ActiveMQSession(ActiveMQConnection* connection,
+                        const Pointer<commands::SessionId>& id,
+                        cms::Session::AcknowledgeMode ackMode,
+                        const decaf::util::Properties& properties);
 
         virtual ~ActiveMQSession() throw();
 
@@ -146,7 +148,7 @@ namespace core{
          * Redispatches the given set of unconsumed messages to the consumers.
          * @param unconsumedMessages - unconsumed messages to be redelivered.
          */
-        virtual void redispatch( MessageDispatchChannel& unconsumedMessages );
+        virtual void redispatch(MessageDispatchChannel& unconsumedMessages);
 
         /**
          * Stops asynchronous message delivery.
@@ -183,7 +185,7 @@ namespace core{
         /**
          * Fires the given exception to the exception listener of the connection
          */
-        void fire( const exceptions::ActiveMQException& ex );
+        void fire(const exceptions::ActiveMQException& ex);
 
     public:  // Methods from ActiveMQMessageDispatcher
 
@@ -191,7 +193,7 @@ namespace core{
          * Dispatches a message to a particular consumer.
          * @param message - the message to be dispatched
          */
-        virtual void dispatch( const Pointer<MessageDispatch>& message );
+        virtual void dispatch(const Pointer<MessageDispatch>& message);
 
     public:   // Implements Methods
 
@@ -203,29 +205,29 @@ namespace core{
 
         virtual void recover();
 
-        virtual cms::MessageConsumer* createConsumer( const cms::Destination* destination );
+        virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination);
 
-        virtual cms::MessageConsumer* createConsumer( const cms::Destination* destination,
-                                                      const std::string& selector );
+        virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination,
+                                                     const std::string& selector);
 
-        virtual cms::MessageConsumer* createConsumer( const cms::Destination* destination,
-                                                      const std::string& selector,
-                                                      bool noLocal );
+        virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination,
+                                                     const std::string& selector,
+                                                     bool noLocal);
 
-        virtual cms::MessageConsumer* createDurableConsumer( const cms::Topic* destination,
-                                                             const std::string& name,
-                                                             const std::string& selector,
-                                                             bool noLocal = false );
+        virtual cms::MessageConsumer* createDurableConsumer(const cms::Topic* destination,
+                                                            const std::string& name,
+                                                            const std::string& selector,
+                                                            bool noLocal = false);
 
-        virtual cms::MessageProducer* createProducer( const cms::Destination* destination );
+        virtual cms::MessageProducer* createProducer(const cms::Destination* destination);
 
-        virtual cms::QueueBrowser* createBrowser( const cms::Queue* queue );
+        virtual cms::QueueBrowser* createBrowser(const cms::Queue* queue);
 
-        virtual cms::QueueBrowser* createBrowser( const cms::Queue* queue, const std::string& selector );
+        virtual cms::QueueBrowser* createBrowser(const cms::Queue* queue, const std::string& selector);
 
-        virtual cms::Queue* createQueue( const std::string& queueName );
+        virtual cms::Queue* createQueue(const std::string& queueName);
 
-        virtual cms::Topic* createTopic( const std::string& topicName );
+        virtual cms::Topic* createTopic(const std::string& topicName);
 
         virtual cms::TemporaryQueue* createTemporaryQueue();
 
@@ -235,7 +237,7 @@ namespace core{
 
         virtual cms::BytesMessage* createBytesMessage();
 
-        virtual cms::BytesMessage* createBytesMessage( const unsigned char* bytes, int bytesSize );
+        virtual cms::BytesMessage* createBytesMessage(const unsigned char* bytes, int bytesSize);
 
         virtual cms::StreamMessage* createStreamMessage();
 
@@ -249,7 +251,7 @@ namespace core{
 
         virtual bool isTransacted() const;
 
-        virtual void unsubscribe( const std::string& name );
+        virtual void unsubscribe(const std::string& name);
 
    public:   // ActiveMQSession specific Methods
 
@@ -270,7 +272,7 @@ namespace core{
          *
          * @throws CMSException
          */
-        void send( cms::Message* message, ActiveMQProducer* producer, util::Usage* usage );
+        void send(cms::Message* message, kernels::ActiveMQProducerKernel* producer, util::Usage* usage);
 
         /**
          * This method gets any registered exception listener of this sessions
@@ -328,7 +330,7 @@ namespace core{
          * @param value
          *      The new value to assign to the Last Delivered Sequence Id property.
          */
-        void setLastDeliveredSequenceId( long long value ) {
+        void setLastDeliveredSequenceId(long long value) {
             this->lastDeliveredSequenceId = value;
         }
 
@@ -341,7 +343,7 @@ namespace core{
          * @throws ActiveMQException if not currently connected, or if the
          *         operation fails for any reason.
          */
-        void oneway( Pointer<commands::Command> command );
+        void oneway(Pointer<commands::Command> command);
 
         /**
          * Sends a synchronous request and returns the response from the broker.
@@ -357,11 +359,11 @@ namespace core{
          * @throws ActiveMQException thrown if an error response was received
          *         from the broker, or if any other error occurred.
          */
-        Pointer<commands::Response> syncRequest( Pointer<commands::Command> command, unsigned int timeout = 0 );
+        Pointer<commands::Response> syncRequest(Pointer<commands::Command> command, unsigned int timeout = 0);
 
         /**
-         * Adds a MessageConsumer to this session registering it with the Connection and store
-         * a reference to it so the session can ensure that all resources are closed when
+         * Adds a MessageConsumerKernel to this session, registering it with the Connection and
+         * storing a reference to it so the session can ensure that all resources are closed when
          * the session is closed.
          *
          * @param consumer
@@ -369,7 +371,7 @@ namespace core{
          *
          * @throw ActiveMQException if an internal error occurs.
          */
-        void addConsumer( ActiveMQConsumer* consumer );
+        void addConsumer(Pointer<activemq::core::kernels::ActiveMQConsumerKernel> consumer);
 
         /**
          * Dispose of a MessageConsumer from this session.  Removes it from the Connection
@@ -380,30 +382,30 @@ namespace core{
          *
          * @throw ActiveMQException if an internal error occurs.
          */
-        void removeConsumer( const Pointer<commands::ConsumerId>& consumerId );
+        void removeConsumer(const Pointer<commands::ConsumerId>& consumerId);
 
         /**
          * Adds a MessageProducer to this session registering it with the Connection and store
          * a reference to it so the session can ensure that all resources are closed when
          * the session is closed.
          *
-         * @param consumer
-         *      The ActiveMQProducer instance to add to this session.
+         * @param producer
+         *      The ActiveMQProducerKernel instance to add to this session.
          *
          * @throw ActiveMQException if an internal error occurs.
          */
-        void addProducer( ActiveMQProducer* producer );
+        void addProducer(Pointer<activemq::core::kernels::ActiveMQProducerKernel> producer);
 
         /**
          * Dispose of a MessageProducer from this session.  Removes it from the Connection
          * and clean up any resources associated with it.
          *
          * @param producerId
-         *      The ProducerId of the MessageProducer to remove from this session.
+         *      The ProducerId of the producer to remove from this session.
          *
          * @throw ActiveMQException if an internal error occurs.
          */
-        void removeProducer( ActiveMQProducer* producer );
+        void removeProducer(const Pointer<commands::ProducerId>& producerId);
 
         /**
          * Starts if not already start a Transaction for this Session.  If the session
@@ -490,12 +492,12 @@ namespace core{
        // Send the Destination Creation Request to the Broker, alerting it
        // that we've created a new Temporary Destination.
        // @param tempDestination - The new Temporary Destination
-       void createTemporaryDestination( commands::ActiveMQTempDestination* tempDestination );
+       void createTemporaryDestination(commands::ActiveMQTempDestination* tempDestination);
 
        // Send the Destination Destruction Request to the Broker, alerting
        // it that we've removed an existing Temporary Destination.
        // @param tempDestination - The Temporary Destination to remove
-       void destroyTemporaryDestination( commands::ActiveMQTempDestination* tempDestination );
+       void destroyTemporaryDestination(commands::ActiveMQTempDestination* tempDestination);
 
        // Creates a new Temporary Destination name using the connection id
        // and a rolling count.

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp?rev=1305601&r1=1305600&r2=1305601&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp Mon Mar 26 21:11:12 2012
@@ -18,7 +18,7 @@
 #include "ActiveMQSessionExecutor.h"
 
 #include <activemq/core/ActiveMQConnection.h>
-#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/kernels/ActiveMQConsumerKernel.h>
 #include <activemq/core/ActiveMQSession.h>
 #include <activemq/core/FifoMessageDispatchChannel.h>
 #include <activemq/core/SimplePriorityMessageDispatchChannel.h>
@@ -28,6 +28,7 @@
 using namespace std;
 using namespace activemq;
 using namespace activemq::core;
+using namespace activemq::core::kernels;
 using namespace activemq::threads;
 using namespace activemq::exceptions;
 using namespace decaf::lang;
@@ -35,13 +36,13 @@ using namespace decaf::util;
 using namespace decaf::util::concurrent;
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQSessionExecutor::ActiveMQSessionExecutor( ActiveMQSession* session ) :
-            session( session ), messageQueue(), taskRunner() {
+ActiveMQSessionExecutor::ActiveMQSessionExecutor(ActiveMQSession* session) :
+    session(session), messageQueue(), taskRunner() {
 
-    if( this->session->getConnection()->isMessagePrioritySupported() ) {
-        this->messageQueue.reset( new SimplePriorityMessageDispatchChannel() );
+    if (this->session->getConnection()->isMessagePrioritySupported()) {
+        this->messageQueue.reset(new SimplePriorityMessageDispatchChannel());
     } else {
-        this->messageQueue.reset( new FifoMessageDispatchChannel() );
+        this->messageQueue.reset(new FifoMessageDispatchChannel());
     }
 }
 
@@ -66,7 +67,7 @@ ActiveMQSessionExecutor::~ActiveMQSessio
 void ActiveMQSessionExecutor::execute( const Pointer<MessageDispatch>& dispatch ) {
 
     // Add the data to the queue.
-    this->messageQueue->enqueue( dispatch );
+    this->messageQueue->enqueue(dispatch);
     this->wakeup();
 }
 
@@ -74,7 +75,7 @@ void ActiveMQSessionExecutor::execute( c
 void ActiveMQSessionExecutor::executeFirst( const Pointer<MessageDispatch>& dispatch ) {
 
     // Add the data to the queue.
-    this->messageQueue->enqueueFirst( dispatch );
+    this->messageQueue->enqueueFirst(dispatch);
     this->wakeup();
 }
 
@@ -82,9 +83,9 @@ void ActiveMQSessionExecutor::executeFir
 void ActiveMQSessionExecutor::wakeup() {
 
     Pointer<TaskRunner> taskRunner = this->taskRunner;
-    synchronized( messageQueue.get() ) {
-        if( this->taskRunner == NULL ) {
-            this->taskRunner.reset( new DedicatedTaskRunner( this ) );
+    synchronized(messageQueue.get()) {
+        if (this->taskRunner == NULL) {
+            this->taskRunner.reset(new DedicatedTaskRunner(this));
         }
 
         taskRunner = this->taskRunner;
@@ -96,10 +97,9 @@ void ActiveMQSessionExecutor::wakeup() {
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionExecutor::start() {
 
-    if( !messageQueue->isRunning() ) {
-
+    if (!messageQueue->isRunning()) {
         messageQueue->start();
-        if( hasUncomsumedMessages() ) {
+        if (hasUncomsumedMessages()) {
             this->wakeup();
         }
     }
@@ -108,11 +108,11 @@ void ActiveMQSessionExecutor::start() {
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionExecutor::stop() {
 
-    if( messageQueue->isRunning() ) {
+    if (messageQueue->isRunning()) {
         messageQueue->stop();
         Pointer<TaskRunner> taskRunner = this->taskRunner;
-        if( taskRunner != NULL ) {
-            this->taskRunner.reset( NULL );
+        if (taskRunner != NULL) {
+            this->taskRunner.reset(NULL);
             taskRunner->shutdown();
         }
     }
@@ -123,28 +123,28 @@ void ActiveMQSessionExecutor::dispatch( 
 
     try {
 
-        ActiveMQConsumer* consumer = NULL;
+        Pointer<ActiveMQConsumerKernel> consumer;
 
-        synchronized( &( this->session->consumers ) ) {
-            if( this->session->consumers.containsKey( dispatch->getConsumerId() ) ) {
-                consumer = this->session->consumers.get( dispatch->getConsumerId() );
+        synchronized(&( this->session->consumers)) {
+            if (this->session->consumers.containsKey(dispatch->getConsumerId())) {
+                consumer = this->session->consumers.get(dispatch->getConsumerId());
             }
         }
 
         // If the consumer is not available, just ignore the message.
         // Otherwise, dispatch the message to the consumer.
-        if( consumer != NULL ) {
-            consumer->dispatch( dispatch );
+        if (consumer != NULL) {
+            consumer->dispatch(dispatch);
         }
 
-    } catch( decaf::lang::Exception& ex ) {
-        ex.setMark(__FILE__, __LINE__ );
+    } catch (decaf::lang::Exception& ex) {
+        ex.setMark(__FILE__, __LINE__);
         ex.printStackTrace();
-    } catch( std::exception& ex ) {
-        ActiveMQException amqex( __FILE__, __LINE__, ex.what() );
+    } catch (std::exception& ex) {
+        ActiveMQException amqex(__FILE__, __LINE__, ex.what());
         amqex.printStackTrace();
-    } catch( ... ) {
-        ActiveMQException amqex( __FILE__, __LINE__, "caught unknown exception" );
+    } catch (...) {
+        ActiveMQException amqex(__FILE__, __LINE__, "caught unknown exception");
         amqex.printStackTrace();
     }
 }
@@ -154,14 +154,13 @@ bool ActiveMQSessionExecutor::iterate() 
 
     try {
 
-        synchronized( &( this->session->consumers ) ) {
-
-            std::vector<ActiveMQConsumer*> consumers = this->session->consumers.values();
-            std::vector<ActiveMQConsumer*>::iterator iter = consumers.begin();
+        synchronized(&(this->session->consumers)) {
+            std::vector<Pointer<ActiveMQConsumerKernel> > consumers = this->session->consumers.values();
+            std::vector<Pointer<ActiveMQConsumerKernel> >::iterator iter = consumers.begin();
 
             // Deliver any messages queued on the consumer to their listeners.
-            for( ; iter != consumers.end(); ++iter ) {
-                if( (*iter)->iterate() ) {
+            for (; iter != consumers.end(); ++iter) {
+                if ((*iter)->iterate()) {
                     return true;
                 }
             }
@@ -170,24 +169,24 @@ bool ActiveMQSessionExecutor::iterate() 
         // No messages left queued on the listeners.. so now dispatch messages
         // queued on the session
         Pointer<MessageDispatch> message = messageQueue->dequeueNoWait();
-        if( message != NULL ) {
-            dispatch( message );
+        if (message != NULL) {
+            dispatch(message);
             return !messageQueue->isEmpty();
         }
 
         return false;
 
-    } catch( decaf::lang::Exception& ex ) {
-        ex.setMark(__FILE__, __LINE__ );
-        session->fire( ex );
+    } catch (decaf::lang::Exception& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        session->fire(ex);
         return true;
-    } catch( std::exception& stdex ) {
-        ActiveMQException ex( __FILE__, __LINE__, stdex.what() );
-        session->fire( ex );
+    } catch (std::exception& stdex) {
+        ActiveMQException ex(__FILE__, __LINE__, stdex.what());
+        session->fire(ex);
         return true;
-    } catch( ... ) {
-        ActiveMQException ex(__FILE__, __LINE__, "caught unknown exception" );
-        session->fire( ex );
+    } catch (...) {
+        ActiveMQException ex(__FILE__, __LINE__, "caught unknown exception");
+        session->fire(ex);
         return true;
     }
 }



Mime
View raw message