activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1339424 - in /activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src: examples/producers/ main/activemq/core/ main/activemq/state/
Date Wed, 16 May 2012 23:28:45 GMT
Author: tabish
Date: Wed May 16 23:28:45 2012
New Revision: 1339424

URL: http://svn.apache.org/viewvc?rev=1339424&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQCPP-403

Modified:
    activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/examples/producers/SimpleProducer.cpp
    activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
    activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
    activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/SessionState.cpp
    activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/TransactionState.cpp

Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/examples/producers/SimpleProducer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/examples/producers/SimpleProducer.cpp?rev=1339424&r1=1339423&r2=1339424&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/examples/producers/SimpleProducer.cpp (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/examples/producers/SimpleProducer.cpp Wed May 16 23:28:45 2012
@@ -126,53 +126,37 @@ public:
             // Create a messages
             string text = (string)"Hello world! from thread " + threadIdStr;
 
-            for( unsigned int ix=0; ix<numMessages; ++ix ){
-                TextMessage* message = session->createTextMessage( text );
-
-                message->setIntProperty( "Integer", ix );
-
-                // Tell the producer to send the message
-                printf( "Sent message #%d from thread %s\n", ix+1, threadIdStr.c_str() );
-                producer->send( message );
-
-                delete message;
-            }
-
-        }catch ( CMSException& e ) {
-            e.printStackTrace();
-        }
+            for (unsigned int ix = 0; ix < numMessages; ++ix) {
+				std::auto_ptr<TextMessage> message(session->createTextMessage(text));
+				message->setIntProperty("Integer", ix);
+				printf("Sent message #%d from thread %s\n", ix + 1, threadIdStr.c_str());
+				producer->send(message.get());
+			}
+
+		} catch (CMSException& e) {
+			e.printStackTrace();
+		}
     }
 
 private:
 
     void cleanup(){
 
-        // Destroy resources.
-        try{
-            if( destination != NULL ) delete destination;
-        }catch ( CMSException& e ) { e.printStackTrace(); }
-        destination = NULL;
-
-        try{
-            if( producer != NULL ) delete producer;
-        }catch ( CMSException& e ) { e.printStackTrace(); }
-        producer = NULL;
-
-        // Close open resources.
-        try{
-            if( session != NULL ) session->close();
-            if( connection != NULL ) connection->close();
-        }catch ( CMSException& e ) { e.printStackTrace(); }
-
-        try{
-            if( session != NULL ) delete session;
-        }catch ( CMSException& e ) { e.printStackTrace(); }
-        session = NULL;
-
-        try{
-            if( connection != NULL ) delete connection;
-        }catch ( CMSException& e ) { e.printStackTrace(); }
-        connection = NULL;
+    	if (connection != NULL) {
+			try {
+				connection->close();
+			} catch (cms::CMSException& ex) {
+			}
+    	}
+
+    	delete destination;
+    	destination = NULL;
+    	delete producer;
+    	producer = NULL;
+    	delete session;
+    	session = NULL;
+    	delete connection;
+    	connection = NULL;
     }
 };
 

Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp?rev=1339424&r1=1339423&r2=1339424&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp Wed May 16 23:28:45 2012
@@ -192,28 +192,30 @@ void ActiveMQTransactionContext::commit(
                 "Cannot Commit a local transaction while an XA Transaction is in progress.");
         }
 
