activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r905437 - in /activemq/branches/activemq-5.3/activemq-core/src: main/java/org/apache/activemq/ test/java/org/apache/activemq/transport/failover/
Date Mon, 01 Feb 2010 23:01:45 GMT
Author: gtully
Date: Mon Feb  1 23:01:45 2010
New Revision: 905437

URL: http://svn.apache.org/viewvc?rev=905437&view=rev
Log:
merge -c 905432 https://svn.apache.org/repos/asf/activemq/trunk - commit may throw a TransactionRolledBackException
in the event that after a failover recovery, the same messages are not redispatched - the
transaction cannot be fully recreated so it must rollback

Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=905437&r1=905436&r2=905437&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Mon Feb  1 23:01:45 2010
@@ -46,7 +46,9 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -58,6 +60,7 @@
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
+import javax.jms.TransactionRolledBackException;
 
 /**
  * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
@@ -109,6 +112,7 @@
     // not been acknowledged. It's kept in reverse order since we
     // Always walk list in reverse order.
     private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
+    private HashMap<MessageId, Boolean> previouslyDeliveredMessages;
     private int deliveredCounter;
     private int additionalWindowSize;
     private long redeliveryDelay;
@@ -146,7 +150,7 @@
      * @param name
      * @param selector
      * @param prefetch
-     * @param maximumPendingMessageCount TODO
+     * @param maximumPendingMessageCount
      * @param noLocal
      * @param browser
      * @param dispatchAsync
@@ -640,7 +644,7 @@
     }
     
     void clearMessagesInProgress() {
-        // deal with delivered messages async to avoid lock contention with in pogress acks
+        // deal with delivered messages async to avoid lock contention with in progress acks
         clearDispatchList = true;
         synchronized (unconsumedMessages.getMutex()) {            
             if (LOG.isDebugEnabled()) {
@@ -951,6 +955,7 @@
             	return; // no msgs
             
             if (session.getTransacted()) {
+                rollbackOnFailedRecoveryRedelivery();
                 session.doStartTransaction();
                 ack.setTransactionId(session.getTransactionContext().getTransactionId());
             }
@@ -967,6 +972,51 @@
         }
     }
     
+    /*
+     * called with deliveredMessages locked
+     */
+    private void rollbackOnFailedRecoveryRedelivery() throws JMSException {
+        if (previouslyDeliveredMessages != null) {
+            // if any previously delivered messages was not re-delivered, transaction is
invalid and must rollback
+            // as messages have been dispatched else where.
+            int numberNotReplayed = 0;
+            for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet())
{
+                if (!entry.getValue()) {
+                    numberNotReplayed++;
+                    // allow outstanding messages to get delivered again
+                    removeFromDeliveredMessages(entry.getKey());
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("previously delivered message has not been replayed in
transaction, id: " + entry.getKey());
+                    }
+                }
+            }
+            clearPreviouslyDelivered();
+            
+            if (numberNotReplayed > 0) {
+                String message = "rolling back transaction post failover recovery. " + numberNotReplayed
+                    + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId();
+                LOG.warn(message);
+                throw new TransactionRolledBackException(message);
+            }
+        }
+        
+    }
+
+    /*
+     * called with deliveredMessages locked
+     */
+    private void removeFromDeliveredMessages(MessageId key) {
+        ListIterator<MessageDispatch> iterator = deliveredMessages.listIterator(deliveredMessages.size());
+        while (iterator.hasPrevious()) {
+            MessageDispatch candidate = iterator.previous();
+            if (key.equals(candidate.getMessage().getMessageId())) {
+                session.connection.rollbackDuplicate(this, candidate.getMessage());
+                iterator.remove();
+                break;
+            }
+        }
+    }
+
     void acknowledge(MessageDispatch md) throws JMSException {
         MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1);
         session.sendAck(ack);
@@ -978,6 +1028,7 @@
     public void commit() throws JMSException {
         synchronized (deliveredMessages) {
             deliveredMessages.clear();
+            clearPreviouslyDelivered();
         }
         redeliveryDelay = 0;
     }
@@ -998,6 +1049,7 @@
                 }
             }
             synchronized(deliveredMessages) {
+                clearPreviouslyDelivered();
                 if (deliveredMessages.isEmpty()) {
                     return;
                 }
@@ -1073,6 +1125,16 @@
         }
     }
 
