activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1307147 [1/3] - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main: ./ activemq/core/ activemq/core/kernels/ cms/
Date Thu, 29 Mar 2012 22:27:33 GMT
Author: tabish
Date: Thu Mar 29 22:27:32 2012
New Revision: 1307147

URL: http://svn.apache.org/viewvc?rev=1307147&view=rev
Log:
Break out more functionality into kernel classes that allow the connection to control their lifetime.  

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQXASessionKernel.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQXASessionKernel.h   (with props)
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XAConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XAConnection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XASession.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XASession.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Thu Mar 29 22:27:32 2012
@@ -103,6 +103,8 @@ cc_sources = \
     activemq/core/SimplePriorityMessageDispatchChannel.cpp \
     activemq/core/kernels/ActiveMQConsumerKernel.cpp \
     activemq/core/kernels/ActiveMQProducerKernel.cpp \
+    activemq/core/kernels/ActiveMQSessionKernel.cpp \
+    activemq/core/kernels/ActiveMQXASessionKernel.cpp \
     activemq/core/policies/DefaultPrefetchPolicy.cpp \
     activemq/core/policies/DefaultRedeliveryPolicy.cpp \
     activemq/exceptions/ActiveMQException.cpp \
@@ -600,6 +602,8 @@ h_sources = \
     activemq/core/Synchronization.h \
     activemq/core/kernels/ActiveMQConsumerKernel.h \
     activemq/core/kernels/ActiveMQProducerKernel.h \
+    activemq/core/kernels/ActiveMQSessionKernel.h \
+    activemq/core/kernels/ActiveMQXASessionKernel.h \
     activemq/core/policies/DefaultPrefetchPolicy.h \
     activemq/core/policies/DefaultRedeliveryPolicy.h \
     activemq/exceptions/ActiveMQException.h \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Thu Mar 29 22:27:32 2012
@@ -22,6 +22,7 @@
 #include <activemq/core/ActiveMQSession.h>
 #include <activemq/core/ActiveMQProducer.h>
 #include <activemq/core/ActiveMQConstants.h>
+#include <activemq/core/kernels/ActiveMQSessionKernel.h>
 #include <activemq/core/policies/DefaultPrefetchPolicy.h>
 #include <activemq/core/policies/DefaultRedeliveryPolicy.h>
 #include <activemq/exceptions/ActiveMQException.h>
