activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1339242 [1/2] - /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/
Date Wed, 16 May 2012 16:02:06 GMT
Author: tabish
Date: Wed May 16 16:02:06 2012
New Revision: 1339242

URL: http://svn.apache.org/viewvc?rev=1339242&view=rev
Log:
Refactor and clean up the State tracker a bit.  

Also has a fix for: https://issues.apache.org/jira/browse/AMQCPP-402

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/CommandVisitor.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/CommandVisitorAdapter.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConsumerState.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConsumerState.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/Tracked.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/Tracked.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/CommandVisitor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/CommandVisitor.h?rev=1339242&r1=1339241&r2=1339242&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/CommandVisitor.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/CommandVisitor.h Wed May 16 16:02:06 2012
@@ -69,124 +69,86 @@ namespace state {
     class AMQCPP_API CommandVisitor {
     public:
 
-        virtual ~CommandVisitor() {}
+        virtual ~CommandVisitor() {
+        }
 
-        virtual decaf::lang::Pointer<commands::Command> processTransactionInfo(
-            commands::TransactionInfo* info ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processTransactionInfo(commands::TransactionInfo* info) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processRemoveInfo(
-            commands::RemoveInfo* info ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processRemoveInfo(commands::RemoveInfo* info) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processConnectionInfo(
-            commands::ConnectionInfo* info ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processConnectionInfo(commands::ConnectionInfo* info) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processSessionInfo(
-            commands::SessionInfo* info ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processSessionInfo(commands::SessionInfo* info) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processProducerInfo(
-            commands::ProducerInfo* info ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processProducerInfo(commands::ProducerInfo* info) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processConsumerInfo(
-            commands::ConsumerInfo* info ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processConsumerInfo(commands::ConsumerInfo* info) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processRemoveConnection(
-            commands::ConnectionId* id ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processRemoveConnection(commands::ConnectionId* id) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processRemoveSession(
-            commands::SessionId* id ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processRemoveSession(commands::SessionId* id) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processRemoveProducer(
-            commands::ProducerId* id ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processRemoveProducer(commands::ProducerId* id) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processRemoveConsumer(
-            commands::ConsumerId* id ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processRemoveConsumer(commands::ConsumerId* id) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processDestinationInfo(
-            commands::DestinationInfo* info ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processDestinationInfo(commands::DestinationInfo* info) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processRemoveDestination(
-            commands::DestinationInfo* info ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processRemoveDestination(commands::DestinationInfo* info) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processRemoveSubscriptionInfo(
-            commands::RemoveSubscriptionInfo* info ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processRemoveSubscriptionInfo(commands::RemoveSubscriptionInfo* info) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processMessage(
-            commands::Message* send ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processMessage(commands::Message* send) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processMessageAck(
-            commands::MessageAck* ack ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processMessageAck(commands::MessageAck* ack) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processMessagePull(
-            commands::MessagePull* pull ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processMessagePull(commands::MessagePull* pull) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processBeginTransaction(
-            commands::TransactionInfo* info ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processBeginTransaction(commands::TransactionInfo* info) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processPrepareTransaction(
-            commands::TransactionInfo* info ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processPrepareTransaction(commands::TransactionInfo* info) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processCommitTransactionOnePhase(
-            commands::TransactionInfo* info ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processCommitTransactionOnePhase(commands::TransactionInfo* info) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processCommitTransactionTwoPhase(
-            commands::TransactionInfo* info ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processCommitTransactionTwoPhase(commands::TransactionInfo* info) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processRollbackTransaction(
-            commands::TransactionInfo* info ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processRollbackTransaction(commands::TransactionInfo* info) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processWireFormat(
-            commands::WireFormatInfo* info ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processWireFormat(commands::WireFormatInfo* info) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processKeepAliveInfo(
-            commands::KeepAliveInfo* info ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processKeepAliveInfo(commands::KeepAliveInfo* info) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processShutdownInfo(
-            commands::ShutdownInfo* info ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processShutdownInfo(commands::ShutdownInfo* info) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processFlushCommand(
-            commands::FlushCommand* command ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processFlushCommand(commands::FlushCommand* command) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processBrokerInfo(
-            commands::BrokerInfo* info) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processBrokerInfo(commands::BrokerInfo* info) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processRecoverTransactions(
-            commands::TransactionInfo* info ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processRecoverTransactions(commands::TransactionInfo* info) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processForgetTransaction(
-            commands::TransactionInfo* info ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processForgetTransaction(commands::TransactionInfo* info) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processEndTransaction(
-            commands::TransactionInfo* info ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processEndTransaction(commands::TransactionInfo* info) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processMessageDispatchNotification(
-            commands::MessageDispatchNotification* notification ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processMessageDispatchNotification(commands::MessageDispatchNotification* notification) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processProducerAck(
-            commands::ProducerAck* ack ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processProducerAck(commands::ProducerAck* ack) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processMessageDispatch(
-            commands::MessageDispatch* dispatch ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processMessageDispatch(commands::MessageDispatch* dispatch) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processControlCommand(
-            commands::ControlCommand* command ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processControlCommand(commands::ControlCommand* command) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processConnectionError(
-            commands::ConnectionError* error ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processConnectionError(commands::ConnectionError* error) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processConnectionControl(
-            commands::ConnectionControl* control ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processConnectionControl(commands::ConnectionControl* control) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processConsumerControl(
-            commands::ConsumerControl* control ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processConsumerControl(commands::ConsumerControl* control) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processBrokerError(
-            commands::BrokerError* error ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processBrokerError(commands::BrokerError* error) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processReplayCommand(
-            commands::ReplayCommand* replay ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processReplayCommand(commands::ReplayCommand* replay) = 0;
 
-        virtual decaf::lang::Pointer<commands::Command> processResponse(
-            commands::Response* response ) = 0;
+        virtual decaf::lang::Pointer<commands::Command> processResponse(commands::Response* response) = 0;
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/CommandVisitorAdapter.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/CommandVisitorAdapter.h?rev=1339242&r1=1339241&r2=1339242&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/CommandVisitorAdapter.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/CommandVisitorAdapter.h Wed May 16 16:02:06 2012
@@ -63,287 +63,213 @@ namespace state {
      *
      * @since 3.0
      */
-    class AMQCPP_API CommandVisitorAdapter : public CommandVisitor {
+    class AMQCPP_API CommandVisitorAdapter: public CommandVisitor {
     public:
 
-        virtual ~CommandVisitorAdapter() {}
-
-        virtual decaf::lang::Pointer<commands::Command> processRemoveConnection(
-            commands::ConnectionId* id AMQCPP_UNUSED ) {
+        virtual ~CommandVisitorAdapter() {
+        }
 
+        virtual decaf::lang::Pointer<commands::Command> processRemoveConnection(commands::ConnectionId* id AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processRemoveSession(
-            commands::SessionId* id AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processRemoveSession(commands::SessionId* id AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processRemoveProducer(
-            commands::ProducerId* id AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processRemoveProducer(commands::ProducerId* id AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processRemoveConsumer(
-            commands::ConsumerId* id AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processRemoveConsumer(commands::ConsumerId* id AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processDestinationInfo(
-            commands::DestinationInfo* info AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processDestinationInfo(commands::DestinationInfo* info AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processRemoveDestination(
-            commands::DestinationInfo* info AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processRemoveDestination(commands::DestinationInfo* info AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processRemoveSubscriptionInfo(
-            commands::RemoveSubscriptionInfo* info AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processRemoveSubscriptionInfo(commands::RemoveSubscriptionInfo* info AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processMessage(
-            commands::Message* send AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processMessage(commands::Message* send AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processMessageAck(
-            commands::MessageAck* ack AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processMessageAck(commands::MessageAck* ack AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processMessagePull(
-            commands::MessagePull* pull AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processMessagePull(commands::MessagePull* pull AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processBeginTransaction(
-            commands::TransactionInfo* info AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processBeginTransaction(commands::TransactionInfo* info AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processPrepareTransaction(
-            commands::TransactionInfo* info AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processPrepareTransaction(commands::TransactionInfo* info AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processCommitTransactionOnePhase(
-            commands::TransactionInfo* info AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processCommitTransactionOnePhase(commands::TransactionInfo* info AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processCommitTransactionTwoPhase(
-            commands::TransactionInfo* info AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processCommitTransactionTwoPhase(commands::TransactionInfo* info AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processRollbackTransaction(
-            commands::TransactionInfo* info AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processRollbackTransaction(commands::TransactionInfo* info AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processWireFormat(
-            commands::WireFormatInfo* info AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processWireFormat(commands::WireFormatInfo* info AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processKeepAliveInfo(
-            commands::KeepAliveInfo* info AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processKeepAliveInfo(commands::KeepAliveInfo* info AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processShutdownInfo(
-            commands::ShutdownInfo* info AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processShutdownInfo(commands::ShutdownInfo* info AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processFlushCommand(
-            commands::FlushCommand* command AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processFlushCommand(commands::FlushCommand* command AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processBrokerInfo(
-            commands::BrokerInfo* info AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processBrokerInfo(commands::BrokerInfo* info AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processRecoverTransactions(
-            commands::TransactionInfo* info AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processRecoverTransactions(commands::TransactionInfo* info AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processForgetTransaction(
-            commands::TransactionInfo* info AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processForgetTransaction(commands::TransactionInfo* info AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processEndTransaction(
-            commands::TransactionInfo* info AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processEndTransaction(commands::TransactionInfo* info AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processMessageDispatchNotification(
-            commands::MessageDispatchNotification* notification AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processMessageDispatchNotification(commands::MessageDispatchNotification* notification AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processProducerAck(
-            commands::ProducerAck* ack AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processProducerAck(commands::ProducerAck* ack AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processMessageDispatch(
-            commands::MessageDispatch* dispatch AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processMessageDispatch(commands::MessageDispatch* dispatch AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processControlCommand(
-            commands::ControlCommand* command AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processControlCommand(commands::ControlCommand* command AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processConnectionError(
-            commands::ConnectionError* error AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processConnectionError(commands::ConnectionError* error AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processConnectionControl(
-            commands::ConnectionControl* control AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processConnectionControl(commands::ConnectionControl* control AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processConsumerControl(
-            commands::ConsumerControl* control AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processConsumerControl(commands::ConsumerControl* control AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processBrokerError(
-            commands::BrokerError* error AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processBrokerError(commands::BrokerError* error AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processReplayCommand(
-            commands::ReplayCommand* replay AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processReplayCommand(commands::ReplayCommand* replay AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processResponse(
-            commands::Response* response AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processResponse(commands::Response* response AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processConnectionInfo(
-            commands::ConnectionInfo* info AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processConnectionInfo(commands::ConnectionInfo* info AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processSessionInfo(
-            commands::SessionInfo* info AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processSessionInfo(commands::SessionInfo* info AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processProducerInfo(
-            commands::ProducerInfo* info AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processProducerInfo(commands::ProducerInfo* info AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processConsumerInfo(
-            commands::ConsumerInfo* info AMQCPP_UNUSED ) {
-
+        virtual decaf::lang::Pointer<commands::Command> processConsumerInfo(commands::ConsumerInfo* info AMQCPP_UNUSED ) {
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processTransactionInfo(
-            commands::TransactionInfo* info ) {
+        virtual decaf::lang::Pointer<commands::Command> processTransactionInfo(commands::TransactionInfo* info) {
 
-            if( info != decaf::lang::Pointer<commands::Command>() ) {
-                switch( info->getType() ) {
+            if (info != decaf::lang::Pointer<commands::Command>()) {
+                switch (info->getType()) {
                     case core::ActiveMQConstants::TRANSACTION_STATE_BEGIN:
-                        return this->processBeginTransaction( info );
+                        return this->processBeginTransaction(info);
                     case core::ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE:
-                        return this->processCommitTransactionOnePhase( info );
+                        return this->processCommitTransactionOnePhase(info);
                     case core::ActiveMQConstants::TRANSACTION_STATE_COMMITTWOPHASE:
-                        return this->processCommitTransactionTwoPhase( info );
+                        return this->processCommitTransactionTwoPhase(info);
                     case core::ActiveMQConstants::TRANSACTION_STATE_END:
-                        return this->processEndTransaction( info );
+                        return this->processEndTransaction(info);
                     case core::ActiveMQConstants::TRANSACTION_STATE_FORGET:
-                        return this->processForgetTransaction( info );
+                        return this->processForgetTransaction(info);
                     case core::ActiveMQConstants::TRANSACTION_STATE_PREPARE:
-                        return this->processPrepareTransaction( info );
+                        return this->processPrepareTransaction(info);
                     case core::ActiveMQConstants::TRANSACTION_STATE_RECOVER:
-                        return this->processRecoverTransactions( info );
+                        return this->processRecoverTransactions(info);
                     case core::ActiveMQConstants::TRANSACTION_STATE_ROLLBACK:
-                        return this->processRollbackTransaction( info );
+                        return this->processRollbackTransaction(info);
                     default:
                         throw exceptions::ActiveMQException(
                             __FILE__, __LINE__, "Unknown Transaction Info Type.");
                 }
             }
+
             return decaf::lang::Pointer<commands::Command>();
         }
 
-        virtual decaf::lang::Pointer<commands::Command> processRemoveInfo(
-            commands::RemoveInfo* info ) {
+        virtual decaf::lang::Pointer<commands::Command> processRemoveInfo(commands::RemoveInfo* info) {
 
-            if( info != decaf::lang::Pointer<commands::Command>() ) {
-                switch( info->getObjectId()->getDataStructureType() ) {
+            if (info != decaf::lang::Pointer<commands::Command>()) {
+                switch (info->getObjectId()->getDataStructureType()) {
                     case commands::ConnectionId::ID_CONNECTIONID:
                         return this->processRemoveConnection(
-                            dynamic_cast<commands::ConnectionId*>( info->getObjectId().get() ) );
+                            dynamic_cast<commands::ConnectionId*> (info->getObjectId().get()));
                     case commands::SessionId::ID_SESSIONID:
                         return this->processRemoveSession(
-                            dynamic_cast<commands::SessionId*>( info->getObjectId().get() ) );
+                            dynamic_cast<commands::SessionId*> (info->getObjectId().get()));
                     case commands::ConsumerId::ID_CONSUMERID:
                         return this->processRemoveConsumer(
-                            dynamic_cast<commands::ConsumerId*>( info->getObjectId().get() ) );
+                            dynamic_cast<commands::ConsumerId*> (info->getObjectId().get()));
                     case commands::ProducerId::ID_PRODUCERID:
                         return this->processRemoveProducer(
-                            dynamic_cast<commands::ProducerId*>( info->getObjectId().get() ) );
+                            dynamic_cast<commands::ProducerId*> (info->getObjectId().get()));
                     default:
                         throw exceptions::ActiveMQException(
                             __FILE__, __LINE__, "Unknown Remove Info Type.");
                 }
             }
+
             return decaf::lang::Pointer<commands::Command>();
         }
-
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp?rev=1339242&r1=1339241&r2=1339242&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp Wed May 16 16:02:06 2012
@@ -27,7 +27,7 @@ using namespace activemq::state;
 using namespace activemq::commands;
 
 ////////////////////////////////////////////////////////////////////////////////
-ConnectionState::ConnectionState( const Pointer<ConnectionInfo>& info ) :
+ConnectionState::ConnectionState(Pointer<ConnectionInfo> info) :
     info(info),
     transactions(),
     sessions(),
@@ -51,7 +51,7 @@ ConnectionState::~ConnectionState() {
 ////////////////////////////////////////////////////////////////////////////////
 std::string ConnectionState::toString() const {
 
-    if( this->info.get() != NULL ) {
+    if (this->info.get() != NULL) {
         return this->info->toString();
     }
 
@@ -59,24 +59,23 @@ std::string ConnectionState::toString() 
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ConnectionState::reset( const Pointer<ConnectionInfo>& info ) {
-
+void ConnectionState::reset(Pointer<ConnectionInfo> info) {
     this->info = info;
     transactions.clear();
     sessions.clear();
     tempDestinations.clear();
-    disposed.set( false );
+    disposed.set(false);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ConnectionState::shutdown() {
 
-    if( this->disposed.compareAndSet( false, true ) ) {
+    if (this->disposed.compareAndSet(false, true)) {
 
-        std::vector< Pointer<SessionState> > values = this->sessions.values();
-        std::vector< Pointer<SessionState> >::iterator iter = values.begin();
+        std::vector<Pointer<SessionState> > values = this->sessions.values();
+        std::vector<Pointer<SessionState> >::iterator iter = values.begin();
 
-        for( ; iter != values.end(); ++iter ) {
+        for (; iter != values.end(); ++iter) {
             (*iter)->shutdown();
         }
     }
@@ -85,8 +84,8 @@ void ConnectionState::shutdown() {
 ////////////////////////////////////////////////////////////////////////////////
 void ConnectionState::checkShutdown() const {
 
-    if( this->disposed.get() ) {
+    if (this->disposed.get()) {
         throw decaf::lang::exceptions::IllegalStateException(
-            __FILE__, __LINE__, "Connection already Disposed" );
+            __FILE__, __LINE__, "Connection already Disposed");
     }
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.h?rev=1339242&r1=1339241&r2=1339242&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.h Wed May 16 16:02:06 2012
@@ -67,13 +67,13 @@ namespace state {
 
     public:
 
-        ConnectionState( const Pointer<ConnectionInfo>& info );
+        ConnectionState(Pointer<ConnectionInfo> info);
 
         virtual ~ConnectionState();
 
         std::string toString() const;
 
-        const Pointer<commands::ConnectionInfo>& getInfo() const {
+        const Pointer<commands::ConnectionInfo> getInfo() const {
             return this->info;
         }
 
@@ -81,71 +81,68 @@ namespace state {
 
         void shutdown();
 
-        void reset( const Pointer<ConnectionInfo>& info );
+        void reset(Pointer<ConnectionInfo> info);
 
-        void addTempDestination( const Pointer<DestinationInfo>& info ) {
+        void addTempDestination(Pointer<DestinationInfo> info) {
             checkShutdown();
-            tempDestinations.add( info );
+            tempDestinations.add(info);
         }
 
-        void removeTempDestination( const Pointer<ActiveMQDestination>& destination ) {
+        void removeTempDestination(Pointer<ActiveMQDestination> destination) {
 
-            std::auto_ptr< decaf::util::Iterator< Pointer<DestinationInfo> > > iter(
-                tempDestinations.iterator() );
+            std::auto_ptr<decaf::util::Iterator<Pointer<DestinationInfo> > > iter(tempDestinations.iterator());
 
-            while( iter->hasNext() ) {
+            while (iter->hasNext()) {
                 Pointer<DestinationInfo> di = iter->next();
-                if( di->getDestination()->equals( destination.get() ) ) {
+                if (di->getDestination()->equals(destination.get())) {
                     iter->remove();
                 }
             }
         }
 
-        void addTransactionState( const Pointer<TransactionId>& id ) {
+        void addTransactionState(Pointer<TransactionId> id) {
             checkShutdown();
-            transactions.put( id.dynamicCast<LocalTransactionId>(),
-                              Pointer<TransactionState>( new TransactionState( id ) ) );
+            transactions.put(id.dynamicCast<LocalTransactionId>(), Pointer<TransactionState>(new TransactionState(id)));
         }
 
-        const Pointer<TransactionState>& getTransactionState( const Pointer<TransactionId>& id ) const {
-            return transactions.get( id.dynamicCast<LocalTransactionId>() );
+        const Pointer<TransactionState>& getTransactionState(Pointer<TransactionId> id) const {
+            return transactions.get(id.dynamicCast<LocalTransactionId>());
         }
 
-        std::vector< Pointer<TransactionState> > getTransactionStates() const {
+        std::vector<Pointer<TransactionState> > getTransactionStates() const {
             return transactions.values();
         }
 
-        Pointer<TransactionState> removeTransactionState( const Pointer<TransactionId>& id ) {
-            return transactions.remove( id.dynamicCast<LocalTransactionId>() );
+        Pointer<TransactionState> removeTransactionState(Pointer<TransactionId> id) {
+            return transactions.remove(id.dynamicCast<LocalTransactionId>());
         }
 
-        void addSession( const Pointer<SessionInfo>& info ) {
+        void addSession(Pointer<SessionInfo> info) {
             checkShutdown();
-            sessions.put(
-                info->getSessionId(), Pointer<SessionState>( new SessionState( info ) ) );
+            sessions.put(info->getSessionId(), Pointer<SessionState>(new SessionState(info)));
         }
 
-        Pointer<SessionState> removeSession( const Pointer<SessionId>& id ) {
-            return sessions.remove( id );
+        Pointer<SessionState> removeSession(Pointer<SessionId> id) {
+            return sessions.remove(id);
         }
 
-        const Pointer<SessionState>& getSessionState( const Pointer<SessionId>& id ) const {
-            return sessions.get( id );
+        const Pointer<SessionState> getSessionState(Pointer<SessionId> id) const {
+            return sessions.get(id);
         }
 
-        const LinkedList< Pointer<DestinationInfo> >& getTempDesinations() const {
+        const LinkedList<Pointer<DestinationInfo> >& getTempDesinations() const {
             return tempDestinations;
         }
 
-        std::vector< Pointer<SessionState> > getSessionStates() const {
+        std::vector<Pointer<SessionState> > getSessionStates() const {
             return sessions.values();
         }
 
-        StlMap< Pointer<ConsumerId>, Pointer<ConsumerInfo>, ConsumerId::COMPARATOR >& getRecoveringPullConsumers() {
+        StlMap<Pointer<ConsumerId>, Pointer<ConsumerInfo>, ConsumerId::COMPARATOR>& getRecoveringPullConsumers() {
             return recoveringPullConsumers;
         }
 
-        void setConnectionInterruptProcessingComplete( bool connectionInterruptProcessingComplete ) {
+        void setConnectionInterruptProcessingComplete(bool connectionInterruptProcessingComplete) {
             this->connectionInterruptProcessingComplete = connectionInterruptProcessingComplete;
         }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp?rev=1339242&r1=1339241&r2=1339242&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp Wed May 16 16:02:06 2012
@@ -21,6 +21,7 @@
 #include <decaf/util/NoSuchElementException.h>
 
 #include <activemq/commands/ConsumerControl.h>
+#include <activemq/commands/ExceptionResponse.h>
 #include <activemq/commands/RemoveInfo.h>
 #include <activemq/core/ActiveMQConstants.h>
 #include <activemq/transport/TransportListener.h>
@@ -43,26 +44,29 @@ namespace state {
     class RemoveTransactionAction : public Runnable {
     private:
 
-        const Pointer<TransactionInfo> info;
+        Pointer<TransactionInfo> info;
         ConnectionStateTracker* stateTracker;
 
     private:
 
-        RemoveTransactionAction( const RemoveTransactionAction& );
-        RemoveTransactionAction& operator= ( const RemoveTransactionAction& );
+        RemoveTransactionAction(const RemoveTransactionAction&);
+        RemoveTransactionAction& operator=(const RemoveTransactionAction&);
 
     public:
 
-        RemoveTransactionAction( ConnectionStateTracker* stateTracker,
-                                 const Pointer<TransactionInfo>& info ) :
-            info( info ), stateTracker( stateTracker ) {}
+        RemoveTransactionAction(ConnectionStateTracker* stateTracker, Pointer<TransactionInfo> info) :
+            info(info), stateTracker(stateTracker) {
+        }
 
         virtual ~RemoveTransactionAction() {}
 
         virtual void run() {
             Pointer<ConnectionId> connectionId = info->getConnectionId();
-            Pointer<ConnectionState> cs = stateTracker->connectionStates.get( connectionId );
-            cs->removeTransactionState( info->getTransactionId() );
+            Pointer<ConnectionState> cs = stateTracker->connectionStates.get(connectionId);
+            Pointer<TransactionState> txState = cs->removeTransactionState(info->getTransactionId());
+            if (txState != NULL) {
+                txState->clear();
+            }
         }
     };
 
@@ -88,12 +92,12 @@ ConnectionStateTracker::~ConnectionState
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Tracked> ConnectionStateTracker::track( const Pointer<Command>& command ) {
+Pointer<Tracked> ConnectionStateTracker::track(Pointer<Command> command) {
 
     try{
 
-        Pointer<Command> result = command->visit( this );
-        if( result == NULL ) {
+        Pointer<Command> result = command->visit(this);
+        if (result == NULL) {
             return Pointer<Tracked>();
         } else {
             return result.dynamicCast<Tracked>();
@@ -105,9 +109,9 @@ Pointer<Tracked> ConnectionStateTracker:
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ConnectionStateTracker::trackBack( const Pointer<Command>& command ) {
+void ConnectionStateTracker::trackBack(Pointer<Command> command) {
 
-    try{
+    try {
         if (command != NULL) {
             if (trackMessages && command->isMessage()) {
                 Pointer<Message> message = command.dynamicCast<Message>();
@@ -129,43 +133,43 @@ void ConnectionStateTracker::trackBack( 
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ConnectionStateTracker::restore( const Pointer<transport::Transport>& transport ) {
+void ConnectionStateTracker::restore(Pointer<transport::Transport> transport) {
 
     try{
 
         std::vector< Pointer<ConnectionState> > connectionStates = this->connectionStates.values();
         std::vector< Pointer<ConnectionState> >::const_iterator iter = connectionStates.begin();
 
-        for( ; iter != connectionStates.end(); ++iter ) {
+        for (; iter != connectionStates.end(); ++iter) {
             Pointer<ConnectionState> state = *iter;
 
             Pointer<ConnectionInfo> info = state->getInfo();
-            info->setFailoverReconnect( true );
-            transport->oneway( info );
+            info->setFailoverReconnect(true);
+            transport->oneway(info);
 
-            doRestoreTempDestinations( transport, state );
+            doRestoreTempDestinations(transport, state);
 
-            if( restoreSessions ) {
-                doRestoreSessions( transport, state );
+            if (restoreSessions) {
+                doRestoreSessions(transport, state);
             }
 
-            if( restoreTransaction ) {
-                doRestoreTransactions( transport, state );
+            if (restoreTransaction) {
+                doRestoreTransactions(transport, state);
             }
         }
 
         // Now we flush messages
-        std::vector< Pointer<Command> > messages = messageCache.values();
-        std::vector< Pointer<Command> >::const_iterator messageIter = messages.begin();
+        std::vector<Pointer<Command> > messages = messageCache.values();
+        std::vector<Pointer<Command> >::const_iterator messageIter = messages.begin();
 
-        for( ; messageIter != messages.end(); ++messageIter ) {
-            transport->oneway( *messageIter );
+        for (; messageIter != messages.end(); ++messageIter) {
+            transport->oneway(*messageIter);
         }
 
-        std::vector< Pointer<Command> > messagePulls = messagePullCache.values();
-        std::vector< Pointer<Command> >::const_iterator messagePullIter = messagePulls.begin();
+        std::vector<Pointer<Command> > messagePulls = messagePullCache.values();
+        std::vector<Pointer<Command> >::const_iterator messagePullIter = messagePulls.begin();
 
-        for(; messagePullIter != messagePulls.end(); ++messagePullIter) {
+        for (; messagePullIter != messagePulls.end(); ++messagePullIter) {
             transport->oneway(*messagePullIter);
         }
     }
@@ -175,61 +179,61 @@ void ConnectionStateTracker::restore( co
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ConnectionStateTracker::doRestoreTransactions( const Pointer<transport::Transport>& transport,
-                                                    const Pointer<ConnectionState>& connectionState ) {
+void ConnectionStateTracker::doRestoreTransactions(Pointer<transport::Transport> transport,
+                                                   Pointer<ConnectionState> connectionState) {
 
-    try{
+    try {
 
-        std::vector< Pointer<Command> > toIgnore;
+        std::vector<Pointer<TransactionInfo> > toRollback;
 
         // Restore the session's transaction state
-        std::vector< Pointer<TransactionState> > transactionStates =
-            connectionState->getTransactionStates();
+        std::vector<Pointer<TransactionState> > transactionStates = connectionState->getTransactionStates();
+        std::vector<Pointer<TransactionState> >::const_iterator iter = transactionStates.begin();
 
-        std::vector< Pointer<TransactionState> >::const_iterator iter = transactionStates.begin();
-
-        for( ; iter != transactionStates.end(); ++iter ) {
-
-            // ignore any empty (ack) transaction
-            if( (*iter)->getCommands().size() == 2 ) {
-                Pointer<Command> lastCommand = (*iter)->getCommands().get(1);
-                if( lastCommand->isTransactionInfo() ) {
-                    Pointer<TransactionInfo> transactionInfo = lastCommand.dynamicCast<TransactionInfo>();
-
-                    if( transactionInfo->getType() == ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE ) {
-                        toIgnore.push_back(lastCommand);
-                        continue;
-                    }
+        // For any completed transactions we don't know if the commit actually made it to the broker
+        // or was lost along the way, so they need to be rolled back.
+        for (; iter != transactionStates.end(); ++iter) {
+            Pointer<Command> lastCommand = (*iter)->getCommands().getLast();
+            if (lastCommand->isTransactionInfo()) {
+                Pointer<TransactionInfo> transactionInfo = lastCommand.dynamicCast<TransactionInfo>();
+                if (transactionInfo->getType() == ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE) {
+                    toRollback.push_back(transactionInfo);
+                    continue;
                 }
             }
 
             // replay short lived producers that may have been involved in the transaction
-            std::vector< Pointer<ProducerState> > producerStates = (*iter)->getProducerStates();
-            std::vector< Pointer<ProducerState> >::const_iterator state = producerStates.begin();
+            std::vector<Pointer<ProducerState> > producerStates = (*iter)->getProducerStates();
+            std::vector<Pointer<ProducerState> >::const_iterator state = producerStates.begin();
 
-            for( ; state != producerStates.end(); ++state ) {
-                transport->oneway( (*state)->getInfo() );
+            for (; state != producerStates.end(); ++state) {
+                transport->oneway((*state)->getInfo());
             }
 
-            std::auto_ptr< Iterator< Pointer<Command> > > commands(
-                (*iter)->getCommands().iterator() );
+            std::auto_ptr<Iterator<Pointer<Command> > > commands((*iter)->getCommands().iterator());
 
-            while( commands->hasNext() ) {
-                transport->oneway( commands->next() );
+            while (commands->hasNext()) {
+                transport->oneway(commands->next());
             }
 
             state = producerStates.begin();
-            for( ; state != producerStates.end(); ++state ) {
-                transport->oneway( (*state)->getInfo()->createRemoveCommand() );
+            for (; state != producerStates.end(); ++state) {
+                transport->oneway((*state)->getInfo()->createRemoveCommand());
             }
         }
 
-        std::vector< Pointer<Command> >::const_iterator command = toIgnore.begin();
-        for( ; command != toIgnore.end(); ++command ) {
-            // respond to the outstanding commit
-            Pointer<Response> response( new Response() );
-            response->setCorrelationId( (*command)->getCommandId() );
-            transport->getTransportListener()->onCommand( response );
+        // Trigger failure of commit for all outstanding completed but in doubt transactions.
+        std::vector<Pointer<TransactionInfo> >::const_iterator command = toRollback.begin();
+        for (; command != toRollback.end(); ++command) {
+            Pointer<ExceptionResponse> response(new ExceptionResponse());
+            Pointer<BrokerError> exception(new BrokerError());
+            exception->setExceptionClass("TransactionRolledBackException");
+            exception->setMessage(
+                std::string("Transaction completion in doubt due to failover. Forcing rollback of ") +
+                (*command)->getTransactionId()->toString());
+            response->setException(exception);
+            response->setCorrelationId((*command)->getCommandId());
+            transport->getTransportListener()->onCommand(response);
         }
     }
     AMQ_CATCH_RETHROW( IOException )
@@ -238,26 +242,25 @@ void ConnectionStateTracker::doRestoreTr
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ConnectionStateTracker::doRestoreSessions( const Pointer<transport::Transport>& transport,
-                                                const Pointer<ConnectionState>& connectionState ) {
+void ConnectionStateTracker::doRestoreSessions(Pointer<transport::Transport> transport,
+                                               Pointer<ConnectionState> connectionState) {
 
-    try{
-
-        std::vector< Pointer<SessionState> > sessionStates = connectionState->getSessionStates();
+    try {
 
-        std::vector< Pointer<SessionState> >::const_iterator iter = sessionStates.begin();
+        std::vector<Pointer<SessionState> > sessionStates = connectionState->getSessionStates();
+        std::vector<Pointer<SessionState> >::const_iterator iter = sessionStates.begin();
 
         // Restore the Session State
-        for( ; iter != sessionStates.end(); ++iter ) {
+        for (; iter != sessionStates.end(); ++iter) {
             Pointer<SessionState> state = *iter;
-            transport->oneway( state->getInfo() );
+            transport->oneway(state->getInfo());
 
-            if( restoreProducers ) {
-                doRestoreProducers( transport, state );
+            if (restoreProducers) {
+                doRestoreProducers(transport, state);
             }
 
-            if( restoreConsumers ) {
-                doRestoreConsumers( transport, state );
+            if (restoreConsumers) {
+                doRestoreConsumers(transport, state);
             }
         }
     }
@@ -267,33 +270,34 @@ void ConnectionStateTracker::doRestoreSe
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ConnectionStateTracker::doRestoreConsumers( const Pointer<transport::Transport>& transport,
-                                                 const Pointer<SessionState>& sessionState ) {
+void ConnectionStateTracker::doRestoreConsumers(Pointer<transport::Transport> transport, Pointer<SessionState> sessionState ) {
 
-    try{
+    try {
 
         // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete
         Pointer<ConnectionState> connectionState =
-            connectionStates.get( sessionState->getInfo()->getSessionId()->getParentId() );
+            connectionStates.get(sessionState->getInfo()->getSessionId()->getParentId());
         bool connectionInterruptionProcessingComplete =
             connectionState->isConnectionInterruptProcessingComplete();
 
-        std::vector< Pointer<ConsumerState> > consumerStates = sessionState->getConsumerStates();
-        std::vector< Pointer<ConsumerState> >::const_iterator state = consumerStates.begin();
+        std::vector<Pointer<ConsumerState> > consumerStates = sessionState->getConsumerStates();
+        std::vector<Pointer<ConsumerState> >::const_iterator state = consumerStates.begin();
 
-        for( ; state != consumerStates.end(); ++state ) {
+        for (; state != consumerStates.end(); ++state) {
 
             Pointer<ConsumerInfo> infoToSend = (*state)->getInfo();
             Pointer<wireformat::WireFormat> wireFormat = transport->getWireFormat();
 
-            if( !connectionInterruptionProcessingComplete && infoToSend->getPrefetchSize() > 0 && wireFormat->getVersion() > 5) {
+            if (!connectionInterruptionProcessingComplete && infoToSend->getPrefetchSize() > 0 &&
+                wireFormat->getVersion() > 5) {
 
-                infoToSend.reset( (*state)->getInfo()->cloneDataStructure() );
-                connectionState->getRecoveringPullConsumers().put( infoToSend->getConsumerId(), (*state)->getInfo() );
+                infoToSend.reset((*state)->getInfo()->cloneDataStructure());
+                connectionState->getRecoveringPullConsumers().put(
+                    infoToSend->getConsumerId(), (*state)->getInfo());
                 infoToSend->setPrefetchSize(0);
             }
 
-            transport->oneway( infoToSend );
+            transport->oneway(infoToSend);
         }
     }
     AMQ_CATCH_RETHROW( IOException )
@@ -302,19 +306,17 @@ void ConnectionStateTracker::doRestoreCo
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ConnectionStateTracker::doRestoreProducers( const Pointer<transport::Transport>& transport,
-                                                 const Pointer<SessionState>& sessionState ) {
+void ConnectionStateTracker::doRestoreProducers(Pointer<transport::Transport> transport, Pointer<SessionState> sessionState ) {
 
-    try{
+    try {
 
         // Restore the session's producers
-        std::vector< Pointer<ProducerState> > producerStates = sessionState->getProducerStates();
+        std::vector<Pointer<ProducerState> > producerStates = sessionState->getProducerStates();
+        std::vector<Pointer<ProducerState> >::const_iterator iter = producerStates.begin();
 
-        std::vector< Pointer<ProducerState> >::const_iterator iter = producerStates.begin();
-
-        for( ; iter != producerStates.end(); ++iter ) {
+        for (; iter != producerStates.end(); ++iter) {
             Pointer<ProducerState> state = *iter;
-            transport->oneway( state->getInfo() );
+            transport->oneway(state->getInfo());
         }
     }
     AMQ_CATCH_RETHROW( IOException )
@@ -323,16 +325,14 @@ void ConnectionStateTracker::doRestorePr
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ConnectionStateTracker::doRestoreTempDestinations( const Pointer<transport::Transport>& transport,
-                                                        const Pointer<ConnectionState>& connectionState ) {
-
-    try{
+void ConnectionStateTracker::doRestoreTempDestinations(Pointer<transport::Transport> transport,
+                                                       Pointer<ConnectionState> connectionState ) {
+    try {
+        std::auto_ptr<Iterator<Pointer<DestinationInfo> > > iter(
+            connectionState->getTempDesinations().iterator());
 
-        std::auto_ptr< Iterator< Pointer<DestinationInfo> > > iter(
-            connectionState->getTempDesinations().iterator() );
-
-        while( iter->hasNext() ) {
-            transport->oneway( iter->next() );
+        while (iter->hasNext()) {
+            transport->oneway(iter->next());
         }
     }
     AMQ_CATCH_RETHROW( IOException )
@@ -341,13 +341,13 @@ void ConnectionStateTracker::doRestoreTe
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Command> ConnectionStateTracker::processDestinationInfo( DestinationInfo* info ) {
+Pointer<Command> ConnectionStateTracker::processDestinationInfo(DestinationInfo* info) {
 
-    try{
-        if( info != NULL ) {
-            Pointer<ConnectionState> cs = connectionStates.get( info->getConnectionId() );
-            if( cs != NULL && info->getDestination()->isTemporary() ) {
-                cs->addTempDestination( Pointer<DestinationInfo>( info->cloneDataStructure() ) );
+    try {
+        if (info != NULL) {
+            Pointer<ConnectionState> cs = connectionStates.get(info->getConnectionId());
+            if (cs != NULL && info->getDestination()->isTemporary()) {
+                cs->addTempDestination(Pointer<DestinationInfo>(info->cloneDataStructure()));
             }
         }
         return TRACKED_RESPONSE_MARKER;
@@ -358,13 +358,13 @@ Pointer<Command> ConnectionStateTracker:
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Command> ConnectionStateTracker::processRemoveDestination( DestinationInfo* info ) {
+Pointer<Command> ConnectionStateTracker::processRemoveDestination(DestinationInfo* info) {
 
-    try{
-        if( info != NULL ) {
-            Pointer<ConnectionState> cs = connectionStates.get( info->getConnectionId() );
-            if( cs != NULL && info->getDestination()->isTemporary() ) {
-                cs->removeTempDestination( info->getDestination() );
+    try {
+        if (info != NULL) {
+            Pointer<ConnectionState> cs = connectionStates.get(info->getConnectionId());
+            if (cs != NULL && info->getDestination()->isTemporary()) {
+                cs->removeTempDestination(info->getDestination());
             }
         }
         return TRACKED_RESPONSE_MARKER;
@@ -375,21 +375,19 @@ Pointer<Command> ConnectionStateTracker:
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Command> ConnectionStateTracker::processProducerInfo( ProducerInfo* info ) {
+Pointer<Command> ConnectionStateTracker::processProducerInfo(ProducerInfo* info) {
 
-    try{
-
-        if( info != NULL && info->getProducerId() != NULL ) {
+    try {
+        if (info != NULL && info->getProducerId() != NULL) {
             Pointer<SessionId> sessionId = info->getProducerId()->getParentId();
-            if( sessionId != NULL ) {
+            if (sessionId != NULL) {
                 Pointer<ConnectionId> connectionId = sessionId->getParentId();
-                if( connectionId != NULL ) {
-                    Pointer<ConnectionState> cs = connectionStates.get( connectionId );
-                    if( cs != NULL ) {
-                        Pointer<SessionState> ss = cs->getSessionState( sessionId );
-                        if( ss != NULL ) {
-                            ss->addProducer(
-                                Pointer<ProducerInfo>( info->cloneDataStructure() ) );
+                if (connectionId != NULL) {
+                    Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                    if (cs != NULL) {
+                        Pointer<SessionState> ss = cs->getSessionState(sessionId);
+                        if (ss != NULL) {
+                            ss->addProducer(Pointer<ProducerInfo>(info->cloneDataStructure()));
                         }
                     }
                 }
@@ -403,20 +401,19 @@ Pointer<Command> ConnectionStateTracker:
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Command> ConnectionStateTracker::processRemoveProducer( ProducerId* id ) {
-
-    try{
+Pointer<Command> ConnectionStateTracker::processRemoveProducer(ProducerId* id) {
 
-        if( id != NULL ) {
+    try {
+        if (id != NULL) {
             Pointer<SessionId> sessionId = id->getParentId();
-            if( sessionId != NULL ) {
+            if (sessionId != NULL) {
                 Pointer<ConnectionId> connectionId = sessionId->getParentId();
-                if( connectionId != NULL ) {
-                    Pointer<ConnectionState> cs = connectionStates.get( connectionId );
-                    if( cs != NULL ) {
-                        Pointer<SessionState> ss = cs->getSessionState( sessionId );
-                        if( ss != NULL ) {
-                            ss->removeProducer( Pointer<ProducerId>( id->cloneDataStructure() ) );
+                if (connectionId != NULL) {
+                    Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                    if (cs != NULL) {
+                        Pointer<SessionState> ss = cs->getSessionState(sessionId);
+                        if (ss != NULL) {
+                            ss->removeProducer(Pointer<ProducerId>(id->cloneDataStructure()));
                         }
                     }
                 }
@@ -430,21 +427,20 @@ Pointer<Command> ConnectionStateTracker:
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Command> ConnectionStateTracker::processConsumerInfo( ConsumerInfo* info ) {
+Pointer<Command> ConnectionStateTracker::processConsumerInfo(ConsumerInfo* info) {
 
-    try{
+    try {
 
-        if( info != NULL ) {
+        if (info != NULL) {
             Pointer<SessionId> sessionId = info->getConsumerId()->getParentId();
-            if( sessionId != NULL ) {
+            if (sessionId != NULL) {
                 Pointer<ConnectionId> connectionId = sessionId->getParentId();
-                if( connectionId != NULL ) {
-                    Pointer<ConnectionState> cs = connectionStates.get( connectionId );
-                    if( cs != NULL ) {
-                        Pointer<SessionState> ss = cs->getSessionState( sessionId );
-                        if( ss != NULL ) {
-                            ss->addConsumer(
-                                Pointer<ConsumerInfo>( info->cloneDataStructure() ) );
+                if (connectionId != NULL) {
+                    Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                    if (cs != NULL) {
+                        Pointer<SessionState> ss = cs->getSessionState(sessionId);
+                        if (ss != NULL) {
+                            ss->addConsumer(Pointer<ConsumerInfo>(info->cloneDataStructure()));
                         }
                     }
                 }
@@ -458,19 +454,19 @@ Pointer<Command> ConnectionStateTracker:
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Command> ConnectionStateTracker::processRemoveConsumer( ConsumerId* id ) {
+Pointer<Command> ConnectionStateTracker::processRemoveConsumer(ConsumerId* id) {
 
-    try{
-        if( id != NULL ) {
+    try {
+        if (id != NULL) {
             Pointer<SessionId> sessionId = id->getParentId();
-            if( sessionId != NULL ) {
+            if (sessionId != NULL) {
                 Pointer<ConnectionId> connectionId = sessionId->getParentId();
-                if( connectionId != NULL ) {
-                    Pointer<ConnectionState> cs = connectionStates.get( connectionId );
-                    if( cs != NULL ) {
-                        Pointer<SessionState> ss = cs->getSessionState( sessionId );
-                        if( ss != NULL ) {
-                            ss->removeConsumer( Pointer<ConsumerId>( id->cloneDataStructure() ) );
+                if (connectionId != NULL) {
+                    Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                    if (cs != NULL) {
+                        Pointer<SessionState> ss = cs->getSessionState(sessionId);
+                        if (ss != NULL) {
+                            ss->removeConsumer(Pointer<ConsumerId>(id->cloneDataStructure()));
                         }
                     }
                 }
@@ -484,16 +480,16 @@ Pointer<Command> ConnectionStateTracker:
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Command> ConnectionStateTracker::processSessionInfo( SessionInfo* info ) {
+Pointer<Command> ConnectionStateTracker::processSessionInfo(SessionInfo* info) {
 
-    try{
+    try {
 
-        if( info != NULL ) {
+        if (info != NULL) {
             Pointer<ConnectionId> connectionId = info->getSessionId()->getParentId();
-            if( connectionId != NULL ) {
-                Pointer<ConnectionState> cs = connectionStates.get( connectionId );
-                if( cs != NULL ) {
-                    cs->addSession( Pointer<SessionInfo>( info->cloneDataStructure() ) );
+            if (connectionId != NULL) {
+                Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                if (cs != NULL) {
+                    cs->addSession(Pointer<SessionInfo>(info->cloneDataStructure()));
                 }
             }
         }
@@ -505,16 +501,16 @@ Pointer<Command> ConnectionStateTracker:
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Command> ConnectionStateTracker::processRemoveSession( SessionId* id ) {
+Pointer<Command> ConnectionStateTracker::processRemoveSession(SessionId* id) {
 
-    try{
+    try {
 
-        if( id != NULL ) {
+        if (id != NULL) {
             Pointer<ConnectionId> connectionId = id->getParentId();
-            if( connectionId != NULL ) {
-                Pointer<ConnectionState> cs = connectionStates.get( connectionId );
-                if( cs != NULL ) {
-                    cs->removeSession( Pointer<SessionId>( id->cloneDataStructure() ) );
+            if (connectionId != NULL) {
+                Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                if (cs != NULL) {
+                    cs->removeSession(Pointer<SessionId>(id->cloneDataStructure()));
                 }
             }
         }
@@ -526,14 +522,13 @@ Pointer<Command> ConnectionStateTracker:
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Command> ConnectionStateTracker::processConnectionInfo( ConnectionInfo* info ) {
+Pointer<Command> ConnectionStateTracker::processConnectionInfo(ConnectionInfo* info) {
 
-    try{
-
-        if( info != NULL ) {
-            Pointer<ConnectionInfo> infoCopy( info->cloneDataStructure() );
-            connectionStates.put( info->getConnectionId(),
-                                  Pointer<ConnectionState>( new ConnectionState( infoCopy ) ) );
+    try {
+        if (info != NULL) {
+            Pointer<ConnectionInfo> infoCopy(info->cloneDataStructure());
+            connectionStates.put(
+                info->getConnectionId(), Pointer<ConnectionState>(new ConnectionState(infoCopy)));
         }
         return TRACKED_RESPONSE_MARKER;
     }
@@ -543,12 +538,11 @@ Pointer<Command> ConnectionStateTracker:
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Command> ConnectionStateTracker::processRemoveConnection( ConnectionId* id ) {
+Pointer<Command> ConnectionStateTracker::processRemoveConnection(ConnectionId* id) {
 
-    try{
-
-        if( id != NULL ) {
-            connectionStates.remove( Pointer<ConnectionId>( id->cloneDataStructure() ) );
+    try {
+        if (id != NULL) {
+            connectionStates.remove(Pointer<ConnectionId>(id->cloneDataStructure()));
         }
 
         return TRACKED_RESPONSE_MARKER;
@@ -559,27 +553,26 @@ Pointer<Command> ConnectionStateTracker:
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Command> ConnectionStateTracker::processMessage( Message* message ) {
+Pointer<Command> ConnectionStateTracker::processMessage(Message* message) {
 
-    try{
+    try {
 
-        if( message != NULL ) {
-            if( trackTransactions && message->getTransactionId() != NULL ) {
+        if (message != NULL) {
+            if (trackTransactions && message->getTransactionId() != NULL) {
                 Pointer<ProducerId> producerId = message->getProducerId();
                 Pointer<ConnectionId> connectionId = producerId->getParentId()->getParentId();
 
-                if( connectionId != NULL ) {
-                    Pointer<ConnectionState> cs = connectionStates.get( connectionId );
-                    if( cs != NULL ) {
+                if (connectionId != NULL) {
+                    Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                    if (cs != NULL) {
                         Pointer<TransactionState> transactionState =
-                            cs->getTransactionState( message->getTransactionId() );
-                        if( transactionState != NULL ) {
-                            transactionState->addCommand(
-                                Pointer<Command>( message->cloneDataStructure() ) );
+                            cs->getTransactionState(message->getTransactionId());
+                        if (transactionState != NULL) {
+                            transactionState->addCommand(Pointer<Command>(message->cloneDataStructure()));
 
-                            if( trackTransactionProducers ) {
+                            if (trackTransactionProducers) {
                                 // Track the producer in case it is closed before a commit
-                                Pointer<SessionState> sessionState = cs->getSessionState( producerId->getParentId() );
+                                Pointer<SessionState> sessionState = cs->getSessionState(producerId->getParentId());
                                 Pointer<ProducerState> producerState = sessionState->getProducerState(producerId);
                                 producerState->setTransactionState(transactionState);
                             }
@@ -587,39 +580,11 @@ Pointer<Command> ConnectionStateTracker:
                     }
                 }
                 return TRACKED_RESPONSE_MARKER;
-            }else if( trackMessages ) {
-                messageCache.put( message->getMessageId(),
-                                  Pointer<Message>( message->cloneDataStructure() ) );
+            } else if (trackMessages) {
+                messageCache.put(message->getMessageId(), Pointer<Message> (message->cloneDataStructure()));
             }
         }
-        return Pointer<Response>();
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-Pointer<Command> ConnectionStateTracker::processMessageAck( MessageAck* ack ) {
 
-    try{
-
-        if( trackTransactions && ack != NULL && ack->getTransactionId() != NULL) {
-            Pointer<ConnectionId> connectionId;// =
-                ack->getConsumerId()->getParentId()->getParentId();
-            if( connectionId != NULL ) {
-                Pointer<ConnectionState> cs = connectionStates.get( connectionId );
-                if( cs != NULL ) {
-                    Pointer<TransactionState> transactionState =
-                        cs->getTransactionState( ack->getTransactionId() );
-                    if( transactionState != NULL ) {
-                        transactionState->addCommand(
-                            Pointer<Command>( ack->cloneDataStructure() ) );
-                    }
-                }
-            }
-            return TRACKED_RESPONSE_MARKER;
-        }
         return Pointer<Response>();
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -628,27 +593,26 @@ Pointer<Command> ConnectionStateTracker:
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Command> ConnectionStateTracker::processBeginTransaction( TransactionInfo* info ) {
+Pointer<Command> ConnectionStateTracker::processBeginTransaction(TransactionInfo* info) {
 
-    try{
+    try {
 
-        if( trackTransactions && info != NULL ) {
+        if (trackTransactions && info != NULL) {
             Pointer<ConnectionId> connectionId = info->getConnectionId();
-            if( connectionId != NULL ) {
-                Pointer<ConnectionState> cs = connectionStates.get( connectionId );
-                if( cs != NULL ) {
-                    cs->addTransactionState( info->getTransactionId() );
+            if (connectionId != NULL) {
+                Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                if (cs != NULL) {
+                    cs->addTransactionState(info->getTransactionId());
                     Pointer<TransactionState> transactionState =
-                        cs->getTransactionState( info->getTransactionId() );
-                    transactionState->addCommand(
-                        Pointer<Command>( info->cloneDataStructure() ) );
+                        cs->getTransactionState(info->getTransactionId());
+                    transactionState->addCommand(Pointer<Command>(info->cloneDataStructure()));
                 }
             }
 
             return TRACKED_RESPONSE_MARKER;
         }
 
-        return Pointer<Response>();
+        return Pointer<Response> ();
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -656,20 +620,19 @@ Pointer<Command> ConnectionStateTracker:
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Command> ConnectionStateTracker::processPrepareTransaction( TransactionInfo* info ) {
+Pointer<Command> ConnectionStateTracker::processPrepareTransaction(TransactionInfo* info) {
 
-    try{
+    try {
 
-        if( trackTransactions && info != NULL ) {
+        if (trackTransactions && info != NULL) {
             Pointer<ConnectionId> connectionId = info->getConnectionId();
-            if( connectionId != NULL ) {
-                Pointer<ConnectionState> cs = connectionStates.get( connectionId );
-                if( cs != NULL ) {
+            if (connectionId != NULL) {
+                Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                if (cs != NULL) {
                     Pointer<TransactionState> transactionState =
-                        cs->getTransactionState( info->getTransactionId() );
-                    if( transactionState != NULL ) {
-                        transactionState->addCommand(
-                            Pointer<Command>( info->cloneDataStructure() ) );
+                        cs->getTransactionState(info->getTransactionId());
+                    if (transactionState != NULL) {
+                        transactionState->addCommand(Pointer<Command>(info->cloneDataStructure()));
                     }
                 }
             }
@@ -685,29 +648,28 @@ Pointer<Command> ConnectionStateTracker:
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Command> ConnectionStateTracker::processCommitTransactionOnePhase( TransactionInfo* info ) {
+Pointer<Command> ConnectionStateTracker::processCommitTransactionOnePhase(TransactionInfo* info) {
 
-    try{
+    try {
 
-        if( trackTransactions && info != NULL ) {
+        if (trackTransactions && info != NULL) {
             Pointer<ConnectionId> connectionId = info->getConnectionId();
-            if( connectionId != NULL ) {
-                Pointer<ConnectionState> cs = connectionStates.get( connectionId );
-                if( cs != NULL ) {
+            if (connectionId != NULL) {
+                Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                if (cs != NULL) {
                     Pointer<TransactionState> transactionState =
-                        cs->getTransactionState( info->getTransactionId() );
-                    if( transactionState != NULL ) {
-                        Pointer<TransactionInfo> infoCopy =
-                            Pointer<TransactionInfo>( info->cloneDataStructure() );
-                        transactionState->addCommand( infoCopy );
-                        return Pointer<Tracked>( new Tracked(
-                            Pointer<Runnable>( new RemoveTransactionAction( this, infoCopy ) ) ) );
+                        cs->getTransactionState(info->getTransactionId());
+                    if (transactionState != NULL) {
+                        Pointer<TransactionInfo> infoCopy(info->cloneDataStructure());
+                        transactionState->addCommand(infoCopy);
+                        return Pointer<Tracked>(
+                            new Tracked(Pointer<Runnable>(new RemoveTransactionAction(this, infoCopy))));
                     }
                 }
             }
         }
 
-        return Pointer<Response>();
+        return Pointer<Response> ();
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -715,23 +677,22 @@ Pointer<Command> ConnectionStateTracker:
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Command> ConnectionStateTracker::processCommitTransactionTwoPhase( TransactionInfo* info ) {
+Pointer<Command> ConnectionStateTracker::processCommitTransactionTwoPhase(TransactionInfo* info) {
 
-    try{
+    try {
 
-        if( trackTransactions && info != NULL ) {
+        if (trackTransactions && info != NULL) {
             Pointer<ConnectionId> connectionId = info->getConnectionId();
-            if( connectionId != NULL ) {
-                Pointer<ConnectionState> cs = connectionStates.get( connectionId );
-                if( cs != NULL ) {
+            if (connectionId != NULL) {
+                Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                if (cs != NULL) {
                     Pointer<TransactionState> transactionState =
-                        cs->getTransactionState( info->getTransactionId() );
-                    if( transactionState != NULL ) {
-                        Pointer<TransactionInfo> infoCopy =
-                            Pointer<TransactionInfo>( info->cloneDataStructure() );
-                        transactionState->addCommand( infoCopy );
-                        return Pointer<Tracked>( new Tracked(
-                            Pointer<Runnable>( new RemoveTransactionAction( this, infoCopy ) ) ) );
+                        cs->getTransactionState(info->getTransactionId());
+                    if (transactionState != NULL) {
+                        Pointer<TransactionInfo> infoCopy(info->cloneDataStructure());
+                        transactionState->addCommand(infoCopy);
+                        return Pointer<Tracked>(
+                            new Tracked(Pointer<Runnable>(new RemoveTransactionAction(this, infoCopy))));
                     }
                 }
             }
@@ -745,23 +706,22 @@ Pointer<Command> ConnectionStateTracker:
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Command> ConnectionStateTracker::processRollbackTransaction( TransactionInfo* info ) {
+Pointer<Command> ConnectionStateTracker::processRollbackTransaction(TransactionInfo* info) {
 
-    try{
+    try {
 
-        if( trackTransactions && info != NULL ) {
+        if (trackTransactions && info != NULL) {
             Pointer<ConnectionId> connectionId = info->getConnectionId();
-            if( connectionId != NULL ) {
-                Pointer<ConnectionState> cs = connectionStates.get( connectionId );
-                if( cs != NULL ) {
+            if (connectionId != NULL) {
+                Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                if (cs != NULL) {
                     Pointer<TransactionState> transactionState =
-                        cs->getTransactionState( info->getTransactionId() );
-                    if( transactionState != NULL ) {
-                        Pointer<TransactionInfo> infoCopy =
-                            Pointer<TransactionInfo>( info->cloneDataStructure() );
-                        transactionState->addCommand( infoCopy );
-                        return Pointer<Tracked>( new Tracked(
-                            Pointer<Runnable>( new RemoveTransactionAction( this, infoCopy ) ) ) );
+                        cs->getTransactionState(info->getTransactionId());
+                    if (transactionState != NULL) {
+                        Pointer<TransactionInfo> infoCopy(info->cloneDataStructure());
+                        transactionState->addCommand(infoCopy);
+                        return Pointer<Tracked>(
+                            new Tracked(Pointer<Runnable>(new RemoveTransactionAction(this, infoCopy))));
                     }
                 }
             }
@@ -775,20 +735,19 @@ Pointer<Command> ConnectionStateTracker:
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Command> ConnectionStateTracker::processEndTransaction( TransactionInfo* info ) {
+Pointer<Command> ConnectionStateTracker::processEndTransaction(TransactionInfo* info) {
 
-    try{
+    try {
 
-        if( trackTransactions && info != NULL ) {
+        if (trackTransactions && info != NULL) {
             Pointer<ConnectionId> connectionId = info->getConnectionId();
-            if( connectionId != NULL ) {
-                Pointer<ConnectionState> cs = connectionStates.get( connectionId );
-                if( cs != NULL ) {
+            if (connectionId != NULL) {
+                Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                if (cs != NULL) {
                     Pointer<TransactionState> transactionState =
-                        cs->getTransactionState( info->getTransactionId() );
-                    if( transactionState != NULL ) {
-                        transactionState->addCommand(
-                            Pointer<Command>( info->cloneDataStructure() ) );
+                        cs->getTransactionState(info->getTransactionId());
+                    if (transactionState != NULL) {
+                        transactionState->addCommand(Pointer<Command> (info->cloneDataStructure()));
                     }
                 }
             }
@@ -806,14 +765,14 @@ Pointer<Command> ConnectionStateTracker:
 ////////////////////////////////////////////////////////////////////////////////
 Pointer<Command> ConnectionStateTracker::processMessagePull(MessagePull* pull) {
 
-    try{
+    try {
 
         if (pull != NULL && pull->getDestination() != NULL && pull->getConsumerId() != NULL) {
             std::string id = pull->getDestination()->toString() + "::" + pull->getConsumerId()->toString();
             messagePullCache.put(id, Pointer<Command>(pull->cloneDataStructure()));
         }
 
-        return Pointer<Command>();
+        return Pointer<Command> ();
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -822,30 +781,30 @@ Pointer<Command> ConnectionStateTracker:
 
 ////////////////////////////////////////////////////////////////////////////////
 void ConnectionStateTracker::connectionInterruptProcessingComplete(
-    transport::Transport* transport, const Pointer<ConnectionId>& connectionId) {
+    transport::Transport* transport, Pointer<ConnectionId> connectionId) {
 
-    Pointer<ConnectionState> connectionState = connectionStates.get( connectionId );
+    Pointer<ConnectionState> connectionState = connectionStates.get(connectionId);
 
-    if( connectionState != NULL ) {
+    if (connectionState != NULL) {
 
-        connectionState->setConnectionInterruptProcessingComplete( true );
+        connectionState->setConnectionInterruptProcessingComplete(true);
 
-        StlMap< Pointer<ConsumerId>, Pointer<ConsumerInfo>, ConsumerId::COMPARATOR > stalledConsumers =
+        StlMap<Pointer<ConsumerId>, Pointer<ConsumerInfo>, ConsumerId::COMPARATOR> stalledConsumers =
             connectionState->getRecoveringPullConsumers();
 
-        std::vector< Pointer<ConsumerId> > keySet = stalledConsumers.keySet();
-        std::vector< Pointer<ConsumerId> >::const_iterator key = keySet.begin();
+        std::vector<Pointer<ConsumerId> > keySet = stalledConsumers.keySet();
+        std::vector<Pointer<ConsumerId> >::const_iterator key = keySet.begin();
 
-        for( ; key != keySet.end(); ++key ) {
-            Pointer<ConsumerControl> control( new ConsumerControl() );
+        for (; key != keySet.end(); ++key) {
+            Pointer<ConsumerControl> control(new ConsumerControl());
 
-            control->setConsumerId( *key );
-            control->setPrefetch( stalledConsumers.get( *key )->getPrefetchSize() );
-            control->setDestination( stalledConsumers.get( *key )->getDestination() );
+            control->setConsumerId(*key);
+            control->setPrefetch(stalledConsumers.get(*key)->getPrefetchSize());
+            control->setDestination(stalledConsumers.get(*key)->getDestination());
 
             try {
-                transport->oneway( control );
-            } catch( Exception& ex ) {
+                transport->oneway(control);
+            } catch (Exception& ex) {
             }
         }
 
@@ -856,10 +815,10 @@ void ConnectionStateTracker::connectionI
 ////////////////////////////////////////////////////////////////////////////////
 void ConnectionStateTracker::transportInterrupted() {
 
-    std::vector< Pointer<ConnectionState> > connectionStatesVec = this->connectionStates.values();
-    std::vector< Pointer<ConnectionState> >::const_iterator state = connectionStatesVec.begin();
+    std::vector<Pointer<ConnectionState> > connectionStatesVec = this->connectionStates.values();
+    std::vector<Pointer<ConnectionState> >::const_iterator state = connectionStatesVec.begin();
 
-    for( ; state != connectionStatesVec.end(); ++state ) {
-        (*state)->setConnectionInterruptProcessingComplete( false );
+    for (; state != connectionStatesVec.end(); ++state) {
+        (*state)->setConnectionInterruptProcessingComplete(false);
     }
 }



Mime
View raw message