+    /*
+     * called with deliveredMessages locked
+     */
+    private void clearPreviouslyDelivered() {
+        if (previouslyDeliveredMessages != null) {
+            previouslyDeliveredMessages.clear();
+            previouslyDeliveredMessages = null;
+        }
+    }
+
     public void dispatch(MessageDispatch md) {
         MessageListener listener = this.messageListener.get();
         try {
@@ -1106,11 +1168,23 @@
                             }
                         }
                     } else {
-                        // ignore duplicate
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug(getConsumerId() + " ignoring(auto acking) duplicate:
" + md.getMessage());
+                        if (!session.isTransacted()) {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug(getConsumerId() + " ignoring(auto acking) duplicate:
" + md.getMessage());
+                            }
+                            MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE,
1);
+                            session.sendAck(ack);
+                        } else {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug(getConsumerId() + " tracking transacted redlivery
of duplicate: " + md.getMessage());
+                            }
+                            synchronized (deliveredMessages) {
+                                if (previouslyDeliveredMessages != null) {
+                                    previouslyDeliveredMessages.put(md.getMessage().getMessageId(),
true);
+                                }
+                            }
+                            ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
                         }
-                        acknowledge(md);
                     }
                 }
             }
@@ -1126,13 +1200,27 @@
     // async (on next call) clear delivered as they will be auto-acked as duplicates if they
arrive again
     private void clearDispatchList() {
         if (clearDispatchList) {
-            synchronized (deliveredMessages) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(getConsumerId() + " async clearing delivered list (" + deliveredMessages.size()
+ ") on transport interrupt");
-                }
+            synchronized (deliveredMessages) {  
                 if (clearDispatchList) {
-                    deliveredMessages.clear();
-                    pendingAck = null;
+                    if (!deliveredMessages.isEmpty()) {
+                        if (session.isTransacted()) {    
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug(getConsumerId() + " tracking delivered list ("
+ deliveredMessages.size() + ") on transport interrupt");
+                            }
+                            if (previouslyDeliveredMessages == null) {
+                                previouslyDeliveredMessages = new HashMap<MessageId, Boolean>();
+                            }
+                            for (MessageDispatch delivered : deliveredMessages) {
+                                previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(),
false);
+                            }
+                        } else {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug(getConsumerId() + " clearing delivered list ("
+ deliveredMessages.size() + ") on transport interrupt");
+                            }
+                            deliveredMessages.clear();
+                            pendingAck = null;
+                        }
+                    }
                     clearDispatchList = false;
                 }
             }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=905437&r1=905436&r2=905437&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
