activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r902057 - in /activemq/branches/activemq-5.3/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/state/ test/java/org/apache/activemq/transport/failover/
Date Fri, 22 Jan 2010 11:01:53 GMT
Author: gtully
Date: Fri Jan 22 11:01:53 2010
New Revision: 902057

URL: http://svn.apache.org/viewvc?rev=902057&view=rev
Log:
svn merge -c 902056 https://svn.apache.org/repos/asf/activemq/trunk - resolve intermittent
failure of org.apache.activemq.bugs.AMQ2149Test as a result of https://issues.apache.org/activemq/browse/AMQ-2573.
An new command to indicate when recovery is complete on a sub is needed to gate dispatch on
a recovered connection, can do this for 5.4. An outstanding ack transaction is now ignored
but an outstand send transaction can still cause a potential hang till we introduce the new
recovery complete command

Added:
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
      - copied unchanged from r902056, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=902057&r1=902056&r2=902057&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Fri Jan 22 11:01:53 2010
@@ -506,7 +506,7 @@
     }
 
     public void stop() throws Exception {
-        if (!started.compareAndSet(true, false)) {
+        if (!started.get()) {
             return;
         }
         

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=902057&r1=902056&r2=902057&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
Fri Jan 22 11:01:53 2010
@@ -20,6 +20,7 @@
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activemq.command.Command;
@@ -139,11 +140,28 @@
     }
 
     private void restoreTransactions(Transport transport, ConnectionState connectionState)
throws IOException {
+        Vector<Command> toIgnore = new Vector<Command>();
         for (TransactionState transactionState : connectionState.getTransactionStates())
{
             if (LOG.isDebugEnabled()) {
                 LOG.debug("tx: " + transactionState.getId());
             }
             
+            // ignore any empty (ack) transaction
+            if (transactionState.getCommands().size() == 2) {
+                Command lastCommand = transactionState.getCommands().get(1);
+                if (lastCommand instanceof TransactionInfo) {
+                    TransactionInfo transactionInfo = (TransactionInfo) lastCommand;
+                    if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("not replaying empty (ack) tx: " + transactionState.getId());
+                        }
+                        toIgnore.add(lastCommand);
+                        continue;
+                    }
+                }
+            }
+            
+            // replay short lived producers that may have been involved in the transaction
             for (ProducerState producerState : transactionState.getProducerStates().values())
{
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("tx replay producer :" + producerState.getInfo());
@@ -165,6 +183,13 @@
                 transport.oneway(producerState.getInfo().createRemoveCommand());
             }
         }
+        
+        for (Command command: toIgnore) {
+            // respond to the outstanding commit
+            Response response = new Response();
+            response.setCorrelationId(command.getCommandId());
+            transport.getTransportListener().onCommand(response);
+        }
     }
 
     /**
@@ -200,6 +225,9 @@
         // Restore the session's consumers
         for (Iterator iter3 = sessionState.getConsumerStates().iterator(); iter3.hasNext();)
{
             ConsumerState consumerState = (ConsumerState)iter3.next();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("restore consumer: " + consumerState.getInfo().getConsumerId());
+            }
             transport.oneway(consumerState.getInfo());
         }
     }



Mime
View raw message