activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r908453 - in /activemq/branches/activemq-5.3/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/state/ main/java/org/apache/activemq/transport/failover/ test/java/org/apache/activemq/transport/failover/
Date Wed, 10 Feb 2010 11:34:35 GMT
Author: gtully
Date: Wed Feb 10 11:34:35 2010
New Revision: 908453

URL: http://svn.apache.org/viewvc?rev=908453&view=rev
Log:
svn merge -c 906450 https://svn.apache.org/repos/asf/activemq/trunk - further evolution of
the resolution for https://issues.apache.org/activemq/browse/AMQ-2590 - in-doubt transactions are
now rolled back; pending transactions can wait for jms.consumerFailoverRedeliveryWaitPeriod
for redeliveries before rolling back. If previously delivered messages are not replayed, the
transaction is rolled back.

Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=908453&r1=908452&r2=908453&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Wed Feb 10 11:34:35 2010
@@ -188,6 +188,7 @@
     private final Object ensureConnectionInfoSentMutex = new Object();
     private boolean useDedicatedTaskRunner;
     protected CountDownLatch transportInterruptionProcessingComplete;
+    private long consumerFailoverRedeliveryWaitPeriod;
 
     /**
      * Construct an <code>ActiveMQConnection</code>
@@ -2244,7 +2245,7 @@
 	
 	protected void waitForTransportInterruptionProcessing() throws InterruptedException {
         if (transportInterruptionProcessingComplete != null) {
-            while (!closed.get() && !transportFailed.get() && !transportInterruptionProcessingComplete.await(15,
TimeUnit.SECONDS)) {
+            while (!closed.get() && !transportFailed.get() && !transportInterruptionProcessingComplete.await(10,
TimeUnit.SECONDS)) {
                 LOG.warn("dispatch paused, waiting for outstanding dispatch interruption
processing (" + transportInterruptionProcessingComplete.getCount() + ") to complete..");
             }
             synchronized (this) {
@@ -2258,4 +2259,35 @@
 	        transportInterruptionProcessingComplete.countDown();
 	    }
 	}
+
+    private void signalInterruptionProcessingComplete() throws InterruptedException {
+        if (transportInterruptionProcessingComplete.await(0, TimeUnit.SECONDS)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
+            }
+            synchronized (this) {
+                transportInterruptionProcessingComplete = null;
+                FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
+                if (failoverTransport != null) {
+                    failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("notified failover transport (" + failoverTransport +")
of interruption completion for: " + this.getConnectionInfo().getConnectionId());
+                    }
+                } 
+            }
+        }
+    }
+
+    /*
+     * specify the amount of time in milliseconds that a consumer with a transaction pending
recovery
+     * will wait to receive re dispatched messages.
+     * default value is 0 so there is no wait by default.
+     */
+    public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod)
{
+        this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
+    }
+    
+    public long getConsumerFailoverRedeliveryWaitPeriod() {
+        return consumerFailoverRedeliveryWaitPeriod;
+    }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=908453&r1=908452&r2=908453&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
Wed Feb 10 11:34:35 2010
@@ -114,6 +114,7 @@
     private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
     private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
     private boolean useDedicatedTaskRunner;
+    private long consumerFailoverRedeliveryWaitPeriod = 0;
 
     // /////////////////////////////////////////////
     //
@@ -315,6 +316,7 @@
         connection.setAuditDepth(getAuditDepth());
         connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
         connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
+        connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
         if (transportListener != null) {
             connection.addTransportListener(transportListener);
         }
@@ -913,4 +915,12 @@
     public boolean isUseDedicatedTaskRunner() {
         return useDedicatedTaskRunner;
     }
+    
+    public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod)
{
+        this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
+    }
+    
+    public long getConsumerFailoverRedeliveryWaitPeriod() {
+        return consumerFailoverRedeliveryWaitPeriod;
+    }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=908453&r1=908452&r2=908453&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Wed Feb 10 11:34:35 2010
@@ -112,6 +112,7 @@
     // not been acknowledged. It's kept in reverse order since we
     // Always walk list in reverse order.
     private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
