Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B943BCF06 for ; Wed, 16 May 2012 16:02:32 +0000 (UTC) Received: (qmail 13183 invoked by uid 500); 16 May 2012 16:02:32 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 13143 invoked by uid 500); 16 May 2012 16:02:32 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 13134 invoked by uid 99); 16 May 2012 16:02:32 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 May 2012 16:02:32 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_FILL_THIS_FORM_SHORT X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 May 2012 16:02:27 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id A85252388847 for ; Wed, 16 May 2012 16:02:07 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1339242 [1/2] - /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ Date: Wed, 16 May 2012 16:02:06 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120516160207.A85252388847@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Wed May 16 16:02:06 2012 New Revision: 1339242 URL: http://svn.apache.org/viewvc?rev=1339242&view=rev Log: Refactor and clean up the State tracker a bit. Also has a fix for: https://issues.apache.org/jira/browse/AMQCPP-402 Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/CommandVisitor.h activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/CommandVisitorAdapter.h activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.h activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConsumerState.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConsumerState.h activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.h activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.h activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/Tracked.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/Tracked.h activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.h Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/CommandVisitor.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/CommandVisitor.h?rev=1339242&r1=1339241&r2=1339242&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/CommandVisitor.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/CommandVisitor.h Wed May 16 16:02:06 2012 @@ -69,124 +69,86 @@ namespace state { class AMQCPP_API CommandVisitor { public: - virtual ~CommandVisitor() {} + virtual ~CommandVisitor() { + } - virtual decaf::lang::Pointer processTransactionInfo( - commands::TransactionInfo* info ) = 0; + virtual decaf::lang::Pointer processTransactionInfo(commands::TransactionInfo* info) = 0; - virtual decaf::lang::Pointer processRemoveInfo( - commands::RemoveInfo* info ) = 0; + virtual decaf::lang::Pointer processRemoveInfo(commands::RemoveInfo* info) = 0; - virtual decaf::lang::Pointer processConnectionInfo( - commands::ConnectionInfo* info ) = 0; + virtual decaf::lang::Pointer processConnectionInfo(commands::ConnectionInfo* info) = 0; - virtual decaf::lang::Pointer processSessionInfo( - commands::SessionInfo* info ) = 0; + virtual decaf::lang::Pointer processSessionInfo(commands::SessionInfo* info) = 0; - virtual decaf::lang::Pointer processProducerInfo( - commands::ProducerInfo* info ) = 0; + virtual decaf::lang::Pointer processProducerInfo(commands::ProducerInfo* info) = 0; - virtual decaf::lang::Pointer processConsumerInfo( - commands::ConsumerInfo* info ) = 0; + virtual decaf::lang::Pointer processConsumerInfo(commands::ConsumerInfo* info) = 0; - virtual decaf::lang::Pointer processRemoveConnection( - commands::ConnectionId* id ) = 0; + virtual decaf::lang::Pointer processRemoveConnection(commands::ConnectionId* id) = 0; - virtual decaf::lang::Pointer processRemoveSession( - commands::SessionId* id ) = 0; + virtual decaf::lang::Pointer processRemoveSession(commands::SessionId* id) = 0; - virtual decaf::lang::Pointer processRemoveProducer( - commands::ProducerId* id ) = 0; + virtual decaf::lang::Pointer processRemoveProducer(commands::ProducerId* id) = 0; - virtual decaf::lang::Pointer processRemoveConsumer( - commands::ConsumerId* id ) = 0; + virtual decaf::lang::Pointer processRemoveConsumer(commands::ConsumerId* id) = 0; - virtual decaf::lang::Pointer processDestinationInfo( - commands::DestinationInfo* info ) = 0; + virtual decaf::lang::Pointer processDestinationInfo(commands::DestinationInfo* info) = 0; - virtual decaf::lang::Pointer processRemoveDestination( - commands::DestinationInfo* info ) = 0; + virtual decaf::lang::Pointer processRemoveDestination(commands::DestinationInfo* info) = 0; - virtual decaf::lang::Pointer processRemoveSubscriptionInfo( - commands::RemoveSubscriptionInfo* info ) = 0; + virtual decaf::lang::Pointer processRemoveSubscriptionInfo(commands::RemoveSubscriptionInfo* info) = 0; - virtual decaf::lang::Pointer processMessage( - commands::Message* send ) = 0; + virtual decaf::lang::Pointer processMessage(commands::Message* send) = 0; - virtual decaf::lang::Pointer processMessageAck( - commands::MessageAck* ack ) = 0; + virtual decaf::lang::Pointer processMessageAck(commands::MessageAck* ack) = 0; - virtual decaf::lang::Pointer processMessagePull( - commands::MessagePull* pull ) = 0; + virtual decaf::lang::Pointer processMessagePull(commands::MessagePull* pull) = 0; - virtual decaf::lang::Pointer processBeginTransaction( - commands::TransactionInfo* info ) = 0; + virtual decaf::lang::Pointer processBeginTransaction(commands::TransactionInfo* info) = 0; - virtual decaf::lang::Pointer processPrepareTransaction( - commands::TransactionInfo* info ) = 0; + virtual decaf::lang::Pointer processPrepareTransaction(commands::TransactionInfo* info) = 0; - virtual decaf::lang::Pointer processCommitTransactionOnePhase( - commands::TransactionInfo* info ) = 0; + virtual decaf::lang::Pointer processCommitTransactionOnePhase(commands::TransactionInfo* info) = 0; - virtual decaf::lang::Pointer processCommitTransactionTwoPhase( - commands::TransactionInfo* info ) = 0; + virtual decaf::lang::Pointer processCommitTransactionTwoPhase(commands::TransactionInfo* info) = 0; - virtual decaf::lang::Pointer processRollbackTransaction( - commands::TransactionInfo* info ) = 0; + virtual decaf::lang::Pointer processRollbackTransaction(commands::TransactionInfo* info) = 0; - virtual decaf::lang::Pointer processWireFormat( - commands::WireFormatInfo* info ) = 0; + virtual decaf::lang::Pointer processWireFormat(commands::WireFormatInfo* info) = 0; - virtual decaf::lang::Pointer processKeepAliveInfo( - commands::KeepAliveInfo* info ) = 0; + virtual decaf::lang::Pointer processKeepAliveInfo(commands::KeepAliveInfo* info) = 0; - virtual decaf::lang::Pointer processShutdownInfo( - commands::ShutdownInfo* info ) = 0; + virtual decaf::lang::Pointer processShutdownInfo(commands::ShutdownInfo* info) = 0; - virtual decaf::lang::Pointer processFlushCommand( - commands::FlushCommand* command ) = 0; + virtual decaf::lang::Pointer processFlushCommand(commands::FlushCommand* command) = 0; - virtual decaf::lang::Pointer processBrokerInfo( - commands::BrokerInfo* info) = 0; + virtual decaf::lang::Pointer processBrokerInfo(commands::BrokerInfo* info) = 0; - virtual decaf::lang::Pointer processRecoverTransactions( - commands::TransactionInfo* info ) = 0; + virtual decaf::lang::Pointer processRecoverTransactions(commands::TransactionInfo* info) = 0; - virtual decaf::lang::Pointer processForgetTransaction( - commands::TransactionInfo* info ) = 0; + virtual decaf::lang::Pointer processForgetTransaction(commands::TransactionInfo* info) = 0; - virtual decaf::lang::Pointer processEndTransaction( - commands::TransactionInfo* info ) = 0; + virtual decaf::lang::Pointer processEndTransaction(commands::TransactionInfo* info) = 0; - virtual decaf::lang::Pointer processMessageDispatchNotification( - commands::MessageDispatchNotification* notification ) = 0; + virtual decaf::lang::Pointer processMessageDispatchNotification(commands::MessageDispatchNotification* notification) = 0; - virtual decaf::lang::Pointer processProducerAck( - commands::ProducerAck* ack ) = 0; + virtual decaf::lang::Pointer processProducerAck(commands::ProducerAck* ack) = 0; - virtual decaf::lang::Pointer processMessageDispatch( - commands::MessageDispatch* dispatch ) = 0; + virtual decaf::lang::Pointer processMessageDispatch(commands::MessageDispatch* dispatch) = 0; - virtual decaf::lang::Pointer processControlCommand( - commands::ControlCommand* command ) = 0; + virtual decaf::lang::Pointer processControlCommand(commands::ControlCommand* command) = 0; - virtual decaf::lang::Pointer processConnectionError( - commands::ConnectionError* error ) = 0; + virtual decaf::lang::Pointer processConnectionError(commands::ConnectionError* error) = 0; - virtual decaf::lang::Pointer processConnectionControl( - commands::ConnectionControl* control ) = 0; + virtual decaf::lang::Pointer processConnectionControl(commands::ConnectionControl* control) = 0; - virtual decaf::lang::Pointer processConsumerControl( - commands::ConsumerControl* control ) = 0; + virtual decaf::lang::Pointer processConsumerControl(commands::ConsumerControl* control) = 0; - virtual decaf::lang::Pointer processBrokerError( - commands::BrokerError* error ) = 0; + virtual decaf::lang::Pointer processBrokerError(commands::BrokerError* error) = 0; - virtual decaf::lang::Pointer processReplayCommand( - commands::ReplayCommand* replay ) = 0; + virtual decaf::lang::Pointer processReplayCommand(commands::ReplayCommand* replay) = 0; - virtual decaf::lang::Pointer processResponse( - commands::Response* response ) = 0; + virtual decaf::lang::Pointer processResponse(commands::Response* response) = 0; }; Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/CommandVisitorAdapter.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/CommandVisitorAdapter.h?rev=1339242&r1=1339241&r2=1339242&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/CommandVisitorAdapter.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/CommandVisitorAdapter.h Wed May 16 16:02:06 2012 @@ -63,287 +63,213 @@ namespace state { * * @since 3.0 */ - class AMQCPP_API CommandVisitorAdapter : public CommandVisitor { + class AMQCPP_API CommandVisitorAdapter: public CommandVisitor { public: - virtual ~CommandVisitorAdapter() {} - - virtual decaf::lang::Pointer processRemoveConnection( - commands::ConnectionId* id AMQCPP_UNUSED ) { + virtual ~CommandVisitorAdapter() { + } + virtual decaf::lang::Pointer processRemoveConnection(commands::ConnectionId* id AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processRemoveSession( - commands::SessionId* id AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processRemoveSession(commands::SessionId* id AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processRemoveProducer( - commands::ProducerId* id AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processRemoveProducer(commands::ProducerId* id AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processRemoveConsumer( - commands::ConsumerId* id AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processRemoveConsumer(commands::ConsumerId* id AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processDestinationInfo( - commands::DestinationInfo* info AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processDestinationInfo(commands::DestinationInfo* info AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processRemoveDestination( - commands::DestinationInfo* info AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processRemoveDestination(commands::DestinationInfo* info AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processRemoveSubscriptionInfo( - commands::RemoveSubscriptionInfo* info AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processRemoveSubscriptionInfo(commands::RemoveSubscriptionInfo* info AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processMessage( - commands::Message* send AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processMessage(commands::Message* send AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processMessageAck( - commands::MessageAck* ack AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processMessageAck(commands::MessageAck* ack AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processMessagePull( - commands::MessagePull* pull AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processMessagePull(commands::MessagePull* pull AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processBeginTransaction( - commands::TransactionInfo* info AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processBeginTransaction(commands::TransactionInfo* info AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processPrepareTransaction( - commands::TransactionInfo* info AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processPrepareTransaction(commands::TransactionInfo* info AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processCommitTransactionOnePhase( - commands::TransactionInfo* info AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processCommitTransactionOnePhase(commands::TransactionInfo* info AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processCommitTransactionTwoPhase( - commands::TransactionInfo* info AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processCommitTransactionTwoPhase(commands::TransactionInfo* info AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processRollbackTransaction( - commands::TransactionInfo* info AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processRollbackTransaction(commands::TransactionInfo* info AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processWireFormat( - commands::WireFormatInfo* info AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processWireFormat(commands::WireFormatInfo* info AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processKeepAliveInfo( - commands::KeepAliveInfo* info AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processKeepAliveInfo(commands::KeepAliveInfo* info AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processShutdownInfo( - commands::ShutdownInfo* info AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processShutdownInfo(commands::ShutdownInfo* info AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processFlushCommand( - commands::FlushCommand* command AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processFlushCommand(commands::FlushCommand* command AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processBrokerInfo( - commands::BrokerInfo* info AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processBrokerInfo(commands::BrokerInfo* info AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processRecoverTransactions( - commands::TransactionInfo* info AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processRecoverTransactions(commands::TransactionInfo* info AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processForgetTransaction( - commands::TransactionInfo* info AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processForgetTransaction(commands::TransactionInfo* info AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processEndTransaction( - commands::TransactionInfo* info AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processEndTransaction(commands::TransactionInfo* info AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processMessageDispatchNotification( - commands::MessageDispatchNotification* notification AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processMessageDispatchNotification(commands::MessageDispatchNotification* notification AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processProducerAck( - commands::ProducerAck* ack AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processProducerAck(commands::ProducerAck* ack AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processMessageDispatch( - commands::MessageDispatch* dispatch AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processMessageDispatch(commands::MessageDispatch* dispatch AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processControlCommand( - commands::ControlCommand* command AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processControlCommand(commands::ControlCommand* command AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processConnectionError( - commands::ConnectionError* error AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processConnectionError(commands::ConnectionError* error AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processConnectionControl( - commands::ConnectionControl* control AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processConnectionControl(commands::ConnectionControl* control AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processConsumerControl( - commands::ConsumerControl* control AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processConsumerControl(commands::ConsumerControl* control AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processBrokerError( - commands::BrokerError* error AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processBrokerError(commands::BrokerError* error AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processReplayCommand( - commands::ReplayCommand* replay AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processReplayCommand(commands::ReplayCommand* replay AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processResponse( - commands::Response* response AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processResponse(commands::Response* response AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processConnectionInfo( - commands::ConnectionInfo* info AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processConnectionInfo(commands::ConnectionInfo* info AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processSessionInfo( - commands::SessionInfo* info AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processSessionInfo(commands::SessionInfo* info AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processProducerInfo( - commands::ProducerInfo* info AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processProducerInfo(commands::ProducerInfo* info AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processConsumerInfo( - commands::ConsumerInfo* info AMQCPP_UNUSED ) { - + virtual decaf::lang::Pointer processConsumerInfo(commands::ConsumerInfo* info AMQCPP_UNUSED ) { return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processTransactionInfo( - commands::TransactionInfo* info ) { + virtual decaf::lang::Pointer processTransactionInfo(commands::TransactionInfo* info) { - if( info != decaf::lang::Pointer() ) { - switch( info->getType() ) { + if (info != decaf::lang::Pointer()) { + switch (info->getType()) { case core::ActiveMQConstants::TRANSACTION_STATE_BEGIN: - return this->processBeginTransaction( info ); + return this->processBeginTransaction(info); case core::ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE: - return this->processCommitTransactionOnePhase( info ); + return this->processCommitTransactionOnePhase(info); case core::ActiveMQConstants::TRANSACTION_STATE_COMMITTWOPHASE: - return this->processCommitTransactionTwoPhase( info ); + return this->processCommitTransactionTwoPhase(info); case core::ActiveMQConstants::TRANSACTION_STATE_END: - return this->processEndTransaction( info ); + return this->processEndTransaction(info); case core::ActiveMQConstants::TRANSACTION_STATE_FORGET: - return this->processForgetTransaction( info ); + return this->processForgetTransaction(info); case core::ActiveMQConstants::TRANSACTION_STATE_PREPARE: - return this->processPrepareTransaction( info ); + return this->processPrepareTransaction(info); case core::ActiveMQConstants::TRANSACTION_STATE_RECOVER: - return this->processRecoverTransactions( info ); + return this->processRecoverTransactions(info); case core::ActiveMQConstants::TRANSACTION_STATE_ROLLBACK: - return this->processRollbackTransaction( info ); + return this->processRollbackTransaction(info); default: throw exceptions::ActiveMQException( __FILE__, __LINE__, "Unknown Transaction Info Type."); } } + return decaf::lang::Pointer(); } - virtual decaf::lang::Pointer processRemoveInfo( - commands::RemoveInfo* info ) { + virtual decaf::lang::Pointer processRemoveInfo(commands::RemoveInfo* info) { - if( info != decaf::lang::Pointer() ) { - switch( info->getObjectId()->getDataStructureType() ) { + if (info != decaf::lang::Pointer()) { + switch (info->getObjectId()->getDataStructureType()) { case commands::ConnectionId::ID_CONNECTIONID: return this->processRemoveConnection( - dynamic_cast( info->getObjectId().get() ) ); + dynamic_cast (info->getObjectId().get())); case commands::SessionId::ID_SESSIONID: return this->processRemoveSession( - dynamic_cast( info->getObjectId().get() ) ); + dynamic_cast (info->getObjectId().get())); case commands::ConsumerId::ID_CONSUMERID: return this->processRemoveConsumer( - dynamic_cast( info->getObjectId().get() ) ); + dynamic_cast (info->getObjectId().get())); case commands::ProducerId::ID_PRODUCERID: return this->processRemoveProducer( - dynamic_cast( info->getObjectId().get() ) ); + dynamic_cast (info->getObjectId().get())); default: throw exceptions::ActiveMQException( __FILE__, __LINE__, "Unknown Remove Info Type."); } } + return decaf::lang::Pointer(); } - }; }} Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp?rev=1339242&r1=1339241&r2=1339242&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp Wed May 16 16:02:06 2012 @@ -27,7 +27,7 @@ using namespace activemq::state; using namespace activemq::commands; //////////////////////////////////////////////////////////////////////////////// -ConnectionState::ConnectionState( const Pointer& info ) : +ConnectionState::ConnectionState(Pointer info) : info(info), transactions(), sessions(), @@ -51,7 +51,7 @@ ConnectionState::~ConnectionState() { //////////////////////////////////////////////////////////////////////////////// std::string ConnectionState::toString() const { - if( this->info.get() != NULL ) { + if (this->info.get() != NULL) { return this->info->toString(); } @@ -59,24 +59,23 @@ std::string ConnectionState::toString() } //////////////////////////////////////////////////////////////////////////////// -void ConnectionState::reset( const Pointer& info ) { - +void ConnectionState::reset(Pointer info) { this->info = info; transactions.clear(); sessions.clear(); tempDestinations.clear(); - disposed.set( false ); + disposed.set(false); } //////////////////////////////////////////////////////////////////////////////// void ConnectionState::shutdown() { - if( this->disposed.compareAndSet( false, true ) ) { + if (this->disposed.compareAndSet(false, true)) { - std::vector< Pointer > values = this->sessions.values(); - std::vector< Pointer >::iterator iter = values.begin(); + std::vector > values = this->sessions.values(); + std::vector >::iterator iter = values.begin(); - for( ; iter != values.end(); ++iter ) { + for (; iter != values.end(); ++iter) { (*iter)->shutdown(); } } @@ -85,8 +84,8 @@ void ConnectionState::shutdown() { //////////////////////////////////////////////////////////////////////////////// void ConnectionState::checkShutdown() const { - if( this->disposed.get() ) { + if (this->disposed.get()) { throw decaf::lang::exceptions::IllegalStateException( - __FILE__, __LINE__, "Connection already Disposed" ); + __FILE__, __LINE__, "Connection already Disposed"); } } Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.h?rev=1339242&r1=1339241&r2=1339242&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.h Wed May 16 16:02:06 2012 @@ -67,13 +67,13 @@ namespace state { public: - ConnectionState( const Pointer& info ); + ConnectionState(Pointer info); virtual ~ConnectionState(); std::string toString() const; - const Pointer& getInfo() const { + const Pointer getInfo() const { return this->info; } @@ -81,71 +81,68 @@ namespace state { void shutdown(); - void reset( const Pointer& info ); + void reset(Pointer info); - void addTempDestination( const Pointer& info ) { + void addTempDestination(Pointer info) { checkShutdown(); - tempDestinations.add( info ); + tempDestinations.add(info); } - void removeTempDestination( const Pointer& destination ) { + void removeTempDestination(Pointer destination) { - std::auto_ptr< decaf::util::Iterator< Pointer > > iter( - tempDestinations.iterator() ); + std::auto_ptr > > iter(tempDestinations.iterator()); - while( iter->hasNext() ) { + while (iter->hasNext()) { Pointer di = iter->next(); - if( di->getDestination()->equals( destination.get() ) ) { + if (di->getDestination()->equals(destination.get())) { iter->remove(); } } } - void addTransactionState( const Pointer& id ) { + void addTransactionState(Pointer id) { checkShutdown(); - transactions.put( id.dynamicCast(), - Pointer( new TransactionState( id ) ) ); + transactions.put(id.dynamicCast(), Pointer(new TransactionState(id))); } - const Pointer& getTransactionState( const Pointer& id ) const { - return transactions.get( id.dynamicCast() ); + const Pointer& getTransactionState(Pointer id) const { + return transactions.get(id.dynamicCast()); } - std::vector< Pointer > getTransactionStates() const { + std::vector > getTransactionStates() const { return transactions.values(); } - Pointer removeTransactionState( const Pointer& id ) { - return transactions.remove( id.dynamicCast() ); + Pointer removeTransactionState(Pointer id) { + return transactions.remove(id.dynamicCast()); } - void addSession( const Pointer& info ) { + void addSession(Pointer info) { checkShutdown(); - sessions.put( - info->getSessionId(), Pointer( new SessionState( info ) ) ); + sessions.put(info->getSessionId(), Pointer(new SessionState(info))); } - Pointer removeSession( const Pointer& id ) { - return sessions.remove( id ); + Pointer removeSession(Pointer id) { + return sessions.remove(id); } - const Pointer& getSessionState( const Pointer& id ) const { - return sessions.get( id ); + const Pointer getSessionState(Pointer id) const { + return sessions.get(id); } - const LinkedList< Pointer >& getTempDesinations() const { + const LinkedList >& getTempDesinations() const { return tempDestinations; } - std::vector< Pointer > getSessionStates() const { + std::vector > getSessionStates() const { return sessions.values(); } - StlMap< Pointer, Pointer, ConsumerId::COMPARATOR >& getRecoveringPullConsumers() { + StlMap, Pointer, ConsumerId::COMPARATOR>& getRecoveringPullConsumers() { return recoveringPullConsumers; } - void setConnectionInterruptProcessingComplete( bool connectionInterruptProcessingComplete ) { + void setConnectionInterruptProcessingComplete(bool connectionInterruptProcessingComplete) { this->connectionInterruptProcessingComplete = connectionInterruptProcessingComplete; } Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp?rev=1339242&r1=1339241&r2=1339242&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp Wed May 16 16:02:06 2012 @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -43,26 +44,29 @@ namespace state { class RemoveTransactionAction : public Runnable { private: - const Pointer info; + Pointer info; ConnectionStateTracker* stateTracker; private: - RemoveTransactionAction( const RemoveTransactionAction& ); - RemoveTransactionAction& operator= ( const RemoveTransactionAction& ); + RemoveTransactionAction(const RemoveTransactionAction&); + RemoveTransactionAction& operator=(const RemoveTransactionAction&); public: - RemoveTransactionAction( ConnectionStateTracker* stateTracker, - const Pointer& info ) : - info( info ), stateTracker( stateTracker ) {} + RemoveTransactionAction(ConnectionStateTracker* stateTracker, Pointer info) : + info(info), stateTracker(stateTracker) { + } virtual ~RemoveTransactionAction() {} virtual void run() { Pointer connectionId = info->getConnectionId(); - Pointer cs = stateTracker->connectionStates.get( connectionId ); - cs->removeTransactionState( info->getTransactionId() ); + Pointer cs = stateTracker->connectionStates.get(connectionId); + Pointer txState = cs->removeTransactionState(info->getTransactionId()); + if (txState != NULL) { + txState->clear(); + } } }; @@ -88,12 +92,12 @@ ConnectionStateTracker::~ConnectionState } //////////////////////////////////////////////////////////////////////////////// -Pointer ConnectionStateTracker::track( const Pointer& command ) { +Pointer ConnectionStateTracker::track(Pointer command) { try{ - Pointer result = command->visit( this ); - if( result == NULL ) { + Pointer result = command->visit(this); + if (result == NULL) { return Pointer(); } else { return result.dynamicCast(); @@ -105,9 +109,9 @@ Pointer ConnectionStateTracker: } //////////////////////////////////////////////////////////////////////////////// -void ConnectionStateTracker::trackBack( const Pointer& command ) { +void ConnectionStateTracker::trackBack(Pointer command) { - try{ + try { if (command != NULL) { if (trackMessages && command->isMessage()) { Pointer message = command.dynamicCast(); @@ -129,43 +133,43 @@ void ConnectionStateTracker::trackBack( } //////////////////////////////////////////////////////////////////////////////// -void ConnectionStateTracker::restore( const Pointer& transport ) { +void ConnectionStateTracker::restore(Pointer transport) { try{ std::vector< Pointer > connectionStates = this->connectionStates.values(); std::vector< Pointer >::const_iterator iter = connectionStates.begin(); - for( ; iter != connectionStates.end(); ++iter ) { + for (; iter != connectionStates.end(); ++iter) { Pointer state = *iter; Pointer info = state->getInfo(); - info->setFailoverReconnect( true ); - transport->oneway( info ); + info->setFailoverReconnect(true); + transport->oneway(info); - doRestoreTempDestinations( transport, state ); + doRestoreTempDestinations(transport, state); - if( restoreSessions ) { - doRestoreSessions( transport, state ); + if (restoreSessions) { + doRestoreSessions(transport, state); } - if( restoreTransaction ) { - doRestoreTransactions( transport, state ); + if (restoreTransaction) { + doRestoreTransactions(transport, state); } } // Now we flush messages - std::vector< Pointer > messages = messageCache.values(); - std::vector< Pointer >::const_iterator messageIter = messages.begin(); + std::vector > messages = messageCache.values(); + std::vector >::const_iterator messageIter = messages.begin(); - for( ; messageIter != messages.end(); ++messageIter ) { - transport->oneway( *messageIter ); + for (; messageIter != messages.end(); ++messageIter) { + transport->oneway(*messageIter); } - std::vector< Pointer > messagePulls = messagePullCache.values(); - std::vector< Pointer >::const_iterator messagePullIter = messagePulls.begin(); + std::vector > messagePulls = messagePullCache.values(); + std::vector >::const_iterator messagePullIter = messagePulls.begin(); - for(; messagePullIter != messagePulls.end(); ++messagePullIter) { + for (; messagePullIter != messagePulls.end(); ++messagePullIter) { transport->oneway(*messagePullIter); } } @@ -175,61 +179,61 @@ void ConnectionStateTracker::restore( co } //////////////////////////////////////////////////////////////////////////////// -void ConnectionStateTracker::doRestoreTransactions( const Pointer& transport, - const Pointer& connectionState ) { +void ConnectionStateTracker::doRestoreTransactions(Pointer transport, + Pointer connectionState) { - try{ + try { - std::vector< Pointer > toIgnore; + std::vector > toRollback; // Restore the session's transaction state - std::vector< Pointer > transactionStates = - connectionState->getTransactionStates(); + std::vector > transactionStates = connectionState->getTransactionStates(); + std::vector >::const_iterator iter = transactionStates.begin(); - std::vector< Pointer >::const_iterator iter = transactionStates.begin(); - - for( ; iter != transactionStates.end(); ++iter ) { - - // ignore any empty (ack) transaction - if( (*iter)->getCommands().size() == 2 ) { - Pointer lastCommand = (*iter)->getCommands().get(1); - if( lastCommand->isTransactionInfo() ) { - Pointer transactionInfo = lastCommand.dynamicCast(); - - if( transactionInfo->getType() == ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE ) { - toIgnore.push_back(lastCommand); - continue; - } + // For any completed transactions we don't know if the commit actually made it to the broker + // or was lost along the way, so they need to be rolled back. + for (; iter != transactionStates.end(); ++iter) { + Pointer lastCommand = (*iter)->getCommands().getLast(); + if (lastCommand->isTransactionInfo()) { + Pointer transactionInfo = lastCommand.dynamicCast(); + if (transactionInfo->getType() == ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE) { + toRollback.push_back(transactionInfo); + continue; } } // replay short lived producers that may have been involved in the transaction - std::vector< Pointer > producerStates = (*iter)->getProducerStates(); - std::vector< Pointer >::const_iterator state = producerStates.begin(); + std::vector > producerStates = (*iter)->getProducerStates(); + std::vector >::const_iterator state = producerStates.begin(); - for( ; state != producerStates.end(); ++state ) { - transport->oneway( (*state)->getInfo() ); + for (; state != producerStates.end(); ++state) { + transport->oneway((*state)->getInfo()); } - std::auto_ptr< Iterator< Pointer > > commands( - (*iter)->getCommands().iterator() ); + std::auto_ptr > > commands((*iter)->getCommands().iterator()); - while( commands->hasNext() ) { - transport->oneway( commands->next() ); + while (commands->hasNext()) { + transport->oneway(commands->next()); } state = producerStates.begin(); - for( ; state != producerStates.end(); ++state ) { - transport->oneway( (*state)->getInfo()->createRemoveCommand() ); + for (; state != producerStates.end(); ++state) { + transport->oneway((*state)->getInfo()->createRemoveCommand()); } } - std::vector< Pointer >::const_iterator command = toIgnore.begin(); - for( ; command != toIgnore.end(); ++command ) { - // respond to the outstanding commit - Pointer response( new Response() ); - response->setCorrelationId( (*command)->getCommandId() ); - transport->getTransportListener()->onCommand( response ); + // Trigger failure of commit for all outstanding completed but in doubt transactions. + std::vector >::const_iterator command = toRollback.begin(); + for (; command != toRollback.end(); ++command) { + Pointer response(new ExceptionResponse()); + Pointer exception(new BrokerError()); + exception->setExceptionClass("TransactionRolledBackException"); + exception->setMessage( + std::string("Transaction completion in doubt due to failover. Forcing rollback of ") + + (*command)->getTransactionId()->toString()); + response->setException(exception); + response->setCorrelationId((*command)->getCommandId()); + transport->getTransportListener()->onCommand(response); } } AMQ_CATCH_RETHROW( IOException ) @@ -238,26 +242,25 @@ void ConnectionStateTracker::doRestoreTr } //////////////////////////////////////////////////////////////////////////////// -void ConnectionStateTracker::doRestoreSessions( const Pointer& transport, - const Pointer& connectionState ) { +void ConnectionStateTracker::doRestoreSessions(Pointer transport, + Pointer connectionState) { - try{ - - std::vector< Pointer > sessionStates = connectionState->getSessionStates(); + try { - std::vector< Pointer >::const_iterator iter = sessionStates.begin(); + std::vector > sessionStates = connectionState->getSessionStates(); + std::vector >::const_iterator iter = sessionStates.begin(); // Restore the Session State - for( ; iter != sessionStates.end(); ++iter ) { + for (; iter != sessionStates.end(); ++iter) { Pointer state = *iter; - transport->oneway( state->getInfo() ); + transport->oneway(state->getInfo()); - if( restoreProducers ) { - doRestoreProducers( transport, state ); + if (restoreProducers) { + doRestoreProducers(transport, state); } - if( restoreConsumers ) { - doRestoreConsumers( transport, state ); + if (restoreConsumers) { + doRestoreConsumers(transport, state); } } } @@ -267,33 +270,34 @@ void ConnectionStateTracker::doRestoreSe } //////////////////////////////////////////////////////////////////////////////// -void ConnectionStateTracker::doRestoreConsumers( const Pointer& transport, - const Pointer& sessionState ) { +void ConnectionStateTracker::doRestoreConsumers(Pointer transport, Pointer sessionState ) { - try{ + try { // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete Pointer connectionState = - connectionStates.get( sessionState->getInfo()->getSessionId()->getParentId() ); + connectionStates.get(sessionState->getInfo()->getSessionId()->getParentId()); bool connectionInterruptionProcessingComplete = connectionState->isConnectionInterruptProcessingComplete(); - std::vector< Pointer > consumerStates = sessionState->getConsumerStates(); - std::vector< Pointer >::const_iterator state = consumerStates.begin(); + std::vector > consumerStates = sessionState->getConsumerStates(); + std::vector >::const_iterator state = consumerStates.begin(); - for( ; state != consumerStates.end(); ++state ) { + for (; state != consumerStates.end(); ++state) { Pointer infoToSend = (*state)->getInfo(); Pointer wireFormat = transport->getWireFormat(); - if( !connectionInterruptionProcessingComplete && infoToSend->getPrefetchSize() > 0 && wireFormat->getVersion() > 5) { + if (!connectionInterruptionProcessingComplete && infoToSend->getPrefetchSize() > 0 && + wireFormat->getVersion() > 5) { - infoToSend.reset( (*state)->getInfo()->cloneDataStructure() ); - connectionState->getRecoveringPullConsumers().put( infoToSend->getConsumerId(), (*state)->getInfo() ); + infoToSend.reset((*state)->getInfo()->cloneDataStructure()); + connectionState->getRecoveringPullConsumers().put( + infoToSend->getConsumerId(), (*state)->getInfo()); infoToSend->setPrefetchSize(0); } - transport->oneway( infoToSend ); + transport->oneway(infoToSend); } } AMQ_CATCH_RETHROW( IOException ) @@ -302,19 +306,17 @@ void ConnectionStateTracker::doRestoreCo } //////////////////////////////////////////////////////////////////////////////// -void ConnectionStateTracker::doRestoreProducers( const Pointer& transport, - const Pointer& sessionState ) { +void ConnectionStateTracker::doRestoreProducers(Pointer transport, Pointer sessionState ) { - try{ + try { // Restore the session's producers - std::vector< Pointer > producerStates = sessionState->getProducerStates(); + std::vector > producerStates = sessionState->getProducerStates(); + std::vector >::const_iterator iter = producerStates.begin(); - std::vector< Pointer >::const_iterator iter = producerStates.begin(); - - for( ; iter != producerStates.end(); ++iter ) { + for (; iter != producerStates.end(); ++iter) { Pointer state = *iter; - transport->oneway( state->getInfo() ); + transport->oneway(state->getInfo()); } } AMQ_CATCH_RETHROW( IOException ) @@ -323,16 +325,14 @@ void ConnectionStateTracker::doRestorePr } //////////////////////////////////////////////////////////////////////////////// -void ConnectionStateTracker::doRestoreTempDestinations( const Pointer& transport, - const Pointer& connectionState ) { - - try{ +void ConnectionStateTracker::doRestoreTempDestinations(Pointer transport, + Pointer connectionState ) { + try { + std::auto_ptr > > iter( + connectionState->getTempDesinations().iterator()); - std::auto_ptr< Iterator< Pointer > > iter( - connectionState->getTempDesinations().iterator() ); - - while( iter->hasNext() ) { - transport->oneway( iter->next() ); + while (iter->hasNext()) { + transport->oneway(iter->next()); } } AMQ_CATCH_RETHROW( IOException ) @@ -341,13 +341,13 @@ void ConnectionStateTracker::doRestoreTe } //////////////////////////////////////////////////////////////////////////////// -Pointer ConnectionStateTracker::processDestinationInfo( DestinationInfo* info ) { +Pointer ConnectionStateTracker::processDestinationInfo(DestinationInfo* info) { - try{ - if( info != NULL ) { - Pointer cs = connectionStates.get( info->getConnectionId() ); - if( cs != NULL && info->getDestination()->isTemporary() ) { - cs->addTempDestination( Pointer( info->cloneDataStructure() ) ); + try { + if (info != NULL) { + Pointer cs = connectionStates.get(info->getConnectionId()); + if (cs != NULL && info->getDestination()->isTemporary()) { + cs->addTempDestination(Pointer(info->cloneDataStructure())); } } return TRACKED_RESPONSE_MARKER; @@ -358,13 +358,13 @@ Pointer ConnectionStateTracker: } //////////////////////////////////////////////////////////////////////////////// -Pointer ConnectionStateTracker::processRemoveDestination( DestinationInfo* info ) { +Pointer ConnectionStateTracker::processRemoveDestination(DestinationInfo* info) { - try{ - if( info != NULL ) { - Pointer cs = connectionStates.get( info->getConnectionId() ); - if( cs != NULL && info->getDestination()->isTemporary() ) { - cs->removeTempDestination( info->getDestination() ); + try { + if (info != NULL) { + Pointer cs = connectionStates.get(info->getConnectionId()); + if (cs != NULL && info->getDestination()->isTemporary()) { + cs->removeTempDestination(info->getDestination()); } } return TRACKED_RESPONSE_MARKER; @@ -375,21 +375,19 @@ Pointer ConnectionStateTracker: } //////////////////////////////////////////////////////////////////////////////// -Pointer ConnectionStateTracker::processProducerInfo( ProducerInfo* info ) { +Pointer ConnectionStateTracker::processProducerInfo(ProducerInfo* info) { - try{ - - if( info != NULL && info->getProducerId() != NULL ) { + try { + if (info != NULL && info->getProducerId() != NULL) { Pointer sessionId = info->getProducerId()->getParentId(); - if( sessionId != NULL ) { + if (sessionId != NULL) { Pointer connectionId = sessionId->getParentId(); - if( connectionId != NULL ) { - Pointer cs = connectionStates.get( connectionId ); - if( cs != NULL ) { - Pointer ss = cs->getSessionState( sessionId ); - if( ss != NULL ) { - ss->addProducer( - Pointer( info->cloneDataStructure() ) ); + if (connectionId != NULL) { + Pointer cs = connectionStates.get(connectionId); + if (cs != NULL) { + Pointer ss = cs->getSessionState(sessionId); + if (ss != NULL) { + ss->addProducer(Pointer(info->cloneDataStructure())); } } } @@ -403,20 +401,19 @@ Pointer ConnectionStateTracker: } //////////////////////////////////////////////////////////////////////////////// -Pointer ConnectionStateTracker::processRemoveProducer( ProducerId* id ) { - - try{ +Pointer ConnectionStateTracker::processRemoveProducer(ProducerId* id) { - if( id != NULL ) { + try { + if (id != NULL) { Pointer sessionId = id->getParentId(); - if( sessionId != NULL ) { + if (sessionId != NULL) { Pointer connectionId = sessionId->getParentId(); - if( connectionId != NULL ) { - Pointer cs = connectionStates.get( connectionId ); - if( cs != NULL ) { - Pointer ss = cs->getSessionState( sessionId ); - if( ss != NULL ) { - ss->removeProducer( Pointer( id->cloneDataStructure() ) ); + if (connectionId != NULL) { + Pointer cs = connectionStates.get(connectionId); + if (cs != NULL) { + Pointer ss = cs->getSessionState(sessionId); + if (ss != NULL) { + ss->removeProducer(Pointer(id->cloneDataStructure())); } } } @@ -430,21 +427,20 @@ Pointer ConnectionStateTracker: } //////////////////////////////////////////////////////////////////////////////// -Pointer ConnectionStateTracker::processConsumerInfo( ConsumerInfo* info ) { +Pointer ConnectionStateTracker::processConsumerInfo(ConsumerInfo* info) { - try{ + try { - if( info != NULL ) { + if (info != NULL) { Pointer sessionId = info->getConsumerId()->getParentId(); - if( sessionId != NULL ) { + if (sessionId != NULL) { Pointer connectionId = sessionId->getParentId(); - if( connectionId != NULL ) { - Pointer cs = connectionStates.get( connectionId ); - if( cs != NULL ) { - Pointer ss = cs->getSessionState( sessionId ); - if( ss != NULL ) { - ss->addConsumer( - Pointer( info->cloneDataStructure() ) ); + if (connectionId != NULL) { + Pointer cs = connectionStates.get(connectionId); + if (cs != NULL) { + Pointer ss = cs->getSessionState(sessionId); + if (ss != NULL) { + ss->addConsumer(Pointer(info->cloneDataStructure())); } } } @@ -458,19 +454,19 @@ Pointer ConnectionStateTracker: } //////////////////////////////////////////////////////////////////////////////// -Pointer ConnectionStateTracker::processRemoveConsumer( ConsumerId* id ) { +Pointer ConnectionStateTracker::processRemoveConsumer(ConsumerId* id) { - try{ - if( id != NULL ) { + try { + if (id != NULL) { Pointer sessionId = id->getParentId(); - if( sessionId != NULL ) { + if (sessionId != NULL) { Pointer connectionId = sessionId->getParentId(); - if( connectionId != NULL ) { - Pointer cs = connectionStates.get( connectionId ); - if( cs != NULL ) { - Pointer ss = cs->getSessionState( sessionId ); - if( ss != NULL ) { - ss->removeConsumer( Pointer( id->cloneDataStructure() ) ); + if (connectionId != NULL) { + Pointer cs = connectionStates.get(connectionId); + if (cs != NULL) { + Pointer ss = cs->getSessionState(sessionId); + if (ss != NULL) { + ss->removeConsumer(Pointer(id->cloneDataStructure())); } } } @@ -484,16 +480,16 @@ Pointer ConnectionStateTracker: } //////////////////////////////////////////////////////////////////////////////// -Pointer ConnectionStateTracker::processSessionInfo( SessionInfo* info ) { +Pointer ConnectionStateTracker::processSessionInfo(SessionInfo* info) { - try{ + try { - if( info != NULL ) { + if (info != NULL) { Pointer connectionId = info->getSessionId()->getParentId(); - if( connectionId != NULL ) { - Pointer cs = connectionStates.get( connectionId ); - if( cs != NULL ) { - cs->addSession( Pointer( info->cloneDataStructure() ) ); + if (connectionId != NULL) { + Pointer cs = connectionStates.get(connectionId); + if (cs != NULL) { + cs->addSession(Pointer(info->cloneDataStructure())); } } } @@ -505,16 +501,16 @@ Pointer ConnectionStateTracker: } //////////////////////////////////////////////////////////////////////////////// -Pointer ConnectionStateTracker::processRemoveSession( SessionId* id ) { +Pointer ConnectionStateTracker::processRemoveSession(SessionId* id) { - try{ + try { - if( id != NULL ) { + if (id != NULL) { Pointer connectionId = id->getParentId(); - if( connectionId != NULL ) { - Pointer cs = connectionStates.get( connectionId ); - if( cs != NULL ) { - cs->removeSession( Pointer( id->cloneDataStructure() ) ); + if (connectionId != NULL) { + Pointer cs = connectionStates.get(connectionId); + if (cs != NULL) { + cs->removeSession(Pointer(id->cloneDataStructure())); } } } @@ -526,14 +522,13 @@ Pointer ConnectionStateTracker: } //////////////////////////////////////////////////////////////////////////////// -Pointer ConnectionStateTracker::processConnectionInfo( ConnectionInfo* info ) { +Pointer ConnectionStateTracker::processConnectionInfo(ConnectionInfo* info) { - try{ - - if( info != NULL ) { - Pointer infoCopy( info->cloneDataStructure() ); - connectionStates.put( info->getConnectionId(), - Pointer( new ConnectionState( infoCopy ) ) ); + try { + if (info != NULL) { + Pointer infoCopy(info->cloneDataStructure()); + connectionStates.put( + info->getConnectionId(), Pointer(new ConnectionState(infoCopy))); } return TRACKED_RESPONSE_MARKER; } @@ -543,12 +538,11 @@ Pointer ConnectionStateTracker: } //////////////////////////////////////////////////////////////////////////////// -Pointer ConnectionStateTracker::processRemoveConnection( ConnectionId* id ) { +Pointer ConnectionStateTracker::processRemoveConnection(ConnectionId* id) { - try{ - - if( id != NULL ) { - connectionStates.remove( Pointer( id->cloneDataStructure() ) ); + try { + if (id != NULL) { + connectionStates.remove(Pointer(id->cloneDataStructure())); } return TRACKED_RESPONSE_MARKER; @@ -559,27 +553,26 @@ Pointer ConnectionStateTracker: } //////////////////////////////////////////////////////////////////////////////// -Pointer ConnectionStateTracker::processMessage( Message* message ) { +Pointer ConnectionStateTracker::processMessage(Message* message) { - try{ + try { - if( message != NULL ) { - if( trackTransactions && message->getTransactionId() != NULL ) { + if (message != NULL) { + if (trackTransactions && message->getTransactionId() != NULL) { Pointer producerId = message->getProducerId(); Pointer connectionId = producerId->getParentId()->getParentId(); - if( connectionId != NULL ) { - Pointer cs = connectionStates.get( connectionId ); - if( cs != NULL ) { + if (connectionId != NULL) { + Pointer cs = connectionStates.get(connectionId); + if (cs != NULL) { Pointer transactionState = - cs->getTransactionState( message->getTransactionId() ); - if( transactionState != NULL ) { - transactionState->addCommand( - Pointer( message->cloneDataStructure() ) ); + cs->getTransactionState(message->getTransactionId()); + if (transactionState != NULL) { + transactionState->addCommand(Pointer(message->cloneDataStructure())); - if( trackTransactionProducers ) { + if (trackTransactionProducers) { // Track the producer in case it is closed before a commit - Pointer sessionState = cs->getSessionState( producerId->getParentId() ); + Pointer sessionState = cs->getSessionState(producerId->getParentId()); Pointer producerState = sessionState->getProducerState(producerId); producerState->setTransactionState(transactionState); } @@ -587,39 +580,11 @@ Pointer ConnectionStateTracker: } } return TRACKED_RESPONSE_MARKER; - }else if( trackMessages ) { - messageCache.put( message->getMessageId(), - Pointer( message->cloneDataStructure() ) ); + } else if (trackMessages) { + messageCache.put(message->getMessageId(), Pointer (message->cloneDataStructure())); } } - return Pointer(); - } - AMQ_CATCH_RETHROW( ActiveMQException ) - AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) - AMQ_CATCHALL_THROW( ActiveMQException ) -} - -//////////////////////////////////////////////////////////////////////////////// -Pointer ConnectionStateTracker::processMessageAck( MessageAck* ack ) { - try{ - - if( trackTransactions && ack != NULL && ack->getTransactionId() != NULL) { - Pointer connectionId;// = - ack->getConsumerId()->getParentId()->getParentId(); - if( connectionId != NULL ) { - Pointer cs = connectionStates.get( connectionId ); - if( cs != NULL ) { - Pointer transactionState = - cs->getTransactionState( ack->getTransactionId() ); - if( transactionState != NULL ) { - transactionState->addCommand( - Pointer( ack->cloneDataStructure() ) ); - } - } - } - return TRACKED_RESPONSE_MARKER; - } return Pointer(); } AMQ_CATCH_RETHROW( ActiveMQException ) @@ -628,27 +593,26 @@ Pointer ConnectionStateTracker: } //////////////////////////////////////////////////////////////////////////////// -Pointer ConnectionStateTracker::processBeginTransaction( TransactionInfo* info ) { +Pointer ConnectionStateTracker::processBeginTransaction(TransactionInfo* info) { - try{ + try { - if( trackTransactions && info != NULL ) { + if (trackTransactions && info != NULL) { Pointer connectionId = info->getConnectionId(); - if( connectionId != NULL ) { - Pointer cs = connectionStates.get( connectionId ); - if( cs != NULL ) { - cs->addTransactionState( info->getTransactionId() ); + if (connectionId != NULL) { + Pointer cs = connectionStates.get(connectionId); + if (cs != NULL) { + cs->addTransactionState(info->getTransactionId()); Pointer transactionState = - cs->getTransactionState( info->getTransactionId() ); - transactionState->addCommand( - Pointer( info->cloneDataStructure() ) ); + cs->getTransactionState(info->getTransactionId()); + transactionState->addCommand(Pointer(info->cloneDataStructure())); } } return TRACKED_RESPONSE_MARKER; } - return Pointer(); + return Pointer (); } AMQ_CATCH_RETHROW( ActiveMQException ) AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) @@ -656,20 +620,19 @@ Pointer ConnectionStateTracker: } //////////////////////////////////////////////////////////////////////////////// -Pointer ConnectionStateTracker::processPrepareTransaction( TransactionInfo* info ) { +Pointer ConnectionStateTracker::processPrepareTransaction(TransactionInfo* info) { - try{ + try { - if( trackTransactions && info != NULL ) { + if (trackTransactions && info != NULL) { Pointer connectionId = info->getConnectionId(); - if( connectionId != NULL ) { - Pointer cs = connectionStates.get( connectionId ); - if( cs != NULL ) { + if (connectionId != NULL) { + Pointer cs = connectionStates.get(connectionId); + if (cs != NULL) { Pointer transactionState = - cs->getTransactionState( info->getTransactionId() ); - if( transactionState != NULL ) { - transactionState->addCommand( - Pointer( info->cloneDataStructure() ) ); + cs->getTransactionState(info->getTransactionId()); + if (transactionState != NULL) { + transactionState->addCommand(Pointer(info->cloneDataStructure())); } } } @@ -685,29 +648,28 @@ Pointer ConnectionStateTracker: } //////////////////////////////////////////////////////////////////////////////// -Pointer ConnectionStateTracker::processCommitTransactionOnePhase( TransactionInfo* info ) { +Pointer ConnectionStateTracker::processCommitTransactionOnePhase(TransactionInfo* info) { - try{ + try { - if( trackTransactions && info != NULL ) { + if (trackTransactions && info != NULL) { Pointer connectionId = info->getConnectionId(); - if( connectionId != NULL ) { - Pointer cs = connectionStates.get( connectionId ); - if( cs != NULL ) { + if (connectionId != NULL) { + Pointer cs = connectionStates.get(connectionId); + if (cs != NULL) { Pointer transactionState = - cs->getTransactionState( info->getTransactionId() ); - if( transactionState != NULL ) { - Pointer infoCopy = - Pointer( info->cloneDataStructure() ); - transactionState->addCommand( infoCopy ); - return Pointer( new Tracked( - Pointer( new RemoveTransactionAction( this, infoCopy ) ) ) ); + cs->getTransactionState(info->getTransactionId()); + if (transactionState != NULL) { + Pointer infoCopy(info->cloneDataStructure()); + transactionState->addCommand(infoCopy); + return Pointer( + new Tracked(Pointer(new RemoveTransactionAction(this, infoCopy)))); } } } } - return Pointer(); + return Pointer (); } AMQ_CATCH_RETHROW( ActiveMQException ) AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) @@ -715,23 +677,22 @@ Pointer ConnectionStateTracker: } //////////////////////////////////////////////////////////////////////////////// -Pointer ConnectionStateTracker::processCommitTransactionTwoPhase( TransactionInfo* info ) { +Pointer ConnectionStateTracker::processCommitTransactionTwoPhase(TransactionInfo* info) { - try{ + try { - if( trackTransactions && info != NULL ) { + if (trackTransactions && info != NULL) { Pointer connectionId = info->getConnectionId(); - if( connectionId != NULL ) { - Pointer cs = connectionStates.get( connectionId ); - if( cs != NULL ) { + if (connectionId != NULL) { + Pointer cs = connectionStates.get(connectionId); + if (cs != NULL) { Pointer transactionState = - cs->getTransactionState( info->getTransactionId() ); - if( transactionState != NULL ) { - Pointer infoCopy = - Pointer( info->cloneDataStructure() ); - transactionState->addCommand( infoCopy ); - return Pointer( new Tracked( - Pointer( new RemoveTransactionAction( this, infoCopy ) ) ) ); + cs->getTransactionState(info->getTransactionId()); + if (transactionState != NULL) { + Pointer infoCopy(info->cloneDataStructure()); + transactionState->addCommand(infoCopy); + return Pointer( + new Tracked(Pointer(new RemoveTransactionAction(this, infoCopy)))); } } } @@ -745,23 +706,22 @@ Pointer ConnectionStateTracker: } //////////////////////////////////////////////////////////////////////////////// -Pointer ConnectionStateTracker::processRollbackTransaction( TransactionInfo* info ) { +Pointer ConnectionStateTracker::processRollbackTransaction(TransactionInfo* info) { - try{ + try { - if( trackTransactions && info != NULL ) { + if (trackTransactions && info != NULL) { Pointer connectionId = info->getConnectionId(); - if( connectionId != NULL ) { - Pointer cs = connectionStates.get( connectionId ); - if( cs != NULL ) { + if (connectionId != NULL) { + Pointer cs = connectionStates.get(connectionId); + if (cs != NULL) { Pointer transactionState = - cs->getTransactionState( info->getTransactionId() ); - if( transactionState != NULL ) { - Pointer infoCopy = - Pointer( info->cloneDataStructure() ); - transactionState->addCommand( infoCopy ); - return Pointer( new Tracked( - Pointer( new RemoveTransactionAction( this, infoCopy ) ) ) ); + cs->getTransactionState(info->getTransactionId()); + if (transactionState != NULL) { + Pointer infoCopy(info->cloneDataStructure()); + transactionState->addCommand(infoCopy); + return Pointer( + new Tracked(Pointer(new RemoveTransactionAction(this, infoCopy)))); } } } @@ -775,20 +735,19 @@ Pointer ConnectionStateTracker: } //////////////////////////////////////////////////////////////////////////////// -Pointer ConnectionStateTracker::processEndTransaction( TransactionInfo* info ) { +Pointer ConnectionStateTracker::processEndTransaction(TransactionInfo* info) { - try{ + try { - if( trackTransactions && info != NULL ) { + if (trackTransactions && info != NULL) { Pointer connectionId = info->getConnectionId(); - if( connectionId != NULL ) { - Pointer cs = connectionStates.get( connectionId ); - if( cs != NULL ) { + if (connectionId != NULL) { + Pointer cs = connectionStates.get(connectionId); + if (cs != NULL) { Pointer transactionState = - cs->getTransactionState( info->getTransactionId() ); - if( transactionState != NULL ) { - transactionState->addCommand( - Pointer( info->cloneDataStructure() ) ); + cs->getTransactionState(info->getTransactionId()); + if (transactionState != NULL) { + transactionState->addCommand(Pointer (info->cloneDataStructure())); } } } @@ -806,14 +765,14 @@ Pointer ConnectionStateTracker: //////////////////////////////////////////////////////////////////////////////// Pointer ConnectionStateTracker::processMessagePull(MessagePull* pull) { - try{ + try { if (pull != NULL && pull->getDestination() != NULL && pull->getConsumerId() != NULL) { std::string id = pull->getDestination()->toString() + "::" + pull->getConsumerId()->toString(); messagePullCache.put(id, Pointer(pull->cloneDataStructure())); } - return Pointer(); + return Pointer (); } AMQ_CATCH_RETHROW( ActiveMQException ) AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) @@ -822,30 +781,30 @@ Pointer ConnectionStateTracker: //////////////////////////////////////////////////////////////////////////////// void ConnectionStateTracker::connectionInterruptProcessingComplete( - transport::Transport* transport, const Pointer& connectionId) { + transport::Transport* transport, Pointer connectionId) { - Pointer connectionState = connectionStates.get( connectionId ); + Pointer connectionState = connectionStates.get(connectionId); - if( connectionState != NULL ) { + if (connectionState != NULL) { - connectionState->setConnectionInterruptProcessingComplete( true ); + connectionState->setConnectionInterruptProcessingComplete(true); - StlMap< Pointer, Pointer, ConsumerId::COMPARATOR > stalledConsumers = + StlMap, Pointer, ConsumerId::COMPARATOR> stalledConsumers = connectionState->getRecoveringPullConsumers(); - std::vector< Pointer > keySet = stalledConsumers.keySet(); - std::vector< Pointer >::const_iterator key = keySet.begin(); + std::vector > keySet = stalledConsumers.keySet(); + std::vector >::const_iterator key = keySet.begin(); - for( ; key != keySet.end(); ++key ) { - Pointer control( new ConsumerControl() ); + for (; key != keySet.end(); ++key) { + Pointer control(new ConsumerControl()); - control->setConsumerId( *key ); - control->setPrefetch( stalledConsumers.get( *key )->getPrefetchSize() ); - control->setDestination( stalledConsumers.get( *key )->getDestination() ); + control->setConsumerId(*key); + control->setPrefetch(stalledConsumers.get(*key)->getPrefetchSize()); + control->setDestination(stalledConsumers.get(*key)->getDestination()); try { - transport->oneway( control ); - } catch( Exception& ex ) { + transport->oneway(control); + } catch (Exception& ex) { } } @@ -856,10 +815,10 @@ void ConnectionStateTracker::connectionI //////////////////////////////////////////////////////////////////////////////// void ConnectionStateTracker::transportInterrupted() { - std::vector< Pointer > connectionStatesVec = this->connectionStates.values(); - std::vector< Pointer >::const_iterator state = connectionStatesVec.begin(); + std::vector > connectionStatesVec = this->connectionStates.values(); + std::vector >::const_iterator state = connectionStatesVec.begin(); - for( ; state != connectionStatesVec.end(); ++state ) { - (*state)->setConnectionInterruptProcessingComplete( false ); + for (; state != connectionStatesVec.end(); ++state) { + (*state)->setConnectionInterruptProcessingComplete(false); } }