-        if( this->context->transactionId.get() == NULL ) {
-            throw InvalidStateException(
-                __FILE__, __LINE__,
-                "ActiveMQTransactionContext::commit - "
-                "Commit called before transaction was started.");
+        try {
+            this->beforeEnd();
+        } catch (std::exception& ex) {
+            rollback();
+            throw;
         }
 
-        this->beforeEnd();
+        if (isInTransaction()) {
+            Pointer<TransactionInfo> info(new TransactionInfo());
+            info->setConnectionId(this->connection->getConnectionInfo().getConnectionId());
+            info->setTransactionId(this->context->transactionId);
+            info->setType(ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE);
 
-        // Create and Populate the Info Command.
-        Pointer<TransactionInfo> info( new TransactionInfo() );
-        info->setConnectionId( this->connection->getConnectionInfo().getConnectionId()
);
-        info->setTransactionId( this->context->transactionId );
-        info->setType( ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE );
-
-        // Before we send the command NULL the id in case of an exception.
-        this->context->transactionId.reset( NULL );
+            // Before we send the command NULL the id in case of an exception.
+            this->context->transactionId.reset(NULL);
 
-        // Commit the current Transaction
-        this->connection->syncRequest( info );
-
-        this->afterCommit();
+            try {
+                this->connection->syncRequest(info);
+                this->afterCommit();
+            } catch(cms::CMSException& ex) {
+                this->afterRollback();
+                throw;
+            }
+        }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -230,28 +232,25 @@ void ActiveMQTransactionContext::rollbac
                 "Cannot Rollback a local transaction while an XA Transaction is in progress.");
         }
 
-        if( this->context->transactionId == NULL ) {
-            throw InvalidStateException(
-                __FILE__, __LINE__,
-                "ActiveMQTransactionContext::rollback - "
-                "Rollback called before transaction was started.");
+        try {
+            this->beforeEnd();
+        } catch (ActiveMQTransactionContext& ex) {
+            // Ignore, can occur on failover if the last command was commit.
         }
 
-        this->beforeEnd();
-
-        // Create and Populate the Info Command.
-        Pointer<TransactionInfo> info( new TransactionInfo() );
-        info->setConnectionId( this->connection->getConnectionInfo().getConnectionId()
);
-        info->setTransactionId( this->context->transactionId );
-        info->setType( ActiveMQConstants::TRANSACTION_STATE_ROLLBACK );
+        if (isInTransaction()) {
 
-        // Before we send the command NULL the id in case of an exception.
-        this->context->transactionId.reset( NULL );
+            Pointer<TransactionInfo> info(new TransactionInfo());
+            info->setConnectionId(this->connection->getConnectionInfo().getConnectionId());
+            info->setTransactionId(this->context->transactionId);
+            info->setType(ActiveMQConstants::TRANSACTION_STATE_ROLLBACK);
 
-        // Roll back the current Transaction
-        this->connection->syncRequest( info );
+            // Before we send the command NULL the id in case of an exception.
+            this->context->transactionId.reset(NULL);
 
-        this->afterRollback();
+            this->connection->syncRequest(info);
+            this->afterRollback();
+        }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )

Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp?rev=1339424&r1=1339423&r2=1339424&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp Wed May 16 23:28:45 2012
@@ -21,6 +21,7 @@
 #include <decaf/util/NoSuchElementException.h>
 
 #include <activemq/commands/ConsumerControl.h>
+#include <activemq/commands/ExceptionResponse.h>
 #include <activemq/commands/RemoveInfo.h>
 #include <activemq/core/ActiveMQConstants.h>
 #include <activemq/transport/TransportListener.h>
@@ -62,7 +63,7 @@ namespace state {
         virtual void run() {
             Pointer<ConnectionId> connectionId = info->getConnectionId();
             Pointer<ConnectionState> cs = stateTracker->connectionStates.get( connectionId
);
-            cs->removeTransactionState( info->getTransactionId() );
+            cs->removeTransactionState(info->getTransactionId());
         }
     };
 