+    // track duplicate deliveries in a transaction such that the tx integrity can be validated
     private HashMap<MessageId, Boolean> previouslyDeliveredMessages;
     private int deliveredCounter;
     private int additionalWindowSize;
@@ -141,6 +142,8 @@
     
     private long optimizeAckTimestamp = System.currentTimeMillis();
     private long optimizeAckTimeout = 300;
+    private long failoverRedeliveryWaitPeriod = 0;
+    private boolean rollbackInitiated;
 
     /**
      * Create a MessageConsumer
@@ -228,7 +231,7 @@
         this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() &&
session.isAutoAcknowledge()
                                    && !info.isBrowser();
         this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
-
+        this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
         if (messageListener != null) {
             setMessageListener(messageListener);
         }
@@ -948,6 +951,7 @@
      */
     public void acknowledge() throws JMSException {
         clearDispatchList();
+        waitForRedeliveries();
         synchronized(deliveredMessages) {
             // Acknowledge all messages so far.
             MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
@@ -972,51 +976,67 @@
         }
     }
     
+    private void waitForRedeliveries() {
+        if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages !=
null) {
+            long expiry = System.currentTimeMillis() + failoverRedeliveryWaitPeriod;
+            int numberNotReplayed;
+            do {
+                numberNotReplayed = 0;
+                synchronized(deliveredMessages) {
+                    if (previouslyDeliveredMessages != null) { 
+                        for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet())
{
+                            if (!entry.getValue()) {
+                                numberNotReplayed++;
+                            }
+                        }
+                    }
+                }
+                if (numberNotReplayed > 0) {
+                    LOG.info("waiting for redelivery of " + numberNotReplayed + " to consumer
:" + this.getConsumerId());
+                    try {
+                        Thread.sleep(Math.max(500, failoverRedeliveryWaitPeriod/4));
+                    } catch (InterruptedException outOfhere) {
+                        break;
+                    }
+                }
+            } while (numberNotReplayed > 0 && expiry < System.currentTimeMillis());
+        }
+    }
+
     /*
      * called with deliveredMessages locked
      */
     private void rollbackOnFailedRecoveryRedelivery() throws JMSException {
         if (previouslyDeliveredMessages != null) {
+            if (rollbackInitiated) {
+                // second call from rollback, nothing more to do
+                // REVISIT - should beforeEnd be called again by transaction context?
+                rollbackInitiated = false;
+                return;
+            }
             // if any previously delivered messages was not re-delivered, transaction is
invalid and must rollback
             // as messages have been dispatched else where.
             int numberNotReplayed = 0;
             for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet())
{
                 if (!entry.getValue()) {
                     numberNotReplayed++;
-                    // allow outstanding messages to get delivered again
-                    removeFromDeliveredMessages(entry.getKey());
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("previously delivered message has not been replayed in
transaction, id: " + entry.getKey());
                     }
                 }
             }
-            clearPreviouslyDelivered();
             
             if (numberNotReplayed > 0) {
                 String message = "rolling back transaction post failover recovery. " + numberNotReplayed
                     + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId();
                 LOG.warn(message);
+                rollbackInitiated = true;
                 throw new TransactionRolledBackException(message);
             }
         }
         
     }
 
-    /*
-     * called with deliveredMessages locked
-     */
-    private void removeFromDeliveredMessages(MessageId key) {
-        ListIterator<MessageDispatch> iterator = deliveredMessages.listIterator(deliveredMessages.size());
-        while (iterator.hasPrevious()) {
-            MessageDispatch candidate = iterator.previous();
-            if (key.equals(candidate.getMessage().getMessageId())) {
-                session.connection.rollbackDuplicate(this, candidate.getMessage());
-                iterator.remove();
-                break;
-            }
-        }
-    }
-
     void acknowledge(MessageDispatch md) throws JMSException {
         MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1);
         session.sendAck(ack);
