Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5D0097BD5 for ; Fri, 30 Sep 2011 12:00:10 +0000 (UTC) Received: (qmail 51761 invoked by uid 500); 30 Sep 2011 12:00:10 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 51700 invoked by uid 500); 30 Sep 2011 12:00:09 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 51688 invoked by uid 99); 30 Sep 2011 12:00:09 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 30 Sep 2011 12:00:09 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_FILL_THIS_FORM_SHORT X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 30 Sep 2011 12:00:06 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 6D0CC23888CD for ; Fri, 30 Sep 2011 11:59:46 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1177619 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/bro... Date: Fri, 30 Sep 2011 11:59:45 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20110930115946.6D0CC23888CD@eris.apache.org> Author: gtully Date: Fri Sep 30 11:59:44 2011 New Revision: 1177619 URL: http://svn.apache.org/viewvc?rev=1177619&view=rev Log: https://issues.apache.org/jira/browse/AMQ-3519 - Allow JMSRedelivered flag to survive a restart. Add transactedIndividualAck flag to connection factory and rewriteOnRedelivery to KahaDBPersistenceAdapter. These combine to persist the redelivery status on a rollback Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java (with props) activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java (with props) Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1177619&r1=1177618&r2=1177619&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Fri Sep 30 11:59:44 2011 @@ -193,6 +193,7 @@ public class ActiveMQConnection implemen private long consumerFailoverRedeliveryWaitPeriod; private final Scheduler scheduler; private boolean messagePrioritySupported=true; + private boolean transactedIndividualAck = false; /** * Construct an ActiveMQConnection @@ -2399,6 +2400,15 @@ public class ActiveMQConnection implemen this.checkForDuplicates = checkForDuplicates; } + + public boolean isTransactedIndividualAck() { + return transactedIndividualAck; + } + + public void setTransactedIndividualAck(boolean transactedIndividualAck) { + this.transactedIndividualAck = transactedIndividualAck; + } + /** * Removes any TempDestinations that this connection has cached, ignoring * any exceptions generated because the destination is in use as they should Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=1177619&r1=1177618&r2=1177619&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Fri Sep 30 11:59:44 2011 @@ -119,6 +119,7 @@ public class ActiveMQConnectionFactory e private boolean checkForDuplicates = true; private ClientInternalExceptionListener clientInternalExceptionListener; private boolean messagePrioritySupported = true; + private boolean transactedIndividualAck = false; // ///////////////////////////////////////////// // @@ -325,6 +326,7 @@ public class ActiveMQConnectionFactory e connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod()); connection.setCheckForDuplicates(isCheckForDuplicates()); connection.setMessagePrioritySupported(isMessagePrioritySupported()); + connection.setTransactedIndividualAck(isTransactedIndividualAck()); if (transportListener != null) { connection.addTransportListener(transportListener); } @@ -707,6 +709,8 @@ public class ActiveMQConnectionFactory e props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber())); props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates())); props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported())); + props.setProperty("transactedIndividualAck", Boolean.toString(isTransactedIndividualAck())); + } public boolean isUseCompression() { @@ -1019,4 +1023,18 @@ public class ActiveMQConnectionFactory e public void setCheckForDuplicates(boolean checkForDuplicates) { this.checkForDuplicates = checkForDuplicates; } + + public boolean isTransactedIndividualAck() { + return transactedIndividualAck; + } + + /** + * when true, submit individual transacted acks immediately rather than with transaction completion. + * This allows the acks to represent delivery status which can be persisted on rollback + * Used in conjunction with org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter#setRewriteOnRedelivery(boolean) true + */ + public void setTransactedIndividualAck(boolean transactedIndividualAck) { + this.transactedIndividualAck = transactedIndividualAck; + } + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1177619&r1=1177618&r2=1177619&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri Sep 30 11:59:44 2011 @@ -152,6 +152,7 @@ public class ActiveMQMessageConsumer imp private long optimizeAckTimestamp = System.currentTimeMillis(); private long optimizeAcknowledgeTimeOut = 0; private long failoverRedeliveryWaitPeriod = 0; + private boolean transactedIndividualAck = false; /** * Create a MessageConsumer @@ -249,6 +250,7 @@ public class ActiveMQMessageConsumer imp } this.info.setOptimizedAcknowledge(this.optimizeAcknowledge); this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod(); + this.transactedIndividualAck = session.connection.isTransactedIndividualAck(); if (messageListener != null) { setMessageListener(messageListener); } @@ -678,7 +680,7 @@ public class ActiveMQMessageConsumer imp synchronized (unconsumedMessages.getMutex()) { if (inProgressClearRequiredFlag) { if (LOG.isDebugEnabled()) { - LOG.debug(getConsumerId() + " clearing dispatched list (" + unconsumedMessages.size() + ") on transport interrupt"); + LOG.debug(getConsumerId() + " clearing unconsumed list (" + unconsumedMessages.size() + ") on transport interrupt"); } // ensure unconsumed are rolledback up front as they may get redelivered to another consumer List list = unconsumedMessages.removeAll(); @@ -833,11 +835,24 @@ public class ActiveMQMessageConsumer imp deliveredMessages.addFirst(md); } if (session.getTransacted()) { - ackLater(md, MessageAck.DELIVERED_ACK_TYPE); + if (transactedIndividualAck) { + immediateIndividualTransactedAck(md); + } else { + ackLater(md, MessageAck.DELIVERED_ACK_TYPE); + } } } } - + + private void immediateIndividualTransactedAck(MessageDispatch md) throws JMSException { + // acks accumulate on the broker pending transaction completion to indicate + // delivery status + registerSync(); + MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1); + ack.setTransactionId(session.getTransactionContext().getTransactionId()); + session.sendAck(ack); + } + private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException { if (unconsumedMessages.isClosed()) { return; @@ -919,29 +934,7 @@ public class ActiveMQMessageConsumer imp // Don't acknowledge now, but we may need to let the broker know the // consumer got the message to expand the pre-fetch window if (session.getTransacted()) { - session.doStartTransaction(); - if (!synchronizationRegistered) { - synchronizationRegistered = true; - session.getTransactionContext().addSynchronization(new Synchronization() { - @Override - public void beforeEnd() throws Exception { - acknowledge(); - synchronizationRegistered = false; - } - - @Override - public void afterCommit() throws Exception { - commit(); - synchronizationRegistered = false; - } - - @Override - public void afterRollback() throws Exception { - rollback(); - synchronizationRegistered = false; - } - }); - } + registerSync(); } deliveredCounter++; @@ -976,6 +969,40 @@ public class ActiveMQMessageConsumer imp } } + private void registerSync() throws JMSException { + session.doStartTransaction(); + if (!synchronizationRegistered) { + synchronizationRegistered = true; + session.getTransactionContext().addSynchronization(new Synchronization() { + @Override + public void beforeEnd() throws Exception { + if (transactedIndividualAck) { + clearDispatchList(); + waitForRedeliveries(); + synchronized(deliveredMessages) { + rollbackOnFailedRecoveryRedelivery(); + } + } else { + acknowledge(); + } + synchronizationRegistered = false; + } + + @Override + public void afterCommit() throws Exception { + commit(); + synchronizationRegistered = false; + } + + @Override + public void afterRollback() throws Exception { + rollback(); + synchronizationRegistered = false; + } + }); + } + } + /** * Acknowledge all the messages that have been delivered to the client up to * this point. @@ -1284,7 +1311,11 @@ public class ActiveMQMessageConsumer imp poisonAck.setFirstMessageId(md.getMessage().getMessageId()); session.sendAck(poisonAck); } else { - ackLater(md, MessageAck.DELIVERED_ACK_TYPE); + if (transactedIndividualAck) { + immediateIndividualTransactedAck(md); + } else { + ackLater(md, MessageAck.DELIVERED_ACK_TYPE); + } } } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1177619&r1=1177618&r2=1177619&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Fri Sep 30 11:59:44 2011 @@ -593,17 +593,20 @@ public class BrokerService implements Se tempDataStore.stop(); tempDataStore = null; } - stopper.stop(persistenceAdapter); - persistenceAdapter = null; - slave = true; - if (isUseJmx()) { - stopper.stop(getManagementContext()); - managementContext = null; + try { + stopper.stop(persistenceAdapter); + persistenceAdapter = null; + slave = true; + if (isUseJmx()) { + stopper.stop(getManagementContext()); + managementContext = null; + } + // Clear SelectorParser cache to free memory + SelectorParser.clearCache(); + } finally { + stopped.set(true); + stoppedLatch.countDown(); } - // Clear SelectorParser cache to free memory - SelectorParser.clearCache(); - stopped.set(true); - stoppedLatch.countDown(); if (masterConnectorURI == null) { // master start has not finished yet if (slaveStartSignal.getCount() == 1) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1177619&r1=1177618&r2=1177619&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Sep 30 11:59:44 2011 @@ -223,32 +223,7 @@ public abstract class PrefetchSubscripti node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); removeList.add(node); } else { - // setup a Synchronization to remove nodes from the - // dispatched list. - context.getTransaction().addSynchronization( - new Synchronization() { - - @Override - public void afterCommit() - throws Exception { - synchronized(dispatchLock) { - dequeueCounter++; - dispatched.remove(node); - node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); - } - } - - @Override - public void afterRollback() throws Exception { - synchronized(dispatchLock) { - if (isSlave()) { - node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); - } else { - // poisionAck will decrement - otherwise still inflight on client - } - } - } - }); + registerRemoveSync(context, node); } index++; acknowledge(context, ack, node); @@ -281,13 +256,17 @@ public abstract class PrefetchSubscripti for (final MessageReference node : dispatched) { MessageId messageId = node.getMessageId(); if (ack.getLastMessageId().equals(messageId)) { - // this should never be within a transaction - dequeueCounter++; - node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); - destination = node.getRegionDestination(); - acknowledge(context, ack, node); - dispatched.remove(node); + // Don't remove the nodes until we are committed - immediateAck option + if (!context.isInTransaction()) { + dequeueCounter++; + node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); + dispatched.remove(node); + } else { + registerRemoveSync(context, node); + } prefetchExtension = Math.max(0, prefetchExtension - 1); + acknowledge(context, ack, node); + destination = node.getRegionDestination(); callDispatchMatched = true; break; } @@ -406,6 +385,35 @@ public abstract class PrefetchSubscripti } } + private void registerRemoveSync(ConnectionContext context, final MessageReference node) { + // setup a Synchronization to remove nodes from the + // dispatched list. + context.getTransaction().addSynchronization( + new Synchronization() { + + @Override + public void afterCommit() + throws Exception { + synchronized(dispatchLock) { + dequeueCounter++; + dispatched.remove(node); + node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); + } + } + + @Override + public void afterRollback() throws Exception { + synchronized(dispatchLock) { + if (isSlave()) { + node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); + } else { + // poisionAck will decrement - otherwise still inflight on client + } + } + } + }); + } + /** * Checks an ack versus the contents of the dispatched list. * Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=1177619&r1=1177618&r2=1177619&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Fri Sep 30 11:59:44 2011 @@ -500,6 +500,18 @@ public class KahaDBPersistenceAdapter im letter.setForceRecoverIndex(forceRecoverIndex); } + /** + * When true, persist the redelivery status such that the message redelivery flag can survive a broker failure + * used with org.apache.activemq.ActiveMQConnectionFactory#setTransactedIndividualAck(boolean) true + */ + public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) { + letter.setRewriteOnRedelivery(rewriteOnRedelivery); + } + + public boolean isRewriteOnRedelivery() { + return letter.isRewriteOnRedelivery(); + } + public KahaDBStore getStore() { return letter; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1177619&r1=1177618&r2=1177619&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Fri Sep 30 11:59:44 2011 @@ -63,6 +63,7 @@ import org.apache.activemq.usage.MemoryU import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.wireformat.WireFormat; +import org.apache.kahadb.util.ByteSequence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.kahadb.journal.Location; @@ -244,6 +245,57 @@ public class KahaDBStore extends Message super.doStop(stopper); } + void incrementRedeliveryAndReWrite(final String key, final KahaDestination destination) throws IOException { + Location location; + this.indexLock.writeLock().lock(); + try { + location = findMessageLocation(key, destination); + } finally { + this.indexLock.writeLock().unlock(); + } + + if (location != null) { + KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location); + Message message = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); + + message.incrementRedeliveryCounter(); + if (LOG.isTraceEnabled()) { + LOG.trace("rewriting: " + key + " with deliveryCount: " + message.getRedeliveryCounter()); + } + org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); + addMessage.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); + + final Location rewriteLocation = journal.write(toByteSequence(addMessage), true); + + this.indexLock.writeLock().lock(); + try { + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + StoredDestination sd = getStoredDestination(destination, tx); + Long sequence = sd.messageIdIndex.get(tx, key); + MessageKeys keys = sd.orderIndex.get(tx, sequence); + sd.orderIndex.put(tx, sd.orderIndex.lastGetPriority(), sequence, new MessageKeys(keys.messageId, rewriteLocation)); + } + }); + } finally { + this.indexLock.writeLock().unlock(); + } + } + } + + private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException { + return pageFile.tx().execute(new Transaction.CallableClosure() { + public Location execute(Transaction tx) throws IOException { + StoredDestination sd = getStoredDestination(destination, tx); + Long sequence = sd.messageIdIndex.get(tx, key); + if (sequence == null) { + return null; + } + return sd.orderIndex.get(tx, sequence).location; + } + }); + } + protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) { StoreQueueTask task = null; synchronized (store.asyncTaskMap) { @@ -390,16 +442,7 @@ public class KahaDBStore extends Message Location location; indexLock.readLock().lock(); try { - location = pageFile.tx().execute(new Transaction.CallableClosure() { - public Location execute(Transaction tx) throws IOException { - StoredDestination sd = getStoredDestination(dest, tx); - Long sequence = sd.messageIdIndex.get(tx, key); - if (sequence == null) { - return null; - } - return sd.orderIndex.get(tx, sequence).location; - } - }); + location = findMessageLocation(key, dest); }finally { indexLock.readLock().unlock(); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=1177619&r1=1177618&r2=1177619&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java Fri Sep 30 11:59:44 2011 @@ -407,7 +407,7 @@ public class KahaDBTransactionStore impl return message; } @Override - public Future run(ConnectionContext ctx) throws IOException { + public Future run(ConnectionContext ctx) throws IOException { return destination.asyncAddTopicMessage(ctx, message); } @@ -454,7 +454,7 @@ public class KahaDBTransactionStore impl if (ack.isInTransaction()) { if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) { - destination.removeAsyncMessage(context, ack); + destination.removeMessage(context, ack); } else { Tx tx = getTx(ack.getTransactionId()); tx.add(new RemoveMessageCommand(context) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1177619&r1=1177618&r2=1177619&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Sep 30 11:59:44 2011 @@ -94,7 +94,7 @@ import org.apache.kahadb.util.VariableMa import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MessageDatabase extends ServiceSupport implements BrokerServiceAware { +public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware { protected BrokerService brokerService; @@ -224,6 +224,7 @@ public class MessageDatabase extends Ser private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY; protected boolean forceRecoverIndex = false; private final Object checkpointThreadLock = new Object(); + private boolean rewriteOnRedelivery = false; public MessageDatabase() { } @@ -400,24 +401,27 @@ public class MessageDatabase extends Ser public void close() throws IOException, InterruptedException { if( opened.compareAndSet(true, false)) { - this.indexLock.writeLock().lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - public void execute(Transaction tx) throws IOException { - checkpointUpdate(tx, true); - } - }); - pageFile.unload(); - metadata = new Metadata(); + this.indexLock.writeLock().lock(); + try { + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + checkpointUpdate(tx, true); + } + }); + pageFile.unload(); + metadata = new Metadata(); + } finally { + this.indexLock.writeLock().unlock(); + } + journal.close(); + synchronized (checkpointThreadLock) { + checkpointThread.join(); + } } finally { - this.indexLock.writeLock().unlock(); + lockFile.unlock(); + lockFile=null; } - journal.close(); - synchronized (checkpointThreadLock) { - checkpointThread.join(); - } - lockFile.unlock(); - lockFile=null; } } @@ -804,6 +808,14 @@ public class MessageDatabase extends Ser return store(data, false, null,null); } + public ByteSequence toByteSequence(JournalCommand data) throws IOException { + int size = data.serializedSizeFramed(); + DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); + os.writeByte(data.type().getNumber()); + data.writeFramed(os); + return os.toByteSequence(); + } + /** * All updated are are funneled through this method. The updates are converted * to a JournalMessage which is logged to the journal and then the data from @@ -815,13 +827,9 @@ public class MessageDatabase extends Ser before.run(); } try { - int size = data.serializedSizeFramed(); - DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); - os.writeByte(data.type().getNumber()); - data.writeFramed(os); - + ByteSequence sequence = toByteSequence(data); long start = System.currentTimeMillis(); - Location location = journal.write(os.toByteSequence(), sync); + Location location = journal.write(sequence, sync); long start2 = System.currentTimeMillis(); process(data, location, after); long end = System.currentTimeMillis(); @@ -1079,16 +1087,35 @@ public class MessageDatabase extends Ser } } - protected void process(KahaRollbackCommand command, Location location) { + protected void process(KahaRollbackCommand command, Location location) throws IOException { TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); + List updates = null; synchronized (inflightTransactions) { - List tx = inflightTransactions.remove(key); - if (tx == null) { - preparedTransactions.remove(key); + updates = inflightTransactions.remove(key); + if (updates == null) { + updates = preparedTransactions.remove(key); } } + if (isRewriteOnRedelivery()) { + persistRedeliveryCount(updates); + } } + private void persistRedeliveryCount(List updates) throws IOException { + if (updates != null) { + for (Operation operation : updates) { + operation.getCommand().visit(new Visitor() { + @Override + public void visit(KahaRemoveMessageCommand command) throws IOException { + incrementRedeliveryAndReWrite(command.getMessageId(), command.getDestination()); + } + }); + } + } + } + + abstract void incrementRedeliveryAndReWrite(String key, KahaDestination destination) throws IOException; + // ///////////////////////////////////////////////////////////////// // These methods do the actual index updates. // ///////////////////////////////////////////////////////////////// @@ -1981,10 +2008,12 @@ public class MessageDatabase extends Ser return TransactionIdConversion.convert(transactionInfo); } - abstract class Operation { + abstract class Operation > { + final T command; final Location location; - public Operation(Location location) { + public Operation(T command, Location location) { + this.command = command; this.location = location; } @@ -1992,15 +2021,17 @@ public class MessageDatabase extends Ser return location; } + public T getCommand() { + return command; + } + abstract public void execute(Transaction tx) throws IOException; } - class AddOpperation extends Operation { - final KahaAddMessageCommand command; + class AddOpperation extends Operation { public AddOpperation(KahaAddMessageCommand command, Location location) { - super(location); - this.command = command; + super(command, location); } @Override @@ -2008,27 +2039,18 @@ public class MessageDatabase extends Ser upadateIndex(tx, command, location); } - public KahaAddMessageCommand getCommand() { - return command; - } } - class RemoveOpperation extends Operation { - final KahaRemoveMessageCommand command; + class RemoveOpperation extends Operation { public RemoveOpperation(KahaRemoveMessageCommand command, Location location) { - super(location); - this.command = command; + super(command, location); } @Override public void execute(Transaction tx) throws IOException { updateIndex(tx, command, location); } - - public KahaRemoveMessageCommand getCommand() { - return command; - } } // ///////////////////////////////////////////////////////////////// @@ -2247,6 +2269,14 @@ public class MessageDatabase extends Ser this.databaseLockedWaitDelay = databaseLockedWaitDelay; } + public boolean isRewriteOnRedelivery() { + return rewriteOnRedelivery; + } + + public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) { + this.rewriteOnRedelivery = rewriteOnRedelivery; + } + // ///////////////////////////////////////////////////////////////// // Internal conversion methods. // ///////////////////////////////////////////////////////////////// Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java?rev=1177619&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java Fri Sep 30 11:59:44 2011 @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import junit.framework.Test; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.transport.failover.FailoverTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RedeliveryRestartTest extends BrokerRestartTestSupport { + private static final transient Logger LOG = LoggerFactory.getLogger(RedeliveryRestartTest.class); + + @Override + protected void configureBroker(BrokerService broker) throws Exception { + super.configureBroker(broker); + KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); + kahaDBPersistenceAdapter.setRewriteOnRedelivery(true); + broker.addConnector("tcp://0.0.0.0:0"); + } + + public void testValidateRedeliveryFlagAfterRestart() throws Exception { + + ConnectionFactory connectionFactory = + new ActiveMQConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getPublishableConnectString() + ")?jms.immediateAck=true"); + ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + + populateDestination(10, queueName, connection); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = session.createQueue(queueName); + + MessageConsumer consumer = session.createConsumer(destination); + TextMessage msg = null; + for (int i=0; i<5;i++) { + msg = (TextMessage) consumer.receive(20000); + LOG.info("not redelivered? got: " + msg); + assertNotNull("got the message", msg); + assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount")); + assertEquals("not a redelivery", false, msg.getJMSRedelivered()); + } + session.rollback(); + consumer.close(); + + restartBroker(); + + // make failover aware of the restarted auto assigned port + ((FailoverTransport) connection.getTransport().narrow(FailoverTransport.class)).add(true, broker.getTransportConnectors().get(0).getPublishableConnectString()); + + consumer = session.createConsumer(destination); + for (int i=0; i<5;i++) { + msg = (TextMessage) consumer.receive(4000); + LOG.info("redelivered? got: " + msg); + assertNotNull("got the message again", msg); + assertEquals("redelivery count survives restart", 2, msg.getLongProperty("JMSXDeliveryCount")); + assertEquals("re delivery flag", true, msg.getJMSRedelivered()); + } + session.commit(); + + // consume the rest that were not redeliveries + for (int i=0; i<5;i++) { + msg = (TextMessage) consumer.receive(20000); + LOG.info("not redelivered? got: " + msg); + assertNotNull("got the message", msg); + assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount")); + assertEquals("not a redelivery", false, msg.getJMSRedelivered()); + } + session.commit(); + + connection.close(); + } + + public void testValidateRedeliveryFlagAfterRecovery() throws Exception { + ConnectionFactory connectionFactory = + new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() + "?jms.immediateAck=true"); + ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + + populateDestination(1, queueName, connection); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = session.createQueue(queueName); + + MessageConsumer consumer = session.createConsumer(destination); + TextMessage msg = (TextMessage) consumer.receive(20000); + LOG.info("got: " + msg); + assertNotNull("got the message", msg); + assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount")); + assertEquals("not a redelivery", false, msg.getJMSRedelivered()); + + KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); + + // have the broker stop with an IOException on next checkpoint so it has a pending local transaction to recover + kahaDBPersistenceAdapter.getStore().getJournal().close(); + broker.waitUntilStopped(); + + broker = createRestartedBroker(); + broker.start(); + + connectionFactory = + new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() + "?jms.immediateAck=true"); + connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + + + session = connection.createSession(true, Session.SESSION_TRANSACTED); + consumer = session.createConsumer(destination); + msg = (TextMessage) consumer.receive(10000); + assertNotNull("got the message again", msg); + assertEquals("redelivery count survives restart", 2, msg.getLongProperty("JMSXDeliveryCount")); + assertEquals("re delivery flag", true, msg.getJMSRedelivered()); + + session.commit(); + connection.close(); + } + + private void populateDestination(final int nbMessages, + final String destinationName, javax.jms.Connection connection) + throws JMSException { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(destinationName); + MessageProducer producer = session.createProducer(destination); + for (int i = 1; i <= nbMessages; i++) { + producer.send(session.createTextMessage("")); + } + producer.close(); + session.close(); + } + + + public static Test suite() { + return suite(RedeliveryRestartTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java?rev=1177619&r1=1177618&r2=1177619&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java Fri Sep 30 11:59:44 2011 @@ -63,7 +63,6 @@ public class AMQ2736Test { store.close(); } catch (Exception expectedLotsAsJournalBorked) { } - store.getLockFile().unlock(); broker.stop(); broker.waitUntilStopped(); Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java?rev=1177619&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java Fri Sep 30 11:59:44 2011 @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.failover; + +import java.io.IOException; +import junit.framework.Test; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; + +public class FailoverRedeliveryTransactionTest extends FailoverTransactionTest { + + public static Test suite() { + return suite(FailoverRedeliveryTransactionTest.class); + } + + @Override + public void configureConnectionFactory(ActiveMQConnectionFactory factory) { + super.configureConnectionFactory(factory); + factory.setImmediateAck(true); + } + + @Override + public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception { + BrokerService brokerService = super.createBroker(deleteAllMessagesOnStartup, bindAddress); + configurePersistenceAdapter(brokerService); + return brokerService; + } + + private void configurePersistenceAdapter(BrokerService brokerService) throws IOException { + KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter(); + kahaDBPersistenceAdapter.setRewriteOnRedelivery(true); + } + + @Override + public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException { + PersistenceAdapter persistenceAdapter = super.setDefaultPersistenceAdapter(broker); + configurePersistenceAdapter(broker); + return persistenceAdapter; + } + + // no point rerunning these + @Override + public void testFailoverProducerCloseBeforeTransaction() throws Exception { + } + + @Override + public void initCombosForTestFailoverCommitReplyLost() { + } + + @Override + public void testFailoverCommitReplyLost() throws Exception { + } + + @Override + public void initCombosForTestFailoverSendReplyLost() { + } + + @Override + public void testFailoverSendReplyLost() throws Exception { + } + + @Override + public void initCombosForTestFailoverConnectionSendReplyLost() { + } + + @Override + public void testFailoverConnectionSendReplyLost() throws Exception { + } + + @Override + public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception { + } + + @Override + public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception { + } +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=1177619&r1=1177618&r2=1177619&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java Fri Sep 30 11:59:44 2011 @@ -17,6 +17,7 @@ package org.apache.activemq.transport.failover; import junit.framework.Test; +import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.AutoFailTestSupport; import org.apache.activemq.TestSupport; @@ -114,9 +115,14 @@ public class FailoverTransactionTest ext return broker; } + public void configureConnectionFactory(ActiveMQConnectionFactory factory) { + // nothing to do + } + public void testFailoverProducerCloseBeforeTransaction() throws Exception { startCleanBroker(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); + configureConnectionFactory(cf); Connection connection = cf.createConnection(); connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); @@ -170,6 +176,7 @@ public class FailoverTransactionTest ext broker.start(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); + configureConnectionFactory(cf); Connection connection = cf.createConnection(); connection.start(); final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); @@ -222,6 +229,7 @@ public class FailoverTransactionTest ext // after restart, ensure no dangling messages cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); + configureConnectionFactory(cf); connection = cf.createConnection(); connection.start(); Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -274,6 +282,7 @@ public class FailoverTransactionTest ext broker.start(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.watchTopicAdvisories=false"); + configureConnectionFactory(cf); Connection connection = cf.createConnection(); connection.start(); final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -329,6 +338,7 @@ public class FailoverTransactionTest ext // after restart, ensure no dangling messages cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); + configureConnectionFactory(cf); connection = cf.createConnection(); connection.start(); Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -400,6 +410,7 @@ public class FailoverTransactionTest ext proxy.open(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl().toASCIIString() + ")?jms.watchTopicAdvisories=false"); + configureConnectionFactory(cf); Connection connection = cf.createConnection(); connection.start(); final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -450,6 +461,7 @@ public class FailoverTransactionTest ext // after restart, ensure no dangling messages cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); + configureConnectionFactory(cf); connection = cf.createConnection(); connection.start(); Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -466,6 +478,7 @@ public class FailoverTransactionTest ext public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception { startCleanBroker(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?trackTransactionProducers=false"); + configureConnectionFactory(cf); Connection connection = cf.createConnection(); connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); @@ -489,6 +502,7 @@ public class FailoverTransactionTest ext public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception { startCleanBroker(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); + configureConnectionFactory(cf); Connection connection = cf.createConnection(); connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); @@ -521,6 +535,7 @@ public class FailoverTransactionTest ext public void testFailoverWithConnectionConsumer() throws Exception { startCleanBroker(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); + configureConnectionFactory(cf); Connection connection = cf.createConnection(); connection.start(); @@ -573,6 +588,7 @@ public class FailoverTransactionTest ext // as failure depends on hash order of state tracker recovery, do a few times for (int i = 0; i < 3; i++) { try { + LOG.info("Iteration: " + i); doTestFailoverConsumerAckLost(i); } finally { stopBroker(); @@ -612,6 +628,7 @@ public class FailoverTransactionTest ext Vector connections = new Vector(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); + configureConnectionFactory(cf); Connection connection = cf.createConnection(); connection.start(); connections.add(connection); @@ -728,6 +745,7 @@ public class FailoverTransactionTest ext // after restart, ensure no dangling messages cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); + configureConnectionFactory(cf); connection = cf.createConnection(); connection.start(); Session sweeperSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -745,6 +763,7 @@ public class FailoverTransactionTest ext broker = createBroker(true); broker.start(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); + configureConnectionFactory(cf); Connection connection = cf.createConnection(); connection.start(); final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -782,6 +801,7 @@ public class FailoverTransactionTest ext broker = createBroker(true); broker.start(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=30000"); + configureConnectionFactory(cf); Connection connection = cf.createConnection(); connection.start(); final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -830,6 +850,7 @@ public class FailoverTransactionTest ext broker = createBroker(true); broker.start(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=10000"); + configureConnectionFactory(cf); Connection connection = cf.createConnection(); connection.start(); final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);