@@ -166,7 +167,7 @@ void ConnectionStateTracker::doRestoreTr
 
     try{
 
-        std::vector< Pointer<Command> > toIgnore;
+        std::vector< Pointer<TransactionInfo> > toRollback;
 
         // Restore the session's transaction state
         std::vector< Pointer<TransactionState> > transactionStates =
@@ -174,18 +175,15 @@ void ConnectionStateTracker::doRestoreTr
 
         std::vector< Pointer<TransactionState> >::const_iterator iter = transactionStates.begin();
 
-        for( ; iter != transactionStates.end(); ++iter ) {
-
-            // ignore any empty (ack) transaction
-            if( (*iter)->getCommands().size() == 2 ) {
-                Pointer<Command> lastCommand = (*iter)->getCommands().get(1);
-                if( lastCommand->isTransactionInfo() ) {
-                    Pointer<TransactionInfo> transactionInfo = lastCommand.dynamicCast<TransactionInfo>();
-
-                    if( transactionInfo->getType() == ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE
) {
-                        toIgnore.push_back(lastCommand);
-                        continue;
-                    }
+        // For any completed transactions we don't know if the commit actually made it to the broker
+        // or was lost along the way, so they need to be rolled back.
+        for (; iter != transactionStates.end(); ++iter) {
+            Pointer<Command> lastCommand = (*iter)->getCommands().getLast();
+            if (lastCommand->isTransactionInfo()) {
+                Pointer<TransactionInfo> transactionInfo = lastCommand.dynamicCast<TransactionInfo>();
+                if (transactionInfo->getType() == ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE)
{
+                    toRollback.push_back(transactionInfo);
+                    continue;
                 }
             }
 
@@ -197,8 +195,7 @@ void ConnectionStateTracker::doRestoreTr
                 transport->oneway( (*state)->getInfo() );
             }
 
-            std::auto_ptr< Iterator< Pointer<Command> > > commands(
-                (*iter)->getCommands().iterator() );
+            std::auto_ptr<Iterator<Pointer<Command> > > commands((*iter)->getCommands().iterator());
 
             while( commands->hasNext() ) {
                 transport->oneway( commands->next() );
@@ -210,12 +207,18 @@ void ConnectionStateTracker::doRestoreTr
             }
         }
 
-        std::vector< Pointer<Command> >::const_iterator command = toIgnore.begin();
-        for( ; command != toIgnore.end(); ++command ) {
-            // respond to the outstanding commit
-            Pointer<Response> response( new Response() );
-            response->setCorrelationId( (*command)->getCommandId() );
-            transport->getTransportListener()->onCommand( response );
+        // Trigger failure of commit for all outstanding completed but in doubt transactions.
+        std::vector<Pointer<TransactionInfo> >::const_iterator command = toRollback.begin();
+        for (; command != toRollback.end(); ++command) {
+            Pointer<ExceptionResponse> response(new ExceptionResponse());
+            Pointer<BrokerError> exception(new BrokerError());
+            exception->setExceptionClass("TransactionRolledBackException");
+            exception->setMessage(
+                std::string("Transaction completion in doubt due to failover. Forcing rollback
of ") +
+                (*command)->getTransactionId()->toString());
+            response->setException(exception);
+            response->setCorrelationId((*command)->getCommandId());
+            transport->getTransportListener()->onCommand(response);
         }
     }
     AMQ_CATCH_RETHROW( IOException )
@@ -230,7 +233,6 @@ void ConnectionStateTracker::doRestoreSe
     try{
 
         std::vector< Pointer<SessionState> > sessionStates = connectionState->getSessionStates();
-
         std::vector< Pointer<SessionState> >::const_iterator iter = sessionStates.begin();
 
         // Restore the Session State
@@ -273,7 +275,6 @@ void ConnectionStateTracker::doRestoreCo
             Pointer<wireformat::WireFormat> wireFormat = transport->getWireFormat();
 
             if( !connectionInterruptionProcessingComplete && infoToSend->getPrefetchSize()
> 0 && wireFormat->getVersion() > 5) {
-
                 infoToSend.reset( (*state)->getInfo()->cloneDataStructure() );
                 connectionState->getRecoveringPullConsumers().put( infoToSend->getConsumerId(),
(*state)->getInfo() );
                 infoToSend->setPrefetchSize(0);
@@ -291,11 +292,9 @@ void ConnectionStateTracker::doRestoreCo
 void ConnectionStateTracker::doRestoreProducers( const Pointer<transport::Transport>&
transport,
                                                  const Pointer<SessionState>& sessionState
) {
 
-    try{
-
+    try {
         // Restore the session's producers
         std::vector< Pointer<ProducerState> > producerStates = sessionState->getProducerStates();
-
         std::vector< Pointer<ProducerState> >::const_iterator iter = producerStates.begin();
 
         for( ; iter != producerStates.end(); ++iter ) {
@@ -586,7 +585,7 @@ Pointer<Command> ConnectionStateTracker:
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Command> ConnectionStateTracker::processMessageAck( MessageAck* ack ) {
+Pointer<Command> ConnectionStateTracker::processMessageAck( MessageAck* ack AMQCPP_UNUSED)
{
 	// Do nothing here, acks would be stale on connection restore.  Allow the rollback
 	// to deal with these as they are rolled back and redelivered.
     return Pointer<Response>();
@@ -662,8 +661,7 @@ Pointer<Command> ConnectionStateTracker:
                     Pointer<TransactionState> transactionState =
                         cs->getTransactionState( info->getTransactionId() );
                     if( transactionState != NULL ) {
-                        Pointer<TransactionInfo> infoCopy =
-                            Pointer<TransactionInfo>( info->cloneDataStructure()
);
+                        Pointer<TransactionInfo> infoCopy( info->cloneDataStructure()
);
                         transactionState->addCommand( infoCopy );
                         return Pointer<Tracked>( new Tracked(
                             Pointer<Runnable>( new RemoveTransactionAction( this, infoCopy
) ) ) );
@@ -692,8 +690,7 @@ Pointer<Command> ConnectionStateTracker:
                     Pointer<TransactionState> transactionState =
                         cs->getTransactionState( info->getTransactionId() );
                     if( transactionState != NULL ) {
-                        Pointer<TransactionInfo> infoCopy =
-                            Pointer<TransactionInfo>( info->cloneDataStructure()
);
+                        Pointer<TransactionInfo> infoCopy( info->cloneDataStructure()
);
                         transactionState->addCommand( infoCopy );
                         return Pointer<Tracked>( new Tracked(
                             Pointer<Runnable>( new RemoveTransactionAction( this, infoCopy
) ) ) );
@@ -722,8 +719,7 @@ Pointer<Command> ConnectionStateTracker:
                     Pointer<TransactionState> transactionState =
                         cs->getTransactionState( info->getTransactionId() );
                     if( transactionState != NULL ) {
-                        Pointer<TransactionInfo> infoCopy =
-                            Pointer<TransactionInfo>( info->cloneDataStructure()
);
+                        Pointer<TransactionInfo> infoCopy( info->cloneDataStructure()
);
                         transactionState->addCommand( infoCopy );
                         return Pointer<Tracked>( new Tracked(
                             Pointer<Runnable>( new RemoveTransactionAction( this, infoCopy
) ) ) );

Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/SessionState.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/SessionState.cpp?rev=1339424&r1=1339423&r2=1339424&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/SessionState.cpp (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/SessionState.cpp Wed May 16 23:28:45 2012
@@ -70,6 +70,7 @@ Pointer<ProducerState> SessionState::rem
         if( producerState->getTransactionState() != NULL ) {
             // allow the transaction to recreate dependent producer on recovery
             producerState->getTransactionState()->addProducerState( producerState );
+            producerState->getTransactionState().reset(NULL);
         }
     }
 

Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/TransactionState.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/TransactionState.cpp?rev=1339424&r1=1339423&r2=1339424&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/TransactionState.cpp (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/TransactionState.cpp Wed May 16 23:28:45 2012
@@ -36,6 +36,7 @@ TransactionState::TransactionState( cons
 ////////////////////////////////////////////////////////////////////////////////
 TransactionState::~TransactionState() {
     this->commands.clear();
+    this->producers.clear();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -68,6 +69,7 @@ void TransactionState::checkShutdown() c
 void TransactionState::addProducerState( const Pointer<ProducerState>& producerState
) {
 
     if( producerState != NULL ) {
+        producerState->getTransactionState().reset(NULL);
         producers.put( producerState->getInfo()->getProducerId(), producerState );
     }
 }



Mime
View raw message