@@ -1049,7 +1069,7 @@
                 }
             }
             synchronized(deliveredMessages) {
-                clearPreviouslyDelivered();
+                rollbackPreviouslyDeliveredAndNotRedelivered();
                 if (deliveredMessages.isEmpty()) {
                     return;
                 }
@@ -1126,6 +1146,37 @@
     }
 
     /*
+     * called with unconsumedMessages && deliveredMessages locked
+     * remove any message not re-delivered as they can't be replayed to this 
+     * consumer on rollback
+     */
+    private void rollbackPreviouslyDeliveredAndNotRedelivered() {
+        if (previouslyDeliveredMessages != null) {
+            for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet())
{
+                if (!entry.getValue()) {              
+                    removeFromDeliveredMessages(entry.getKey());
+                }
+            }
+            rollbackInitiated = false;
+            clearPreviouslyDelivered();
+        }
+    }
+
+    /*
+     * called with deliveredMessages locked
+     */
+    private void removeFromDeliveredMessages(MessageId key) {
+        ListIterator<MessageDispatch> iterator = deliveredMessages.listIterator(deliveredMessages.size());
+        while (iterator.hasPrevious()) {
+            MessageDispatch candidate = iterator.previous();
+            if (key.equals(candidate.getMessage().getMessageId())) {
+                session.connection.rollbackDuplicate(this, candidate.getMessage());
+                iterator.remove();
+                break;
+            }
+        }
+    }
+    /*
      * called with deliveredMessages locked
      */
     private void clearPreviouslyDelivered() {
@@ -1170,7 +1221,7 @@
                     } else {
                         if (!session.isTransacted()) {
                             if (LOG.isDebugEnabled()) {
-                                LOG.debug(getConsumerId() + " ignoring(auto acking) duplicate:
" + md.getMessage());
+                                LOG.debug(getConsumerId() + " ignoring (auto acking) duplicate:
" + md.getMessage());
                             }
                             MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE,
1);
                             session.sendAck(ack);
@@ -1178,12 +1229,24 @@
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug(getConsumerId() + " tracking transacted redlivery
of duplicate: " + md.getMessage());
                             }
+                            boolean needsPoisonAck = false;
                             synchronized (deliveredMessages) {
                                 if (previouslyDeliveredMessages != null) {
                                     previouslyDeliveredMessages.put(md.getMessage().getMessageId(),
true);
+                                } else {
+                                    // existing transaction gone but still a duplicate!,
lets mark as poison ftm,
+                                    // possibly could allow redelivery..
+                                    needsPoisonAck = true;
                                 }
                             }
-                            ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
+                            if (needsPoisonAck) {
+                                LOG.warn("acking as poison, duplicate transacted delivery
but no recovering transaction for: " + md);
+                                MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE,
1);
+                                poisonAck.setFirstMessageId(md.getMessage().getMessageId());
+                                session.sendAck(poisonAck);
+                            } else {
+                                ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
+                            }
                         }
                     }
                 }
@@ -1197,7 +1260,7 @@
         }
     }
 