Mon Feb  1 23:01:45 2010
@@ -516,7 +516,7 @@
      */
     public boolean getTransacted() throws JMSException {
         checkClosed();
-        return (acknowledgementMode == Session.SESSION_TRANSACTED) || (transactionContext.isInXATransaction());
+        return isTransacted();
     }
 
     /**
@@ -1784,7 +1784,7 @@
      * @return true - if the session uses transactions.
      */
     public boolean isTransacted() {
-        return this.acknowledgementMode == Session.SESSION_TRANSACTED;
+        return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction());
     }
 
     /**

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java?rev=905437&r1=905436&r2=905437&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
Mon Feb  1 23:01:45 2010
@@ -24,6 +24,7 @@
 
 import javax.jms.JMSException;
 import javax.jms.TransactionInProgressException;
+import javax.jms.TransactionRolledBackException;
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
@@ -235,7 +236,11 @@
             throw new TransactionInProgressException("Cannot rollback() if an XA transaction
is already in progress ");
         }
         
-        beforeEnd();
+        try {
+            beforeEnd();
+        } catch (TransactionRolledBackException canOcurrOnFailover) {
+            LOG.warn("rollback processing error", canOcurrOnFailover);
+        }
         if (transactionId != null) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Rollback: "  + transactionId
@@ -270,7 +275,12 @@
             throw new TransactionInProgressException("Cannot commit() if an XA transaction
is already in progress ");
         }
         
-        beforeEnd();
+        try {
+            beforeEnd();
+        } catch (JMSException e) {
+            rollback();
+            throw e;
+        }
 
         // Only send commit if the transaction was started.
         if (transactionId != null) {

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java?rev=905437&r1=905436&r2=905437&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
Mon Feb  1 23:01:45 2010
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.failover;
 
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.concurrent.CountDownLatch;
@@ -167,6 +168,62 @@
         connection.close();
     }
         
+    @Test
+    public void testRollbackFailoverConsumerTx() throws Exception {
+        broker = createBroker(true);
+        broker.start();
+
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url +
")");        
+        final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+        connection.start();
+
+        final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue destination = producerSession.createQueue(QUEUE_NAME);
+
+        final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        final MessageConsumer testConsumer = consumerSession.createConsumer(destination);
+        assertNull("no message yet", testConsumer.receiveNoWait());
+        
+        produceMessage(producerSession, destination, 1);
+        producerSession.close();
+
+        // consume then rollback after restart
+        Message msg = testConsumer.receive(5000);
+        assertNotNull(msg);
+        
+        // restart with out standing delivered message
+        broker.stop();
+        broker.waitUntilStopped();
+        broker = createBroker(false);
+        broker.start();
+        
+        consumerSession.rollback();
+        
+        // receive again
+        msg = testConsumer.receive(10000);
+        assertNotNull("got message again after rollback", msg);
+
+        consumerSession.commit();
+        
+        // close before sweep
+        consumerSession.close();
+        msg = receiveMessage(cf, destination);
+        assertNull("should be nothing left after commit", msg);
+        connection.close();
+    }
+
+    private Message receiveMessage(ActiveMQConnectionFactory cf,
+            Queue destination) throws Exception {
+        final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+        connection.start();
+        final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        final MessageConsumer consumer = consumerSession.createConsumer(destination);
+        Message msg = consumer.receive(5000);
+        consumerSession.commit();
+        connection.close();
+        return msg;
+    }
+
     private void produceMessage(final Session producerSession, Queue destination, long count)
         throws JMSException {
         MessageProducer producer = producerSession.createProducer(destination);

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=905437&r1=905436&r2=905437&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
Mon Feb  1 23:01:45 2010
@@ -36,6 +36,7 @@
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.TransactionRolledBackException;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerPlugin;
@@ -468,7 +469,8 @@
                     
                     // should not get a second message as there are two messages and two
consumers
                     // but with failover and unordered connection restore it can get the
second
-                    // message which could create a problem for a pending ack
+                    // message which could create a problem for a pending ack and also invalidate
+                    // the transaction in which the first was consumed and acked
                     msg = consumer1.receive(5000);
                     LOG.info("consumer1 second attempt got message: " + msg);
                     if (msg != null) {
@@ -476,7 +478,17 @@
                     }
                     
                     LOG.info("committing consumer1 session: " + receivedMessages.size() +
" messsage(s)");
-                    consumerSession1.commit();
+                    try {
+                        consumerSession1.commit();
+                    } catch (JMSException expectedSometimes) {
+                        LOG.info("got rollback ex on commit", expectedSometimes);
+                        if (expectedSometimes instanceof TransactionRolledBackException &&
receivedMessages.size() == 2) {
+                            // ok, message one was not replayed so we expect the rollback
+                        } else {
+                            throw expectedSometimes;
+                        }
+                        
+                    }
                     commitDoneLatch.countDown();
                     LOG.info("done async commit");
                 } catch (Exception e) {
@@ -494,21 +506,23 @@
 
         assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
         
-        // getting 2 is indicative of a problem - proven with dangling message found after
restart
+        // getting 2 is indicative of orderiing issue. a problem if dangling message found
after restart
         LOG.info("received message count: " + receivedMessages.size());
         
         // new transaction
         Message msg = consumer1.receive(2000);
         LOG.info("post: from consumer1 received: " + msg);
-        assertNull("should be nothing left for consumer1", msg);
+        if (receivedMessages.size() == 1) {
+            assertNull("should be nothing left for consumer as recieve should have committed",
msg);
+        } else {
+            assertNotNull("should be available again after commit rollback ex", msg);
+        }
         consumerSession1.commit();
         
-        // consumer2 should get other message provided consumer1 did not get 2
+        // consumer2 should get other message
         msg = consumer2.receive(5000);
         LOG.info("post: from consumer2 received: " + msg);
-        if (receivedMessages.size() == 1) {
-            assertNotNull("got second message on consumer2", msg);
-        }
+        assertNotNull("got second message on consumer2", msg);
         consumerSession2.commit();
         
         for (Connection c: connections) {



Mime
View raw message