@@ -146,7 +147,7 @@ namespace core{
         DispatcherMap dispatchers;
         ProducerMap activeProducers;
 
-        decaf::util::concurrent::CopyOnWriteArrayList<ActiveMQSession*> activeSessions;
+        decaf::util::concurrent::CopyOnWriteArrayList< Pointer<ActiveMQSessionKernel> > activeSessions;
         decaf::util::concurrent::CopyOnWriteArrayList<transport::TransportListener*> transportListeners;
 
         ConnectionConfig() : properties(),
@@ -255,7 +256,6 @@ void ActiveMQConnection::addDispatcher(
     const decaf::lang::Pointer<ConsumerId>& consumer, Dispatcher* dispatcher ) {
 
     try{
-        // Add the consumer to the map.
         synchronized(&this->config->dispatchers) {
             this->config->dispatchers.put(consumer, dispatcher);
         }
@@ -264,10 +264,9 @@ void ActiveMQConnection::addDispatcher(
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::removeDispatcher( const decaf::lang::Pointer<ConsumerId>& consumer ) {
+void ActiveMQConnection::removeDispatcher(const decaf::lang::Pointer<ConsumerId>& consumer) {
 
     try{
-        // Remove the consumer from the map.
         synchronized(&this->config->dispatchers) {
             this->config->dispatchers.remove(consumer);
         }
@@ -291,11 +290,15 @@ cms::Session* ActiveMQConnection::create
         checkClosedOrFailed();
         ensureConnectionInfoSent();
 
-        // Create the session instance.
-        ActiveMQSession* session = new ActiveMQSession(
-            this, getNextSessionId(), ackMode, *this->config->properties);
+        // Create the session as a Session Kernel, then create and return an
+        // ActiveMQSession instance that acts as a proxy to the kernel. The caller can delete
+        // the proxy at any time since we only hold the Pointer to the session kernel.
+        Pointer<ActiveMQSessionKernel> session(new ActiveMQSessionKernel(
+            this, getNextSessionId(), ackMode, *this->config->properties));
 
-        return session;
+        this->addSession(session);
+
+        return new ActiveMQSession(session);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -311,21 +314,16 @@ Pointer<SessionId> ActiveMQConnection::g
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::addSession( ActiveMQSession* session ) {
-
+void ActiveMQConnection::addSession(Pointer<ActiveMQSessionKernel> session) {
     try {
-
-        // Remove this session from the set of active sessions.
         this->config->activeSessions.add(session);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::removeSession(ActiveMQSession* session) {
-
+void ActiveMQConnection::removeSession(Pointer<ActiveMQSessionKernel> session) {
     try {
-        // Remove this session from the set of active sessions.
         this->config->activeSessions.remove(session);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -335,7 +333,6 @@ void ActiveMQConnection::removeSession(A
 void ActiveMQConnection::addProducer(Pointer<ActiveMQProducerKernel> producer) {
 
     try {
-        // Add this producer from the set of active consumer.
         synchronized(&this->config->activeProducers) {
             this->config->activeProducers.put(producer->getProducerInfo()->getProducerId(), producer);
         }
@@ -347,7 +344,6 @@ void ActiveMQConnection::addProducer(Poi
 void ActiveMQConnection::removeProducer(const decaf::lang::Pointer<ProducerId>& producerId) {
 
     try {
-        // Remove this producer from the set of active consumer.
         synchronized(&this->config->activeProducers) {
             this->config->activeProducers.remove(producerId);
         }
@@ -421,13 +417,13 @@ void ActiveMQConnection::close() {
         }
 
         // Get the complete list of active sessions.
-        std::auto_ptr< Iterator<ActiveMQSession*> > iter(this->config->activeSessions.iterator());
+        std::auto_ptr< Iterator<Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
 
         long long lastDeliveredSequenceId = 0;
 
         // Dispose of all the Session resources we know are still open.
         while (iter->hasNext()) {
-            ActiveMQSession* session = iter->next();
+            Pointer<ActiveMQSessionKernel> session = iter->next();
             try{
                 session->dispose();
                 lastDeliveredSequenceId =
@@ -454,11 +450,11 @@ void ActiveMQConnection::cleanup() {
     try{
 
         // Get the complete list of active sessions.
-        std::auto_ptr< Iterator<ActiveMQSession*> > iter( this->config->activeSessions.iterator() );
+        std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter( this->config->activeSessions.iterator() );
 
         // Dispose of all the Session resources we know are still open.
-        while( iter->hasNext() ) {
-            ActiveMQSession* session = iter->next();
+        while (iter->hasNext()) {
+            Pointer<ActiveMQSessionKernel> session = iter->next();
             try{
                 session->dispose();
             } catch( cms::CMSException& ex ){
@@ -466,20 +462,20 @@ void ActiveMQConnection::cleanup() {
             }
         }
 
-        if( this->config->isConnectionInfoSentToBroker ) {
-            if( !transportFailed.get() && !closing.get() ) {
-                this->syncRequest( this->config->connectionInfo->createRemoveCommand() );
+        if (this->config->isConnectionInfoSentToBroker) {
+            if (!transportFailed.get() && !closing.get()) {
+                this->syncRequest(this->config->connectionInfo->createRemoveCommand());
             }
             this->config->isConnectionInfoSentToBroker = false;
         }
 
-        if( this->config->userSpecifiedClientID ) {
+        if (this->config->userSpecifiedClientID) {
             this->config->connectionInfo->setClientId("");
             this->config->userSpecifiedClientID = false;
         }
 
         this->config->clientIDSet = false;
-        this->started.set( false );
+        this->started.set(false);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -495,11 +491,11 @@ void ActiveMQConnection::start() {
         // This starts or restarts the delivery of all incoming messages
         // messages delivered while this connection is stopped are dropped
         // and not acknowledged.
-        if( this->started.compareAndSet( false, true ) ) {
+        if (this->started.compareAndSet(false, true)) {
 
             // Start all the sessions.
-            std::auto_ptr< Iterator<ActiveMQSession*> > iter( this->config->activeSessions.iterator() );
-            while( iter->hasNext() ) {
+            std::auto_ptr<Iterator< Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
+            while (iter->hasNext()) {
                 iter->next()->start();
             }
         }
@@ -516,11 +512,11 @@ void ActiveMQConnection::stop() {
 
         // Once current deliveries are done this stops the delivery of any
         // new messages.
-        if( this->started.compareAndSet( true, false ) ) {
-            synchronized( &this->config->activeSessions ) {
-                std::auto_ptr< Iterator<ActiveMQSession*> > iter( this->config->activeSessions.iterator() );
+        if (this->started.compareAndSet(true, false)) {
+            synchronized(&this->config->activeSessions) {
+                std::auto_ptr<Iterator< Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
 
-                while( iter->hasNext() ){
+                while (iter->hasNext()) {
                     iter->next()->stop();
                 }
             }
@@ -530,12 +526,12 @@ void ActiveMQConnection::stop() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::disconnect( long long lastDeliveredSequenceId ) {
+void ActiveMQConnection::disconnect(long long lastDeliveredSequenceId) {
 
     try{
 
         // Clear the listener, we don't care about async errors at this point.
-        this->config->transport->setTransportListener( NULL );
+        this->config->transport->setTransportListener(NULL);
 
         // Allow the Support class to shutdown its resources, including the Transport.
         bool hasException = false;
@@ -548,7 +544,7 @@ void ActiveMQConnection::disconnect( lon
                 Pointer<RemoveInfo> command(this->config->connectionInfo->createRemoveCommand());
                 command->setLastDeliveredSequenceId(lastDeliveredSequenceId);
                 this->syncRequest(command, this->config->closeTimeout);
-            } catch(exceptions::ActiveMQException& ex) {
+            } catch (exceptions::ActiveMQException& ex) {
                 if (!hasException) {
                     hasException = true;
                     ex.setMark(__FILE__, __LINE__);
@@ -560,7 +556,7 @@ void ActiveMQConnection::disconnect( lon
                 // Send the disconnect command to the broker.
                 Pointer<ShutdownInfo> shutdown(new ShutdownInfo());
                 oneway(shutdown);
-            } catch(exceptions::ActiveMQException& ex) {
+            } catch (exceptions::ActiveMQException& ex) {
                 if (!hasException) {
                     hasException = true;
                     ex.setMark(__FILE__, __LINE__);
@@ -573,7 +569,7 @@ void ActiveMQConnection::disconnect( lon
 
             try {
                 this->config->transport->close();
-            } catch(exceptions::ActiveMQException& ex) {
+            } catch (exceptions::ActiveMQException& ex) {
                 if (!hasException) {
                     hasException = true;
                     ex.setMark(__FILE__, __LINE__);
@@ -583,7 +579,7 @@ void ActiveMQConnection::disconnect( lon
 
             try {
                 this->config->transport.reset(NULL);
-            } catch(exceptions::ActiveMQException& ex) {
+            } catch (exceptions::ActiveMQException& ex) {
                 if (!hasException) {
                     hasException = true;
                     ex.setMark(__FILE__, __LINE__);
@@ -608,15 +604,15 @@ void ActiveMQConnection::sendPullRequest
 
     try {
 
-         if( consumer->getPrefetchSize() == 0 ) {
+         if (consumer->getPrefetchSize() == 0) {
 
-             Pointer<MessagePull> messagePull( new MessagePull() );
-             messagePull->setConsumerId( consumer->getConsumerId() );
-             messagePull->setDestination( consumer->getDestination() );
-             messagePull->setTimeout( timeout );
+            Pointer<MessagePull> messagePull(new MessagePull());
+            messagePull->setConsumerId(consumer->getConsumerId());
+            messagePull->setDestination(consumer->getDestination());
+            messagePull->setTimeout(timeout);
 
-             this->oneway( messagePull );
-         }
+            this->oneway(messagePull);
+        }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -624,26 +620,25 @@ void ActiveMQConnection::sendPullRequest
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::destroyDestination( const ActiveMQDestination* destination ) {
+void ActiveMQConnection::destroyDestination(const ActiveMQDestination* destination) {
 
     try{
 
-        if( destination == NULL ) {
-            throw NullPointerException(
-                __FILE__, __LINE__, "Destination passed was NULL" );
+        if (destination == NULL) {
+            throw NullPointerException(__FILE__, __LINE__, "Destination passed was NULL");
         }
 
         checkClosedOrFailed();
         ensureConnectionInfoSent();
 
-        Pointer<DestinationInfo> command( new DestinationInfo() );
+        Pointer<DestinationInfo> command(new DestinationInfo());
 
-        command->setConnectionId( this->config->connectionInfo->getConnectionId() );
-        command->setOperationType( ActiveMQConstants::DESTINATION_REMOVE_OPERATION );
-        command->setDestination( Pointer<ActiveMQDestination>( destination->cloneDataStructure() ) );
+        command->setConnectionId(this->config->connectionInfo->getConnectionId());
+        command->setOperationType(ActiveMQConstants::DESTINATION_REMOVE_OPERATION);
+        command->setDestination(Pointer<ActiveMQDestination> (destination->cloneDataStructure()));
 
         // Send the message to the broker.
-        syncRequest( command );
+        syncRequest(command);
     }
     AMQ_CATCH_RETHROW( NullPointerException )
     AMQ_CATCH_RETHROW( decaf::lang::exceptions::IllegalStateException )
@@ -653,22 +648,21 @@ void ActiveMQConnection::destroyDestinat
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::destroyDestination( const cms::Destination* destination ) {
+void ActiveMQConnection::destroyDestination(const cms::Destination* destination) {
 
     try{
 
-        if( destination == NULL ) {
-            throw NullPointerException(
-                __FILE__, __LINE__, "Destination passed was NULL" );
+        if (destination == NULL) {
+            throw NullPointerException(__FILE__, __LINE__, "Destination passed was NULL");
         }
 
         checkClosedOrFailed();
         ensureConnectionInfoSent();
 
         const ActiveMQDestination* amqDestination =
-            dynamic_cast<const ActiveMQDestination*>( destination );
+            dynamic_cast<const ActiveMQDestination*> (destination);
 
-        this->destroyDestination( amqDestination );
+        this->destroyDestination(amqDestination);
     }
     AMQ_CATCH_RETHROW( NullPointerException )
     AMQ_CATCH_RETHROW( decaf::lang::exceptions::IllegalStateException )
@@ -682,7 +676,7 @@ void ActiveMQConnection::onCommand( cons
 
     try{
 
-        if( command->isMessageDispatch() ) {
+        if (command->isMessageDispatch()) {
 
             Pointer<MessageDispatch> dispatch = command.dynamicCast<MessageDispatch>();
 
@@ -691,66 +685,63 @@ void ActiveMQConnection::onCommand( cons
 
             // Look up the dispatcher.
             Dispatcher* dispatcher = NULL;
-            synchronized( &this->config->dispatchers ) {
+            synchronized(&this->config->dispatchers) {
 
-                dispatcher = this->config->dispatchers.get( dispatch->getConsumerId() );
+                dispatcher = this->config->dispatchers.get(dispatch->getConsumerId());
 
                 // If we have no registered dispatcher, the consumer was probably
                 // just closed.
-                if( dispatcher != NULL ) {
+                if (dispatcher != NULL) {
 
                     Pointer<commands::Message> message = dispatch->getMessage();
 
                     // Message == NULL to signal the end of a Queue Browse.
-                    if( message != NULL ) {
-                        message->setReadOnlyBody( true );
-                        message->setReadOnlyProperties( true );
-                        message->setRedeliveryCounter( dispatch->getRedeliveryCounter() );
+                    if (message != NULL) {
+                        message->setReadOnlyBody(true);
+                        message->setReadOnlyProperties(true);
+                        message->setRedeliveryCounter(dispatch->getRedeliveryCounter());
                     }
 
-                    dispatcher->dispatch( dispatch );
+                    dispatcher->dispatch(dispatch);
                 }
             }
 
-        } else if( command->isProducerAck() ) {
+        } else if (command->isProducerAck()) {
 
             ProducerAck* producerAck = dynamic_cast<ProducerAck*>( command.get() );
 
             // Get the consumer info object for this consumer.
             Pointer<ActiveMQProducerKernel> producer;
-            synchronized( &this->config->activeProducers ) {
+            synchronized(&this->config->activeProducers) {
                 producer = this->config->activeProducers.get(producerAck->getProducerId());
                 if (producer != NULL) {
                     producer->onProducerAck(*producerAck);
                 }
             }
 
-        } else if( command->isWireFormatInfo() ) {
-            this->config->brokerWireFormatInfo =
-                command.dynamicCast<WireFormatInfo>();
-        } else if( command->isBrokerInfo() ) {
-            this->config->brokerInfo =
-                command.dynamicCast<BrokerInfo>();
+        } else if (command->isWireFormatInfo()) {
+            this->config->brokerWireFormatInfo = command.dynamicCast<WireFormatInfo>();
+        } else if (command->isBrokerInfo()) {
+            this->config->brokerInfo = command.dynamicCast<BrokerInfo>();
             this->config->brokerInfoReceived->countDown();
-        } else if( command->isShutdownInfo() ) {
+        } else if (command->isShutdownInfo()) {
 
             try {
-                if( !this->isClosed() ) {
-                    fire( ActiveMQException(
-                        __FILE__, __LINE__,
-                        "ActiveMQConnection::onCommand - "
-                        "Broker closed this connection."));
+                if (!this->isClosed()) {
+                    fire(ActiveMQException(__FILE__, __LINE__,
+                             "ActiveMQConnection::onCommand - "
+                             "Broker closed this connection."));
                 }
             } catch( ... ) { /* do nothing */ }
 
         } else {
         }
 
-        Pointer< Iterator<TransportListener*> > iter( this->config->transportListeners.iterator() );
+        Pointer< Iterator<TransportListener*> > iter(this->config->transportListeners.iterator());
 
-        while( iter->hasNext() ) {
+        while (iter->hasNext()) {
             try{
-                iter->next()->onCommand( command );
+                iter->next()->onCommand(command);
             } catch(...) {}
         }
     }
@@ -824,15 +815,15 @@ void ActiveMQConnection::transportInterr
     this->config->transportInterruptionProcessingComplete.reset(
         new CountDownLatch( (int)this->config->dispatchers.size() ) );
 
-    std::auto_ptr< Iterator<ActiveMQSession*> > sessions( this->config->activeSessions.iterator() );
+    std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > sessions(this->config->activeSessions.iterator());
 
-    while( sessions->hasNext() ) {
+    while (sessions->hasNext()) {
         sessions->next()->clearMessagesInProgress();
     }
 
-    Pointer< Iterator<TransportListener*> > listeners( this->config->transportListeners.iterator() );
+    Pointer< Iterator<TransportListener*> > listeners(this->config->transportListeners.iterator());
 
-    while( listeners->hasNext() ) {
+    while (listeners->hasNext()) {
         try{
             listeners->next()->transportInterrupted();
         } catch(...) {}
@@ -856,7 +847,7 @@ void ActiveMQConnection::oneway( Pointer
 
     try {
         checkClosedOrFailed();
-        this->config->transport->oneway( command );
+        this->config->transport->oneway(command);
     }
     AMQ_CATCH_EXCEPTION_CONVERT( IOException, ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( decaf::lang::exceptions::UnsupportedOperationException, ActiveMQException )
@@ -873,19 +864,19 @@ Pointer<Response> ActiveMQConnection::sy
 
         Pointer<Response> response;
 
-        if( timeout == 0 ) {
-            response = this->config->transport->request( command );
+        if (timeout == 0) {
+            response = this->config->transport->request(command);
         } else {
-            response = this->config->transport->request( command, timeout );
+            response = this->config->transport->request(command, timeout);
         }
 
         commands::ExceptionResponse* exceptionResponse =
-            dynamic_cast<ExceptionResponse*>( response.get() );
+            dynamic_cast<ExceptionResponse*> (response.get());
 
-        if( exceptionResponse != NULL ) {
+        if (exceptionResponse != NULL) {
 
             // Create an exception to hold the error information.
-            BrokerException exception( __FILE__, __LINE__, exceptionResponse->getException().get() );
+            BrokerException exception(__FILE__, __LINE__, exceptionResponse->getException().get());
 
             // Throw the exception.
             throw exception;
@@ -902,7 +893,7 @@ Pointer<Response> ActiveMQConnection::sy
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::checkClosed() const {
-    if( this->isClosed() ) {
+    if (this->isClosed()) {
         throw ActiveMQException(
             __FILE__, __LINE__,
             "ActiveMQConnection::enforceConnected - Connection has already been closed!" );
@@ -913,7 +904,7 @@ void ActiveMQConnection::checkClosed() c
 void ActiveMQConnection::checkClosedOrFailed() const {
 
     checkClosed();
-    if( this->transportFailed.get() == true ) {
+    if (this->transportFailed.get() == true) {
         throw ConnectionFailedException( *this->config->firstFailureError );
     }
 }
@@ -924,24 +915,24 @@ void ActiveMQConnection::ensureConnectio
     try{
 
         // Can we skip sending the ConnectionInfo packet, cheap test
-        if( this->config->isConnectionInfoSentToBroker || closed.get() ) {
+        if (this->config->isConnectionInfoSentToBroker || closed.get()) {
             return;
         }
 
-        synchronized( &( this->config->ensureConnectionInfoSentMutex ) ) {
+        synchronized(&( this->config->ensureConnectionInfoSentMutex)) {
 
             // Can we skip sending the ConnectionInfo packet??
-            if( this->config->isConnectionInfoSentToBroker || closed.get() ) {
+            if (this->config->isConnectionInfoSentToBroker || closed.get()) {
                 return;
             }
 
             // check for a user specified Id
-            if( !this->config->userSpecifiedClientID ) {
-                this->config->connectionInfo->setClientId( this->config->clientIdGenerator->generateId() );
+            if (!this->config->userSpecifiedClientID) {
+                this->config->connectionInfo->setClientId(this->config->clientIdGenerator->generateId());
             }
 
             // Now we ping the broker and see if we get an ack / nack
-            syncRequest( this->config->connectionInfo );
+            syncRequest(this->config->connectionInfo);
 
             this->config->isConnectionInfoSentToBroker = true;
         }
@@ -952,12 +943,12 @@ void ActiveMQConnection::ensureConnectio
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::fire( const ActiveMQException& ex ) {
-    if( this->config->exceptionListener != NULL ) {
+void ActiveMQConnection::fire(const ActiveMQException& ex) {
+    if (this->config->exceptionListener != NULL) {
         try {
-            this->config->exceptionListener->onException( ex.convertToCMSException() );
+            this->config->exceptionListener->onException(ex.convertToCMSException());
+        } catch (...) {
         }
-        catch(...){}
     }
 }
 
@@ -974,35 +965,35 @@ const ConnectionId& ActiveMQConnection::
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::addTransportListener( TransportListener* transportListener ) {
+void ActiveMQConnection::addTransportListener(TransportListener* transportListener) {
 
-    if( transportListener == NULL ) {
+    if (transportListener == NULL) {
         return;
     }
 
     // Add this listener from the set of active TransportListeners
-    this->config->transportListeners.add( transportListener );
+    this->config->transportListeners.add(transportListener);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::removeTransportListener( TransportListener* transportListener ) {
+void ActiveMQConnection::removeTransportListener(TransportListener* transportListener) {
 
-    if( transportListener == NULL ) {
+    if (transportListener == NULL) {
         return;
     }
 
     // Remove this listener from the set of active TransportListeners
-    this->config->transportListeners.remove( transportListener );
+    this->config->transportListeners.remove(transportListener);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::waitForTransportInterruptionProcessingToComplete() {
 
     Pointer<CountDownLatch> cdl = this->config->transportInterruptionProcessingComplete;
-    if( cdl != NULL ) {
+    if (cdl != NULL) {
 
-        while( !closed.get() && !transportFailed.get() && cdl->getCount() > 0 ) {
-            cdl->await( 10, TimeUnit::SECONDS );
+        while (!closed.get() && !transportFailed.get() && cdl->getCount() > 0) {
+            cdl->await(10, TimeUnit::SECONDS);
         }
 
         signalInterruptionProcessingComplete();
@@ -1013,12 +1004,13 @@ void ActiveMQConnection::waitForTranspor
 void ActiveMQConnection::setTransportInterruptionProcessingComplete() {
 
     Pointer<CountDownLatch> cdl = this->config->transportInterruptionProcessingComplete;
-    if( cdl != NULL ) {
+    if (cdl != NULL) {
         cdl->countDown();
 
         try {
             signalInterruptionProcessingComplete();
-        } catch( InterruptedException& ignored ) {}
+        } catch (InterruptedException& ignored) {
+        }
     }
 }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Thu Mar 29 22:27:32 2012
@@ -31,6 +31,7 @@
 #include <activemq/transport/TransportListener.h>
 #include <activemq/threads/Scheduler.h>
 #include <activemq/core/kernels/ActiveMQProducerKernel.h>
+#include <activemq/core/kernels/ActiveMQSessionKernel.h>
 #include <decaf/util/Properties.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/util/concurrent/CopyOnWriteArrayList.h>
@@ -119,7 +120,7 @@ namespace core{
          *
          * @throws CMSException if an error occurs while removing performing the operation.
          */
-        virtual void addSession(ActiveMQSession* session);
+        virtual void addSession(Pointer<activemq::core::kernels::ActiveMQSessionKernel> session);
 
         /**
          * Removes the session resources for the given session instance.
@@ -129,7 +130,7 @@ namespace core{
          *
          * @throws CMSException if an error occurs while removing performing the operation.
          */
-        virtual void removeSession(ActiveMQSession* session);
+        virtual void removeSession(Pointer<activemq::core::kernels::ActiveMQSessionKernel> session);
 
         /**
          * Adds an active Producer to the Set of known producers.

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp Thu Mar 29 22:27:32 2012
@@ -57,7 +57,7 @@ namespace core{
 
     public:
 
-        Browser(ActiveMQQueueBrowser* parent, ActiveMQSession* session,
+        Browser(ActiveMQQueueBrowser* parent, ActiveMQSessionKernel* session,
                 const Pointer<commands::ConsumerId>& id,
                 const Pointer<commands::ActiveMQDestination>& destination,
                 const std::string& name, const std::string& selector,
@@ -90,7 +90,7 @@ namespace core{
 }}
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQQueueBrowser::ActiveMQQueueBrowser(ActiveMQSession* session,
+ActiveMQQueueBrowser::ActiveMQQueueBrowser(ActiveMQSessionKernel* session,
                                            const Pointer<commands::ConsumerId>& consumerId,
                                            const Pointer<commands::ActiveMQDestination>& destination,
                                            const std::string& selector,
@@ -258,8 +258,8 @@ Pointer<ActiveMQConsumerKernel> ActiveMQ
     int prefetch = this->session->getConnection()->getPrefetchPolicy()->getQueueBrowserPrefetch();
 
     Pointer<ActiveMQConsumerKernel> consumer(
-        new Browser( this, session, consumerId, destination, "", selector,
-                     prefetch, 0, false, true, dispatchAsync, NULL ) );
+        new Browser(this, session, consumerId, destination, "", selector,
+                    prefetch, 0, false, true, dispatchAsync, NULL ) );
 
     try {
         this->session->addConsumer(consumer);

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h Thu Mar 29 22:27:32 2012
@@ -25,6 +25,7 @@
 #include <cms/MessageEnumeration.h>
 #include <activemq/commands/ConsumerId.h>
 #include <activemq/commands/ActiveMQDestination.h>
+#include <activemq/core/kernels/ActiveMQSessionKernel.h>
 #include <decaf/lang/Pointer.h>
 #include <decaf/util/concurrent/Mutex.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
@@ -48,7 +49,7 @@ namespace kernels {
 
         friend class Browser;
 
-        ActiveMQSession* session;
+        activemq::core::kernels::ActiveMQSessionKernel* session;
         Pointer<commands::ConsumerId> consumerId;
         Pointer<commands::ActiveMQDestination> destination;
         std::string selector;
@@ -69,7 +70,7 @@ namespace kernels {
 
     public:
 
-        ActiveMQQueueBrowser(ActiveMQSession* session,
+        ActiveMQQueueBrowser(activemq::core::kernels::ActiveMQSessionKernel* session,
                              const Pointer<commands::ConsumerId>& consumerId,
                              const Pointer<commands::ActiveMQDestination>& destination,
                              const std::string& selector,

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Thu Mar 29 22:27:32 2012
@@ -19,42 +19,13 @@
 #include <activemq/exceptions/ActiveMQException.h>
 #include <activemq/core/ActiveMQConstants.h>
 #include <activemq/core/ActiveMQConnection.h>
-#include <activemq/core/ActiveMQTransactionContext.h>
 #include <activemq/core/ActiveMQConsumer.h>
 #include <activemq/core/ActiveMQProducer.h>
 #include <activemq/core/ActiveMQQueueBrowser.h>
-#include <activemq/core/ActiveMQSessionExecutor.h>
-#include <activemq/core/PrefetchPolicy.h>
-#include <activemq/util/ActiveMQProperties.h>
 #include <activemq/util/CMSExceptionSupport.h>
 
-#include <activemq/commands/ConsumerInfo.h>
-#include <activemq/commands/DestinationInfo.h>
-#include <activemq/commands/ExceptionResponse.h>
-#include <activemq/commands/ActiveMQDestination.h>
-#include <activemq/commands/ActiveMQTopic.h>
-#include <activemq/commands/ActiveMQQueue.h>
 #include <activemq/commands/ActiveMQTempDestination.h>
-#include <activemq/commands/ActiveMQMessage.h>
-#include <activemq/commands/ActiveMQBytesMessage.h>
-#include <activemq/commands/ActiveMQTextMessage.h>
-#include <activemq/commands/ActiveMQMapMessage.h>
-#include <activemq/commands/ActiveMQStreamMessage.h>
-#include <activemq/commands/ActiveMQTempTopic.h>
-#include <activemq/commands/ActiveMQTempQueue.h>
-#include <activemq/commands/MessagePull.h>
-#include <activemq/commands/RemoveInfo.h>
-#include <activemq/commands/ProducerInfo.h>
-#include <activemq/commands/TransactionInfo.h>
-#include <activemq/commands/RemoveSubscriptionInfo.h>
-
-#include <decaf/lang/Boolean.h>
-#include <decaf/lang/Integer.h>
-#include <decaf/lang/Runnable.h>
-#include <decaf/lang/Long.h>
-#include <decaf/lang/Math.h>
-#include <decaf/util/Queue.h>
-#include <decaf/lang/exceptions/InvalidStateException.h>
+
 #include <decaf/lang/exceptions/NullPointerException.h>
 
 using namespace std;
@@ -64,1144 +35,225 @@ using namespace activemq::core;
 using namespace activemq::core::kernels;
 using namespace activemq::commands;
 using namespace activemq::exceptions;
-using namespace activemq::threads;
-using namespace decaf::util;
-using namespace decaf::util::concurrent;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-namespace {
-
-    /**
-     * Class used to clear a Consumer's dispatch queue asynchronously from the
-     * connection class's Scheduler instance.
-     */
-    class ClearConsumerTask : public Runnable {
-    private:
-
-        Pointer<ActiveMQConsumerKernel> consumer;
-
-    private:
-
-        ClearConsumerTask(const ClearConsumerTask&);
-        ClearConsumerTask& operator=(const ClearConsumerTask&);
-
-    public:
-
-        ClearConsumerTask(Pointer<ActiveMQConsumerKernel> consumer) : Runnable(), consumer(consumer) {
-
-            if (consumer == NULL) {
-                throw NullPointerException(
-                    __FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
-            }
-        }
-
-        virtual ~ClearConsumerTask() {}
-
-        virtual void run() {
-            this->consumer->clearMessagesInProgress();
-        }
-    };
-
-    /**
-     * Class used to Hook a session that has been closed into the Transaction
-     * it is currently a part of.  Once the Transaction has been Committed or
-     * Rolled back this Synchronization can finish the Close of the session.
-     */
-    class CloseSynhcronization : public Synchronization {
-    private:
-
-        ActiveMQSession* session;
-
-    private:
-
-        CloseSynhcronization(const CloseSynhcronization&);
-        CloseSynhcronization& operator=(const CloseSynhcronization&);
-
-    public:
-
-        CloseSynhcronization(ActiveMQSession* session) : Synchronization(), session(session) {
-
-            if(session == NULL) {
-                throw NullPointerException(
-                    __FILE__, __LINE__, "Synchronization Created with NULL Session.");
-            }
-        }
-
-        virtual ~CloseSynhcronization() {}
-
-        virtual void beforeEnd() {
-        }
-
-        virtual void afterCommit() {
-            session->doClose();
-        }
-
-        virtual void afterRollback() {
-            session->doClose();
-        }
-    };
-}
-
-////////////////////////////////////////////////////////////////////////////////
-namespace activemq{
-namespace core{
-
-    class SessionConfig {
-    public:
+ActiveMQSession::ActiveMQSession(Pointer<ActiveMQSessionKernel> kernel) : cms::Session(), kernel(kernel) {
 
-        typedef decaf::util::StlMap< Pointer<commands::ConsumerId>,
-                                     Pointer<ActiveMQConsumerKernel>,
-                                     commands::ConsumerId::COMPARATOR> ConsumersMap;
-
-    private:
-
-        SessionConfig(const SessionConfig&);
-        SessionConfig& operator=(const SessionConfig&);
-
-    public:
-
-        bool synchronizationRegistered;
-        decaf::util::concurrent::CopyOnWriteArrayList< Pointer<ActiveMQProducerKernel> > producers;
-        Pointer<Scheduler> scheduler;
-
-    public:
-
-        SessionConfig() : synchronizationRegistered(false), producers(), scheduler() {}
-        ~SessionConfig() {}
-    };
-
-}}
-
-////////////////////////////////////////////////////////////////////////////////
-ActiveMQSession::ActiveMQSession(ActiveMQConnection* connection,
-                                 const Pointer<SessionId>& id,
-                                 cms::Session::AcknowledgeMode ackMode,
-                                 const Properties& properties) : config(new SessionConfig),
-                                                                 sessionInfo(),
-                                                                 transaction(),
-                                                                 connection(connection),
-                                                                 consumers(),
-                                                                 closed(false),
-                                                                 executor(),
-                                                                 ackMode(ackMode),
-                                                                 producerIds(),
-                                                                 producerSequenceIds(),
-                                                                 consumerIds(),
-                                                                 lastDeliveredSequenceId(0) {
-
-    if (id == NULL || connection == NULL) {
+    if (kernel == NULL) {
         throw ActiveMQException(
             __FILE__, __LINE__,
-            "ActiveMQSession::ActiveMQSession - Constructor called with NULL data");
-    }
-
-    this->sessionInfo.reset(new SessionInfo());
-    this->sessionInfo->setAckMode(ackMode);
-    this->sessionInfo->setSessionId(id);
-
-    connection->oneway(this->sessionInfo);
-
-    this->closed = false;
-    this->lastDeliveredSequenceId = -1;
-
-    // Create a Transaction objet
-    this->transaction.reset(new ActiveMQTransactionContext(this, properties));
-
-    // Create the session executor object.
-    this->executor.reset(new ActiveMQSessionExecutor(this));
-
-    this->connection->addSession(this);
-
-    // Use the Connection's Scheduler.
-    this->config->scheduler = this->connection->getScheduler();
-
-    // If the connection is already started, start the session.
-    if (this->connection->isStarted()) {
-        this->start();
+            "ActiveMQSession::ActiveMQSession - Constructor called with NULL session kernel");
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQSession::~ActiveMQSession() throw() {
-    try{
-        // Destroy this session's resources
-        close();
+ActiveMQSession::~ActiveMQSession() {
+    try {
+        this->kernel->close();
     }
     AMQ_CATCH_NOTHROW( ActiveMQException )
     AMQ_CATCHALL_NOTHROW( )
-
-    delete this->config;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::fire(const activemq::exceptions::ActiveMQException& ex) {
-    if (connection != NULL) {
-        connection->fire(ex);
-    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::close() {
-
-    // If we're already closed, just return.
-    if( this->closed.get() ) {
-        return;
-    }
-
-    if( this->transaction->isInXATransaction() ) {
-
-        // TODO - Right now we don't have a safe way of dealing with this case
-        // since the session might be deleted before the XA Transaction is finalized
-        // registering a Synchronization could result in an segmentation fault.
-        //
-        // For now we just close badly and throw an exception.
-        doClose();
-
-        throw UnsupportedOperationException(
-            __FILE__, __LINE__,
-            "The Consumer is still in an Active XA Transaction, commit it first." );
-    }
-
     try {
-        doClose();
+        this->kernel->close();
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::doClose() {
-
-    try {
-        dispose();
-
-        // Remove this session from the Broker.
-        Pointer<RemoveInfo> info(new RemoveInfo());
-        info->setObjectId(this->sessionInfo->getSessionId());
-        info->setLastDeliveredSequenceId(this->lastDeliveredSequenceId);
-        this->connection->oneway(info);
-    }
-    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
-    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::dispose() {
-
-    class Finalizer {
-    private:
-
-        ActiveMQSession* session;
-        ActiveMQConnection* connection;
-
-    private:
-
-        Finalizer( const Finalizer& );
-        Finalizer& operator= ( const Finalizer& );
-
-    public:
-
-        Finalizer(ActiveMQSession* session, ActiveMQConnection* connection) :
-            session( session ), connection( connection ) {
-        }
-
-        ~Finalizer() {
-            this->connection->removeSession(this->session);
-            this->session->closed = true;
-        }
-    };
-
-    try{
-
-        Finalizer final(this, this->connection);
-
-        // Stop the dispatch executor.
-        stop();
-
-        // Roll Back the transaction since we were closed without an explicit call
-        // to commit it.
-        if (this->transaction->isInTransaction()) {
-            this->transaction->rollback();
-        }
-
-        // Dispose of all Consumers, the dispose method skips the RemoveInfo command.
-        synchronized(&this->consumers) {
-
-            std::vector< Pointer<ActiveMQConsumerKernel> > closables = this->consumers.values();
-
-            for (std::size_t i = 0; i < closables.size(); ++i) {
-                try{
-                    closables[i]->setFailureError(this->connection->getFirstFailureError());
-                    closables[i]->dispose();
-                    this->lastDeliveredSequenceId =
-                        Math::max(this->lastDeliveredSequenceId, closables[i]->getLastDeliveredSequenceId());
-                } catch( cms::CMSException& ex ){
-                    /* Absorb */
-                }
-            }
-        }
-
-        // Dispose of all Producers, the dispose method skips the RemoveInfo command.
-        std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator());
-
-        while( producerIter->hasNext() ) {
-            try{
-                producerIter->next()->dispose();
-            } catch( cms::CMSException& ex ){
-                /* Absorb */
-            }
-        }
-    }
-    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
-    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::commit() {
-
     try {
-
-        this->checkClosed();
-
-        if( !this->isTransacted() ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::commit - This Session is not Transacted");
-        }
-
-        // Commit the Transaction
-        this->transaction->commit();
+        this->kernel->commit();
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::rollback() {
-
-    try{
-
-        this->checkClosed();
-
-        if( !this->isTransacted() ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::rollback - This Session is not Transacted" );
-        }
-
-        // Roll back the Transaction
-        this->transaction->rollback();
+    try {
+        this->kernel->rollback();
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::recover() {
-
-    try{
-
-        checkClosed();
-
-        if (isTransacted()) {
-            throw cms::IllegalStateException("This session is transacted");
-        }
-
-        synchronized( &this->consumers ) {
-            std::vector< Pointer<ActiveMQConsumerKernel> > consumers = this->consumers.values();
-
-            std::vector< Pointer<ActiveMQConsumerKernel> >::iterator iter = consumers.begin();
-            for( ; iter != consumers.end(); ++iter ) {
-                (*iter)->rollback();
-            }
-        }
+    try {
+        this->kernel->recover();
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::clearMessagesInProgress() {
-
-    if( this->executor.get() != NULL ) {
-        this->executor->clearMessagesInProgress();
-    }
-
-    synchronized( &this->consumers ) {
-        std::vector< Pointer<ActiveMQConsumerKernel> > consumers = this->consumers.values();
-
-        std::vector< Pointer<ActiveMQConsumerKernel> >::iterator iter = consumers.begin();
-        for( ; iter != consumers.end(); ++iter ) {
-            (*iter)->inProgressClearRequired();
-
-            this->connection->getScheduler()->executeAfterDelay(
-                new ClearConsumerTask(*iter), 0LL);
-        }
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::acknowledge() {
-
-    synchronized( &this->consumers ) {
-        std::vector< Pointer<ActiveMQConsumerKernel> > consumers = this->consumers.values();
-
-        std::vector< Pointer<ActiveMQConsumerKernel> >::iterator iter = consumers.begin();
-        for( ; iter != consumers.end(); ++iter ) {
-            (*iter)->acknowledge();
-        }
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::deliverAcks() {
-
-    synchronized( &this->consumers ) {
-        std::vector< Pointer<ActiveMQConsumerKernel> > consumers = this->consumers.values();
-
-        std::vector< Pointer<ActiveMQConsumerKernel> >::iterator iter = consumers.begin();
-        for( ; iter != consumers.end(); ++iter ) {
-            (*iter)->deliverAcks();
-        }
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::MessageConsumer* ActiveMQSession::createConsumer( const cms::Destination* destination ) {
-
-    try{
-        this->checkClosed();
-        return this->createConsumer(destination, "", false);
+cms::MessageConsumer* ActiveMQSession::createConsumer(const cms::Destination* destination) {
+    try {
+        return this->kernel->createConsumer(destination, "", false);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::MessageConsumer* ActiveMQSession::createConsumer( const cms::Destination* destination,
-                                                       const std::string& selector ) {
-
-    try{
-        this->checkClosed();
-        return this->createConsumer(destination, selector, false);
+cms::MessageConsumer* ActiveMQSession::createConsumer(const cms::Destination* destination, const std::string& selector) {
+    try {
+        return this->kernel->createConsumer(destination, selector, false);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::MessageConsumer* ActiveMQSession::createConsumer( const cms::Destination* destination,
-                                                       const std::string& selector,
-                                                       bool noLocal ) {
-
-    try{
-
-        this->checkClosed();
-
-        // Cast the destination to an OpenWire destination, so we can
-        // get all the goodies.
-        const ActiveMQDestination* amqDestination =
-            dynamic_cast<const ActiveMQDestination*>( destination );
-
-        if( amqDestination == NULL ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "Destination was either NULL or not created by this CMS Client" );
-        }
-
-        Pointer<ActiveMQDestination> dest( amqDestination->cloneDataStructure() );
-
-        int prefetch = 0;
-        if( dest->isTopic() ) {
-            prefetch = this->connection->getPrefetchPolicy()->getTopicPrefetch();
-        } else {
-            prefetch = this->connection->getPrefetchPolicy()->getQueuePrefetch();
-        }
-
-        // Create the consumer instance.
-        Pointer<ActiveMQConsumerKernel> consumer(
-            new ActiveMQConsumerKernel(this, this->getNextConsumerId(),
-                                       dest, "", selector, prefetch, 0, noLocal,
-                                       false, this->connection->isDispatchAsync(), NULL));
-
-        try{
-            this->addConsumer(consumer);
-            this->connection->syncRequest(consumer->getConsumerInfo());
-        } catch (Exception& ex) {
-            this->removeConsumer(consumer->getConsumerId());
-            throw ex;
-        }
-
-        if (this->connection->isStarted()) {
-            consumer->start();
-        }
-
-        return new ActiveMQConsumer(consumer);
+cms::MessageConsumer* ActiveMQSession::createConsumer(const cms::Destination* destination, const std::string& selector, bool noLocal) {
+    try {
+        return this->kernel->createConsumer(destination, selector, noLocal);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::MessageConsumer* ActiveMQSession::createDurableConsumer( const cms::Topic* destination,
-                                                              const std::string& name,
-                                                              const std::string& selector,
-                                                              bool noLocal ) {
-
-    try{
-
-        this->checkClosed();
-
-        // Cast the destination to an OpenWire destination, so we can
-        // get all the goodies.
-        const ActiveMQDestination* amqDestination =
-            dynamic_cast<const ActiveMQDestination*>( destination );
-
-        if( amqDestination == NULL ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "Destination was either NULL or not created by this CMS Client" );
-        }
-
-        Pointer<ActiveMQDestination> dest( amqDestination->cloneDataStructure() );
-
-        // Create the consumer instance.
-        Pointer<ActiveMQConsumerKernel> consumer(
-            new ActiveMQConsumerKernel(this, this->getNextConsumerId(),
-                                       dest, name, selector,
-                                       this->connection->getPrefetchPolicy()->getDurableTopicPrefetch(),
-                                       0, noLocal, false, this->connection->isDispatchAsync(), NULL));
-
-        try {
-            this->addConsumer(consumer);
-            this->connection->syncRequest(consumer->getConsumerInfo());
-        } catch (Exception& ex) {
-            this->removeConsumer(consumer->getConsumerId());
-            throw ex;
-        }
-
-        if (this->connection->isStarted()) {
-            consumer->start();
-        }
-
-        return new ActiveMQConsumer(consumer);
+cms::MessageConsumer* ActiveMQSession::createDurableConsumer(const cms::Topic* destination, const std::string& name,
+                                                             const std::string& selector, bool noLocal) {
+    try {
+        return this->kernel->createDurableConsumer(destination, name, selector, noLocal);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::MessageProducer* ActiveMQSession::createProducer( const cms::Destination* destination ) {
-
-    try{
-
-        this->checkClosed();
-
-        Pointer<commands::ActiveMQDestination> dest;
-
-        // Producers are allowed to have NULL destinations.  In this case, the
-        // destination is specified by the messages as they are sent.
-        if (destination != NULL) {
-
-            const ActiveMQDestination* amqDestination =
-                dynamic_cast<const ActiveMQDestination*> (destination);
-
-            if (amqDestination == NULL) {
-                throw ActiveMQException(
-                    __FILE__, __LINE__,
-                    "Destination was either NULL or not created by this CMS Client" );
-            }
-
-            // Cast the destination to an OpenWire destination, so we can
-            // get all the goodies.
-            dest.reset(amqDestination->cloneDataStructure());
-        }
-
-        // Create the producer instance.
-        Pointer<ActiveMQProducerKernel> producer( new ActiveMQProducerKernel(
-            this, this->getNextProducerId(), dest, this->connection->getSendTimeout() ) );
-
-        try{
-            this->addProducer(producer);
-            this->connection->oneway(producer->getProducerInfo());
-        } catch (Exception& ex) {
-            this->removeProducer(producer->getProducerId());
-            throw ex;
-        }
-
-        return new ActiveMQProducer(producer);
+cms::MessageProducer* ActiveMQSession::createProducer(const cms::Destination* destination) {
+    try {
+        return this->kernel->createProducer(destination);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::QueueBrowser* ActiveMQSession::createBrowser( const cms::Queue* queue ) {
-
-    try{
-        return ActiveMQSession::createBrowser(queue, "");
+cms::QueueBrowser* ActiveMQSession::createBrowser(const cms::Queue* queue) {
+    try {
+        return this->kernel->createBrowser(queue);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::QueueBrowser* ActiveMQSession::createBrowser(const cms::Queue* queue,
-                                                  const std::string& selector) {
-
-    try{
-
-        this->checkClosed();
-
-        // Cast the destination to an OpenWire destination, so we can
-        // get all the goodies.
-        const ActiveMQDestination* amqDestination =
-            dynamic_cast<const ActiveMQDestination*> (queue);
-
-        if (amqDestination == NULL) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "Destination was either NULL or not created by this CMS Client" );
-        }
-
-        Pointer<ActiveMQDestination> dest(amqDestination->cloneDataStructure());
-
-        // Create the QueueBrowser instance
-        std::auto_ptr<ActiveMQQueueBrowser> browser(
-            new ActiveMQQueueBrowser(this, this->getNextConsumerId(), dest,
-                                     selector, this->connection->isDispatchAsync()));
-
-        return browser.release();
+cms::QueueBrowser* ActiveMQSession::createBrowser(const cms::Queue* queue, const std::string& selector) {
+    try {
+        return this->kernel->createBrowser(queue, selector);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::Queue* ActiveMQSession::createQueue( const std::string& queueName ) {
-
-    try{
-
-        this->checkClosed();
-
-        if (queueName == "") {
-            throw IllegalArgumentException(
-                __FILE__, __LINE__, "Destination Name cannot be the Empty String." );
-        }
-
-        return new commands::ActiveMQQueue(queueName);
+cms::Queue* ActiveMQSession::createQueue(const std::string& queueName) {
+    try {
+        return this->kernel->createQueue(queueName);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::Topic* ActiveMQSession::createTopic( const std::string& topicName ) {
-
-    try{
-
-        this->checkClosed();
-
-        if (topicName == "") {
-            throw IllegalArgumentException(
-                __FILE__, __LINE__, "Destination Name cannot be the Empty String." );
-        }
-
-        return new commands::ActiveMQTopic(topicName);
+cms::Topic* ActiveMQSession::createTopic(const std::string& topicName) {
+    try {
+        return this->kernel->createTopic(topicName);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 cms::TemporaryQueue* ActiveMQSession::createTemporaryQueue() {
-
-    try{
-
-        this->checkClosed();
-
-        std::auto_ptr<commands::ActiveMQTempQueue> queue(new
-            commands::ActiveMQTempQueue(this->createTemporaryDestinationName()));
-
-        // Register it with the Broker
-        this->createTemporaryDestination(queue.get());
-
-        return queue.release();
+    try {
+        return this->kernel->createTemporaryQueue();
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 cms::TemporaryTopic* ActiveMQSession::createTemporaryTopic() {
-
-    try{
-
-        this->checkClosed();
-
-        std::auto_ptr<commands::ActiveMQTempTopic> topic(new
-            commands::ActiveMQTempTopic(createTemporaryDestinationName()));
-
-        // Register it with the Broker
-        this->createTemporaryDestination(topic.get());
-
-        return topic.release();
+    try {
+        return this->kernel->createTemporaryTopic();
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 cms::Message* ActiveMQSession::createMessage() {
-
-    try{
-
-        this->checkClosed();
-        commands::ActiveMQMessage* message = new commands::ActiveMQMessage();
-        message->setConnection(this->connection);
-        return message;
+    try {
+        return this->kernel->createMessage();
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 cms::BytesMessage* ActiveMQSession::createBytesMessage() {
-
-    try{
-
-        this->checkClosed();
-        commands::ActiveMQBytesMessage* message = new commands::ActiveMQBytesMessage();
-        message->setConnection(this->connection);
-        return message;
+    try {
+        return this->kernel->createBytesMessage();
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::BytesMessage* ActiveMQSession::createBytesMessage( const unsigned char* bytes, int bytesSize ) {
-
-    try{
-
-        this->checkClosed();
-        cms::BytesMessage* msg = createBytesMessage();
-        msg->setBodyBytes(bytes, bytesSize);
-        return msg;
+cms::BytesMessage* ActiveMQSession::createBytesMessage(const unsigned char* bytes, int bytesSize) {
+    try {
+        return this->kernel->createBytesMessage(bytes, bytesSize);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 cms::StreamMessage* ActiveMQSession::createStreamMessage() {
-
-    try{
-
-        this->checkClosed();
-        commands::ActiveMQStreamMessage* message = new commands::ActiveMQStreamMessage();
-        message->setConnection(this->connection);
-        return message;
+    try {
+        return this->kernel->createStreamMessage();
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 cms::TextMessage* ActiveMQSession::createTextMessage() {
-
-    try{
-
-        this->checkClosed();
-        commands::ActiveMQTextMessage* message = new commands::ActiveMQTextMessage();
-        message->setConnection(this->connection);
-        return message;
+    try {
+        return this->kernel->createTextMessage();
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::TextMessage* ActiveMQSession::createTextMessage( const std::string& text ) {
-
+cms::TextMessage* ActiveMQSession::createTextMessage(const std::string& text) {
     try {
-
-        this->checkClosed();
-        cms::TextMessage* msg = createTextMessage();
-        msg->setText(text.c_str());
-        return msg;
+        return this->kernel->createTextMessage(text);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 cms::MapMessage* ActiveMQSession::createMapMessage() {
-
-    try{
-
-        this->checkClosed();
-        commands::ActiveMQMapMessage* message = new commands::ActiveMQMapMessage();
-        message->setConnection(this->connection);
-        return message;
-    }
-    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::Session::AcknowledgeMode ActiveMQSession::getAcknowledgeMode() const {
-    return this->ackMode;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-bool ActiveMQSession::isTransacted() const {
-    return (this->ackMode == Session::SESSION_TRANSACTED) || this->transaction->isInXATransaction();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::send(cms::Message* message, ActiveMQProducerKernel* producer, util::Usage* usage) {
-
     try {
-
-        this->checkClosed();
-
-        commands::Message* amqMessage = dynamic_cast< commands::Message* >(message);
-
-        if (amqMessage == NULL) {
-            throw ActiveMQException(__FILE__, __LINE__,
-                "ActiveMQSession::send - Message is not a valid Open Wire type.");
-        }
-
-        // Clear any old data that might be in the message object
-        amqMessage->getMessageId().reset(NULL);
-        amqMessage->getProducerId().reset(NULL);
-        amqMessage->getTransactionId().reset(NULL);
-
-        // Always assign the message ID, regardless of the disable
-        // flag.  Not adding a message ID will cause an NPE at the broker.
-        decaf::lang::Pointer<commands::MessageId> id(new commands::MessageId());
-        id->setProducerId(producer->getProducerInfo()->getProducerId());
-        id->setProducerSequenceId(this->getNextProducerSequenceId());
-
-        amqMessage->setMessageId(id);
-
-        // Ensure that a new transaction is started if this is the first message
-        // sent since the last commit.
-        doStartTransaction();
-        amqMessage->setTransactionId(this->transaction->getTransactionId());
-
-        // NOTE:
-        // Now we copy the message before sending, this allows the user to reuse the
-        // message object without interfering with the copy that's being sent.  We
-        // could make this step optional to increase performance but for now we won't.
-        // To not do this implies that the user must never reuse the message object, or
-        // know that the configuration of Transports doesn't involve the message hanging
-        // around beyond the point that send returns.
-        Pointer<commands::Message> msgCopy(amqMessage->cloneDataStructure());
-
-        msgCopy->onSend();
-        msgCopy->setProducerId( producer->getProducerInfo()->getProducerId() );
-
-        if (this->connection->getSendTimeout() <= 0 &&
-            !msgCopy->isResponseRequired() &&
-            !this->connection->isAlwaysSyncSend() &&
-            (!msgCopy->isPersistent() || this->connection->isUseAsyncSend() ||
-               msgCopy->getTransactionId() != NULL)) {
-
-            if (usage != NULL) {
-                usage->enqueueUsage(msgCopy->getSize());
-            }
-
-            // No Response Required.
-            this->connection->oneway(msgCopy);
-
-        } else {
-
-            // Send the message to the broker.
-            this->connection->syncRequest(msgCopy, this->connection->getSendTimeout());
-        }
+        return this->kernel->createMapMessage();
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::ExceptionListener* ActiveMQSession::getExceptionListener() {
-
-    if( connection != NULL ) {
-        return connection->getExceptionListener();
-    }
-
-    return NULL;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-Pointer<Scheduler> ActiveMQSession::getScheduler() const {
-    return this->config->scheduler;
-}
-
-////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::unsubscribe(const std::string& name) {
-
     try{
-
-        this->checkClosed();
-
-        Pointer<RemoveSubscriptionInfo> rsi(new RemoveSubscriptionInfo());
-
-        rsi->setConnectionId(this->connection->getConnectionInfo().getConnectionId());
-        rsi->setSubcriptionName(name);
-        rsi->setClientId(this->connection->getConnectionInfo().getClientId());
-
-        // Send the message to the broker.
-        this->connection->syncRequest(rsi);
+        this->kernel->unsubscribe(name);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::dispatch(const Pointer<MessageDispatch>& dispatch) {
-
-    if (this->executor.get() != NULL) {
-        this->executor->execute( dispatch );
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::redispatch(MessageDispatchChannel& unconsumedMessages) {
-
-    std::vector< Pointer<MessageDispatch> > messages = unconsumedMessages.removeAll();
-    std::vector< Pointer<MessageDispatch> >::reverse_iterator iter = messages.rbegin();
-
-    for (; iter != messages.rend(); ++iter) {
-        executor->executeFirst( *iter );
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::start() {
-
-    synchronized(&this->consumers) {
-        std::vector< Pointer<ActiveMQConsumerKernel> > consumers = this->consumers.values();
-
-        std::vector< Pointer<ActiveMQConsumerKernel> >::iterator iter = consumers.begin();
-        for (; iter != consumers.end(); ++iter) {
-            (*iter)->start();
-        }
-    }
-
-    if (this->executor.get() != NULL) {
-        this->executor->start();
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::stop() {
-
-    if (this->executor.get() != NULL) {
-        this->executor->stop();
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-bool ActiveMQSession::isStarted() const {
-
-    if (this->executor.get() == NULL) {
-        return false;
-    }
-
-    return this->executor->isRunning();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::createTemporaryDestination(commands::ActiveMQTempDestination* tempDestination) {
-
-    try {
-
-        Pointer<DestinationInfo> command(new DestinationInfo());
-        command->setConnectionId(this->connection->getConnectionInfo().getConnectionId());
-        command->setOperationType(ActiveMQConstants::DESTINATION_ADD_OPERATION);
-        command->setDestination(Pointer<ActiveMQTempDestination> (tempDestination->cloneDataStructure()));
-
-        // Send the message to the broker.
-        this->syncRequest(command);
-
-        // Now that its setup, link it to this Connection so it can be closed.
-        tempDestination->setConnection(this->connection);
-    }
-    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
-    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::destroyTemporaryDestination(
-    commands::ActiveMQTempDestination* tempDestination) {
-
-    try {
-
-        Pointer<DestinationInfo> command(new DestinationInfo());
-
-        command->setConnectionId(this->connection->getConnectionInfo().getConnectionId());
-        command->setOperationType(ActiveMQConstants::DESTINATION_REMOVE_OPERATION);
-        command->setDestination(Pointer<ActiveMQTempDestination> (tempDestination->cloneDataStructure()));
-
-        // Send the message to the broker.
-        this->connection->syncRequest(command);
-    }
-    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
-    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-std::string ActiveMQSession::createTemporaryDestinationName() {
-
-    try {
-        return this->connection->getConnectionId().getValue() + ":" +
-               Long::toString(this->connection->getNextTempDestinationId());
-    }
-    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
-    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::oneway(Pointer<Command> command) {
-
-    try{
-        this->checkClosed();
-        this->connection->oneway(command);
-    }
-    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
-    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-Pointer<Response> ActiveMQSession::syncRequest(Pointer<Command> command, unsigned int timeout) {
-
-    try{
-        this->checkClosed();
-        return this->connection->syncRequest(command, timeout);
-    }
-    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
-    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::checkClosed() const {
-    if( this->closed.get() ) {
-        throw ActiveMQException(
-            __FILE__, __LINE__,
-            "ActiveMQSession - Session Already Closed" );
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::addConsumer(Pointer<ActiveMQConsumerKernel> consumer) {
-
-    try{
-
-        this->checkClosed();
-
-        // Add the consumer to the map.
-        synchronized(&this->consumers) {
-            this->consumers.put(consumer->getConsumerInfo()->getConsumerId(), consumer);
-        }
-
-        // Register this as a message dispatcher for the consumer.
-        this->connection->addDispatcher(consumer->getConsumerInfo()->getConsumerId(), this);
-    }
-    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
-    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::removeConsumer(const Pointer<ConsumerId>& consumerId) {
-
-    try{
-
-        this->checkClosed();
-
-        synchronized(&this->consumers) {
-            if (this->consumers.containsKey(consumerId)) {
-                // Remove this Id both from the Sessions Map of Consumers and from the Connection.
-                // If the kernels parent is destroyed then it will get cleaned up now.
-                this->connection->removeDispatcher(consumerId);
-                this->consumers.remove(consumerId);
-            }
-        }
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::addProducer(Pointer<ActiveMQProducerKernel> producer) {
-
     try{
-
-        this->checkClosed();
-
-        this->config->producers.add(producer);
-
-        // Add to the Connections list
-        this->connection->addProducer(producer);
+        this->kernel->start();
     }
-    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
-    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::removeProducer(const Pointer<commands::ProducerId>& producerId) {
-
+void ActiveMQSession::stop() {
     try{
-
-        this->checkClosed();
-
-        this->connection->removeProducer(producerId);
-
-        std::auto_ptr<Iterator<Pointer< ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator());
-
-        Pointer<ActiveMQProducerKernel> toRemove;
-        while (producerIter->hasNext()) {
-            Pointer<ActiveMQProducerKernel> temp = producerIter->next();
-            if (temp->getProducerId()->equals(*producerId)) {
-                toRemove = temp;
-                break;
-            }
-        }
-
-        if (toRemove != NULL) {
-            this->config->producers.remove(toRemove);
-        }
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::doStartTransaction() {
-
-    if (this->isTransacted() && !this->transaction->isInXATransaction()) {
-        this->transaction->begin();
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::wakeup() {
-
-    if (this->executor.get() != NULL) {
-        this->executor->wakeup();
+        this->kernel->stop();
     }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-Pointer<commands::ConsumerId> ActiveMQSession::getNextConsumerId() {
-    Pointer<ConsumerId> consumerId(new commands::ConsumerId());
-
-    consumerId->setConnectionId(this->connection->getConnectionId().getValue());
-    consumerId->setSessionId(this->sessionInfo->getSessionId()->getValue());
-    consumerId->setValue(this->consumerIds.getNextSequenceId());
-
-    return consumerId;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-Pointer<commands::ProducerId> ActiveMQSession::getNextProducerId() {
-    Pointer<ProducerId> producerId(new ProducerId());
-
-    producerId->setConnectionId(this->connection->getConnectionId().getValue());
-    producerId->setSessionId(this->sessionInfo->getSessionId()->getValue());
-    producerId->setValue(this->producerIds.getNextSequenceId());
-
-    return producerId;
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }



Mime
View raw message