-    // async (on next call) clear delivered as they will be auto-acked as duplicates if they
arrive again
+    // async (on next call) clear or track delivered as they may be flagged as duplicates
if they arrive again
     private void clearDispatchList() {
         if (clearDispatchList) {
             synchronized (deliveredMessages) {  
@@ -1205,7 +1268,7 @@
                     if (!deliveredMessages.isEmpty()) {
                         if (session.isTransacted()) {    
                             if (LOG.isDebugEnabled()) {
-                                LOG.debug(getConsumerId() + " tracking delivered list ("
+ deliveredMessages.size() + ") on transport interrupt");
+                                LOG.debug(getConsumerId() + " tracking existing transacted
delivered list (" + deliveredMessages.size() + ") on transport interrupt");
                             }
                             if (previouslyDeliveredMessages == null) {
                                 previouslyDeliveredMessages = new HashMap<MessageId, Boolean>();

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java?rev=908453&r1=908452&r2=908453&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
Wed Feb 10 11:34:35 2010
@@ -293,11 +293,21 @@
             TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId,
TransactionInfo.COMMIT_ONE_PHASE);
             this.transactionId = null;
             // Notify the listener that the tx was committed back
-            syncSendPacketWithInterruptionHandling(info);
-            if (localTransactionEventListener != null) {
-                localTransactionEventListener.commitEvent();
+            try {
+                syncSendPacketWithInterruptionHandling(info);
+                if (localTransactionEventListener != null) {
+                    localTransactionEventListener.commitEvent();
+                }
+                afterCommit();
+            } catch (JMSException cause) {
+                LOG.info("commit failed for transaction " + info.getTransactionId(), cause);
+                if (localTransactionEventListener != null) {
+                    localTransactionEventListener.rollbackEvent();
+                }
+                afterRollback();
+                throw cause;
             }
-            afterCommit();
+            
         }
     }
 

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=908453&r1=908452&r2=908453&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
Wed Feb 10 11:34:35 2010
@@ -23,12 +23,15 @@
 import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 
+import javax.jms.TransactionRolledBackException;
+
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
@@ -140,22 +143,24 @@
     }
 
     private void restoreTransactions(Transport transport, ConnectionState connectionState)
throws IOException {
-        Vector<Command> toIgnore = new Vector<Command>();
+        Vector<TransactionInfo> toRollback = new Vector<TransactionInfo>();
         for (TransactionState transactionState : connectionState.getTransactionStates())
{
             if (LOG.isDebugEnabled()) {
                 LOG.debug("tx: " + transactionState.getId());
             }
             
-            // ignore any empty (ack) transaction
-            if (transactionState.getCommands().size() == 2) {
-                Command lastCommand = transactionState.getCommands().get(1);
+            // rollback any completed transactions - no way to know if commit got there
+            // or if reply went missing
+            //
+            if (!transactionState.getCommands().isEmpty()) {
+                Command lastCommand = transactionState.getCommands().get(transactionState.getCommands().size()
- 1);
                 if (lastCommand instanceof TransactionInfo) {
                     TransactionInfo transactionInfo = (TransactionInfo) lastCommand;
                     if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) {
                         if (LOG.isDebugEnabled()) {
-                            LOG.debug("not replaying empty (ack) tx: " + transactionState.getId());
+                            LOG.debug("rolling back potentially completed tx: " + transactionState.getId());
                         }
-                        toIgnore.add(lastCommand);
+                        toRollback.add(transactionInfo);
                         continue;
                     }
                 }
@@ -184,9 +189,10 @@
             }
         }
         
-        for (Command command: toIgnore) {
+        for (TransactionInfo command: toRollback) {
             // respond to the outstanding commit
-            Response response = new Response();
+            ExceptionResponse response = new ExceptionResponse();
+            response.setException(new TransactionRolledBackException("Transaction completion
in doubt due to failover. Forcing rollback of " + command.getTransactionId()));
             response.setCorrelationId(command.getCommandId());
             transport.getTransportListener().onCommand(response);
         }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=908453&r1=908452&r2=908453&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Wed Feb 10 11:34:35 2010
@@ -212,7 +212,9 @@
                 failedConnectTransportURI=connectedTransportURI;
                 connectedTransportURI = null;
                 connected=false;
-            
+
+                stateTracker.transportInterrupted();
+
                 // notify before any reconnect attempt so ack state can be whacked
                 if (transportListener != null) {
                     transportListener.transportInterupted();

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java?rev=908453&r1=908452&r2=908453&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
Wed Feb 10 11:34:35 2010
@@ -16,13 +16,16 @@
  */
 package org.apache.activemq.transport.failover;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -58,6 +61,7 @@
 	
     private static final Log LOG = LogFactory.getLog(FailoverConsumerOutstandingCommitTest.class);
 	private static final String QUEUE_NAME = "FailoverWithOutstandingCommit";
+    private static final String MESSAGE_TEXT = "Test message ";
 	private String url = "tcp://localhost:61616";
 	final int prefetch = 10;
 	BrokerService broker;
@@ -133,7 +137,7 @@
         connection.start();
         
         final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        final Queue destination = producerSession.createQueue(QUEUE_NAME + "?jms.consumer.prefetch="
+ prefetch);
+        final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize="
+ prefetch);
         
         final Session consumerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
 
@@ -167,13 +171,105 @@
           
         connection.close();
     }
+
+    @Test
+    public void TestFailoverConsumerOutstandingSendTxIncomplete() throws Exception {
+        doTestFailoverConsumerOutstandingSendTx(false);
+    }
+	
+    @Test
+    public void TestFailoverConsumerOutstandingSendTxComplete() throws Exception {
+        doTestFailoverConsumerOutstandingSendTx(true);
+    }
+    
+    public void doTestFailoverConsumerOutstandingSendTx(final boolean doActualBrokerCommit)
throws Exception {
+        final boolean watchTopicAdvisories = true;
+        broker = createBroker(true);
+
+        broker.setPlugins(new BrokerPlugin[] { new BrokerPluginSupport() {
+            @Override
+            public void commitTransaction(ConnectionContext context,
+                    TransactionId xid, boolean onePhase) throws Exception {
+                if (doActualBrokerCommit) {
+                    LOG.info("doing actual broker commit...");
+                    super.commitTransaction(context, xid, onePhase);
+                }
+                // so commit will hang as if reply is lost
+                context.setDontSendReponse(true);
+                Executors.newSingleThreadExecutor().execute(new Runnable() {
+                    public void run() {
+                        LOG.info("Stopping broker before commit...");
+                        try {
+                            broker.stop();
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                });
+            }
+        } });
+        broker.start();
+
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url +
")");
+        cf.setWatchTopicAdvisories(watchTopicAdvisories);
+        cf.setDispatchAsync(false);
+
+        final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+        connection.start();
+
+        final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue destination = producerSession.createQueue(QUEUE_NAME
+                + "?consumer.prefetchSize=" + prefetch);
+
+        final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+        final CountDownLatch commitDoneLatch = new CountDownLatch(1);
+        final CountDownLatch messagesReceived = new CountDownLatch(3);
+        final AtomicBoolean gotCommitException = new AtomicBoolean(false);
+        final ArrayList<TextMessage> receivedMessages = new ArrayList<TextMessage>();
+        final MessageConsumer testConsumer = consumerSession.createConsumer(destination);
+        testConsumer.setMessageListener(new MessageListener() {
+
+            public void onMessage(Message message) {
+                LOG.info("consume one and commit: " + message);
+                assertNotNull("got message", message);
+                receivedMessages.add((TextMessage) message);
+                try {
+                    produceMessage(consumerSession, destination, 1);
+                    consumerSession.commit();
+                } catch (JMSException e) {
+                    LOG.info("commit exception", e);
+                    gotCommitException.set(true);
+                }
+                commitDoneLatch.countDown();
+                messagesReceived.countDown();
+                LOG.info("done commit");
+            }
+        });
+
+        produceMessage(producerSession, destination, prefetch * 2);
+
+        // will be stopped by the plugin
+        broker.waitUntilStopped();
+        broker = createBroker(false);
+        broker.start();
+
+        assertTrue("commit done through failover", commitDoneLatch.await(20, TimeUnit.SECONDS));
+        assertTrue("commit failed", gotCommitException.get());
+        assertTrue("another message was received after failover", messagesReceived.await(20,
TimeUnit.SECONDS));
+        assertEquals("get message 0 first", MESSAGE_TEXT + "0", receivedMessages.get(0).getText());
+        // it was redelivered
+        assertEquals("get message 0 second", MESSAGE_TEXT + "0", receivedMessages.get(1).getText());
+        assertTrue("another message was received", messagesReceived.await(20, TimeUnit.SECONDS));
+        assertEquals("get message 1 eventually", MESSAGE_TEXT + "1", receivedMessages.get(2).getText());
         
     @Test
     public void testRollbackFailoverConsumerTx() throws Exception {
         broker = createBroker(true);
         broker.start();
 
-        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url +
")");        
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url +
")");
+        cf.setConsumerFailoverRedeliveryWaitPeriod(10000);
         final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
         connection.start();
 
@@ -228,7 +324,7 @@
         throws JMSException {
         MessageProducer producer = producerSession.createProducer(destination);
         for (int i=0; i<count; i++) {
-            TextMessage message = producerSession.createTextMessage("Test message " + i);
+            TextMessage message = producerSession.createTextMessage(MESSAGE_TEXT + i);
             producer.send(message);
         }
         producer.close();

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=908453&r1=908452&r2=908453&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java Wed Feb 10 11:34:35 2010
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.transport.failover;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -40,6 +39,7 @@
 import javax.jms.TransactionRolledBackException;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.AutoFailTestSupport;
 import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerPluginSupport;
 import org.apache.activemq.broker.BrokerService;
@@ -55,6 +55,7 @@
 import org.junit.Test;
 
 // see https://issues.apache.org/activemq/browse/AMQ-2473
+// https://issues.apache.org/activemq/browse/AMQ-2590
 public class FailoverTransactionTest {
 	
     private static final Log LOG = LogFactory.getLog(FailoverTransactionTest.class);
@@ -167,11 +168,12 @@
                 LOG.info("doing async commit...");
                 try {
                     session.commit();
-                    commitDoneLatch.countDown();
-                    LOG.info("done async commit");
-                } catch (Exception e) {
-                    e.printStackTrace();
+                } catch (JMSException e) {
+                    assertTrue(e instanceof TransactionRolledBackException);
+                    LOG.info("got commit exception: ", e);
                 }
+                commitDoneLatch.countDown();
+                LOG.info("done async commit");
             }
         });
        
@@ -285,110 +287,8 @@
 	    }
 	    session.commit();
 	    connection.close();
-	}  
-	
-	@Test
-	public void testFailoverConsumerCommitLost() throws Exception {
-	    final int adapter = 0;
-	    broker = createBroker(true);
-	    setPersistenceAdapter(adapter);
-
-	    broker.setPlugins(new BrokerPlugin[] {
-	            new BrokerPluginSupport() {
-
-	                @Override
-	                public void commitTransaction(ConnectionContext context,
-	                        TransactionId xid, boolean onePhase) throws Exception {
-	                    super.commitTransaction(context, xid, onePhase);
-	                    // so commit will hang as if reply is lost
-	                    context.setDontSendReponse(true);
-	                    Executors.newSingleThreadExecutor().execute(new Runnable() {   
-	                        public void run() {
-	                            LOG.info("Stopping broker post commit...");
-	                            try {
-	                                broker.stop();
-	                            } catch (Exception e) {
-	                                e.printStackTrace();
-	                            }
-	                        }
-	                    });
-	                }   
-	            }
-	    });
-	    broker.start();
-
-	    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
-	    Connection connection = cf.createConnection();
-	    connection.start();
-	    final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-	    final Session consumerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-	    Queue destination = producerSession.createQueue(QUEUE_NAME);
-
-	    final MessageConsumer consumer = consumerSession.createConsumer(destination);
-
-	    produceMessage(producerSession, destination);
-
-	    final Vector<Message> receivedMessages = new Vector<Message>();
-	    final CountDownLatch commitDoneLatch = new CountDownLatch(1);  
-	    Executors.newSingleThreadExecutor().execute(new Runnable() {   
-	        public void run() {
-	            LOG.info("doing async commit after consume...");
-	            try {
-	                Message msg = consumer.receive(20000);
-	                LOG.info("Got message: " + msg);
-	                receivedMessages.add(msg);
-	                consumerSession.commit();
-	                commitDoneLatch.countDown();
-	                LOG.info("done async commit");
-	            } catch (Exception e) {
-	                e.printStackTrace();
-	            }
-	        }
-	    });
-
-
-	    // will be stopped by the plugin
-	    broker.waitUntilStopped();
-	    broker = createBroker(false);
-	    setPersistenceAdapter(adapter);
-	    broker.start();
-
-	    assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
-
-	    assertEquals("we got a message", 1, receivedMessages.size());
-
-	    // new transaction
-	    Message msg = consumer.receive(20000);
-	    LOG.info("Received: " + msg);
-	    assertNull("we did not get a duplicate message", msg);
-	    consumerSession.commit();
-	    consumer.close();
-	    connection.close();
-
-	    // ensure no dangling messages with fresh broker etc
-	    broker.stop();
-	    broker.waitUntilStopped();
-
-	    LOG.info("Checking for remaining/hung messages..");
-	    broker = createBroker(false);
-	    setPersistenceAdapter(adapter);
-	    broker.start();
-
-	    // after restart, ensure no dangling messages
-	    cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
-	    connection = cf.createConnection();
-	    connection.start();
-	    Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-	    MessageConsumer consumer2 = session2.createConsumer(destination);
-	    msg = consumer2.receive(1000);
-	    if (msg == null) {
-	        msg = consumer2.receive(5000);
-	    }
-	    LOG.info("Received: " + msg);
-	    assertNull("no messges left dangling but got: " + msg, msg);
-	    connection.close();
 	}
-	
+		
     @Test
     public void testFailoverConsumerAckLost() throws Exception {
         // as failure depends on hash order, do a few times
@@ -563,6 +463,93 @@
         connection.close();
     }
 
+    @Test
+    public void testAutoRollbackWithMissingRedeliveries() throws Exception {
+        broker = createBroker(true);
+        broker.start();
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        Connection connection = cf.createConnection();
+        connection.start();
+        final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
+        final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer consumer = consumerSession.createConsumer(destination);
+        
+        produceMessage(producerSession, destination);
+        
+        Message msg = consumer.receive(20000);
+        assertNotNull(msg);
+        
+        broker.stop();
+        broker = createBroker(false);
+        // use empty jdbc store so that default wait for redeliveries will timeout after failover
+        setPersistenceAdapter(1);
+        broker.start();
+        
+        try {
+            consumerSession.commit();
+        } catch (JMSException expectedRolledback) {
+            assertTrue(expectedRolledback instanceof TransactionRolledBackException);
+        }
+        
+        broker.stop(); 
+        broker = createBroker(false);
+        broker.start();
+        
+        assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000));
+        connection.close();
+    }
+
+ 
+    @Test
+    public void testWaitForMissingRedeliveries() throws Exception {
+        LOG.info("testWaitForMissingRedeliveries()");
+        broker = createBroker(true);
+        broker.start();
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=30000");
+        Connection connection = cf.createConnection();
+        connection.start();
+        final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue destination = producerSession.createQueue(QUEUE_NAME);
+        final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer consumer = consumerSession.createConsumer(destination);
+        
+        produceMessage(producerSession, destination);
+        Message msg = consumer.receive(20000);
+        if (msg == null) {
+            AutoFailTestSupport.dumpAllThreads("missing-");
+        }
+        assertNotNull("got message just produced", msg);
+        
+        broker.stop();
+        broker = createBroker(false);
+        // use empty jdbc store so that wait for re-deliveries occur when failover resumes
+        setPersistenceAdapter(1);
+        broker.start();
+
+        final CountDownLatch commitDone = new CountDownLatch(1);
+        // will block pending re-deliveries
+        Executors.newSingleThreadExecutor().execute(new Runnable() {   
+            public void run() {
+                LOG.info("doing async commit...");
+                try {
+                    consumerSession.commit();
+                    commitDone.countDown();
+                } catch (JMSException ignored) {
+                }
+            }
+        });
+        
+        broker.stop(); 
+        broker = createBroker(false);
+        broker.start();
+        
+        assertTrue("commit was successfull", commitDone.await(30, TimeUnit.SECONDS));
+        
+        assertNull("should not get committed message", consumer.receive(5000));
+        connection.close();
+    }
+
     private void produceMessage(final Session producerSession, Queue destination)
             throws JMSException {
         MessageProducer producer = producerSession.createProducer(destination);      



Mime
View raw message