activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r967134 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/amq/ main/java/org/apache/activemq/store/jdbc/ main/java/org/apache/activemq/store/jdbc/adapter/ main/java/org/apache/activemq/store/kahadaptor/ main/java/o...
Date Fri, 23 Jul 2010 15:31:11 GMT
Author: gtully
Date: Fri Jul 23 15:31:10 2010
New Revision: 967134

URL: http://svn.apache.org/viewvc?rev=967134&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2580 - patch was good for AMQ store, fix needed for kahaDB and JDBC now done, test case was great, thanks.

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java?rev=967134&r1=967133&r2=967134&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java Fri Jul 23 15:31:10 2010
@@ -46,10 +46,11 @@ final class RecoveryListenerAdapter impl
     
     public boolean recoverMessage(Message message) throws Exception {
         if (listener.hasSpace()) {
-            listener.recoverMessage(message);
-            lastRecovered = message.getMessageId();
-            count++;
-            return true;
+            if (listener.recoverMessage(message)) {
+                lastRecovered = message.getMessageId();
+                count++;
+                return true;
+            }
         }
         return false;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=967134&r1=967133&r2=967134&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Fri Jul 23 15:31:10 2010
@@ -108,10 +108,11 @@ public class JDBCTopicMessageStore exten
                     if (listener.hasSpace()) {
                         Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
                         msg.getMessageId().setBrokerSequenceId(sequenceId);
-                        listener.recoverMessage(msg);
-                        finalLast.set(sequenceId);
-                        finalPriority.set(msg.getPriority());
-                        return true;
+                        if (listener.recoverMessage(msg)) {
+                            finalLast.set(sequenceId);
+                            finalPriority.set(msg.getPriority());
+                            return true;
+                        }
                     }
                     return false;
                 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=967134&r1=967133&r2=967134&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Fri Jul 23 15:31:10 2010
@@ -451,7 +451,7 @@ public class DefaultJDBCAdapter implemen
             } else {
                 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
             }
-            s.setMaxRows(maxReturned);
+            // no set max rows as selectors may need to scan more than maxReturned messages to get what they need
             s.setString(1, destination.getQualifiedName());
             s.setString(2, clientId);
             s.setString(3, subscriptionName);
@@ -466,16 +466,12 @@ public class DefaultJDBCAdapter implemen
                 while (rs.next() && count < maxReturned) {
                     if (listener.recoverMessageReference(rs.getString(1))) {
                         count++;
-                    } else {
-                        break;
                     }
                 }
             } else {
                 while (rs.next() && count < maxReturned) {
                     if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
                         count++;
-                    } else {
-                        break;
                     }
                 }
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?rev=967134&r1=967133&r2=967134&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Fri Jul 23 15:31:10 2010
@@ -305,8 +305,6 @@ public class KahaTopicReferenceStore ext
                             if (recoverReference(listener, msg)) {
                                 count++;
                                 container.setBatchEntry(msg.getMessageId(), entry);
-                            } else {
-                                break;
                             }
                         } else {
                             container.reset();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=967134&r1=967133&r2=967134&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Fri Jul 23 15:31:10 2010
@@ -798,8 +798,9 @@ public class KahaDBStore extends Message
                         for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
                                 .hasNext();) {
                             entry = iterator.next();
-                            listener.recoverMessage(loadMessage(entry.getValue().location));
-                            counter++;
+                            if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
+                                counter++;
+                            }
                             if (counter >= maxReturned || listener.hasSpace() == false) {
                                 break;
                             }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java?rev=967134&r1=967133&r2=967134&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java Fri Jul 23 15:31:10 2010
@@ -47,14 +47,6 @@ public class JmsTopicSelectorTest extend
     protected boolean durable;
     protected int deliveryMode = DeliveryMode.PERSISTENT;
 
-    public JmsTopicSelectorTest() {
-        super();
-    }
-
-    public JmsTopicSelectorTest(String name) {
-        super(name);
-    }
-
     public void setUp() throws Exception {
         super.setUp();
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java?rev=967134&r1=967133&r2=967134&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java Fri Jul 23 15:31:10 2010
@@ -17,6 +17,8 @@
 package org.apache.activemq;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.Enumeration;
 import java.util.Map;
 
 import javax.jms.Connection;
@@ -34,6 +36,10 @@ import org.apache.activemq.command.Activ
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -42,18 +48,11 @@ import org.apache.commons.logging.LogFac
  * 
  * @version $Revision: 1.5 $
  */
-public class TestSupport extends TestCase {
+public abstract class TestSupport extends CombinationTestSupport {
 
     protected ActiveMQConnectionFactory connectionFactory;
     protected boolean topic = true;
-
-    public TestSupport() {
-        super();
-    }
-
-    public TestSupport(String name) {
-        super(name);
-    }
+    public PersistenceAdapterChoice defaultPersistenceAdapter = PersistenceAdapterChoice.KahaDB;
 
     protected ActiveMQMessage createMessage() {
         return new ActiveMQMessage();
@@ -173,4 +172,28 @@ public class TestSupport extends TestCas
                     regionBroker.getQueueRegion().getDestinationMap() :
                         regionBroker.getTopicRegion().getDestinationMap();
     }
+
+    public static enum PersistenceAdapterChoice {KahaDB, AMQ, JDBC };
+
+    public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException {
+        return setPersistenceAdapter(broker, defaultPersistenceAdapter);
+    }
+    
+    public PersistenceAdapter setPersistenceAdapter(BrokerService broker, PersistenceAdapterChoice choice) throws IOException {
+        PersistenceAdapter adapter = null;
+        switch (choice) {
+        case AMQ:
+            adapter = new AMQPersistenceAdapter();
+            break;
+        case JDBC:
+            adapter = new JDBCPersistenceAdapter();
+            break;
+        case KahaDB:
+            adapter = new KahaDBPersistenceAdapter();
+            break;
+        }
+        broker.setPersistenceAdapter(adapter);
+        return adapter;
+    }
+
 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java?rev=967134&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java Fri Jul 23 15:31:10 2010
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;      
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+
+public class AMQ2580Test extends TestSupport {
+
+    private static final Log LOG = LogFactory.getLog(AMQ2580Test.class);
+
+    private static final String TOPIC_NAME = "topicName";
+    private static final String CLIENT_ID = "client_id";
+    private static final String textOfSelectedMsg = "good_message";
+
+    protected TopicConnection connection;
+
+    private Topic topic;
+    private Session session;
+    private MessageProducer producer;
+    private ConnectionFactory connectionFactory;
+    private TopicConnection topicConnection;
+    private BrokerService service;
+
+    public static Test suite() {
+        return suite(AMQ2580Test.class);
+    }
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        initDurableBroker();
+        initConnectionFactory();
+        initTopic();
+    }
+
+    protected void tearDown() throws Exception {
+        shutdownClient();
+        service.stop();
+        super.tearDown();
+    }
+
+    private void initConnection() throws JMSException {
+        if (connection == null) {
+            LOG.info("Initializing connection");
+
+            connection = (TopicConnection) connectionFactory.createConnection();
+            connection.start();
+        }
+    }
+
+    public void initCombosForTestTopicIsDurableSmokeTest() throws Exception {
+        addCombinationValues("defaultPersistenceAdapter", PersistenceAdapterChoice.values());
+    }
+
+    public void testTopicIsDurableSmokeTest() throws Exception {
+
+        initClient();
+        MessageConsumer consumer = createMessageConsumer();
+        LOG.info("Consuming message");
+        assertNull(consumer.receive(1));
+        shutdownClient();
+        consumer.close();
+
+        sendMessages();
+        shutdownClient();
+
+        initClient();
+        consumer = createMessageConsumer();
+
+        LOG.info("Consuming message");
+        TextMessage answer1 = (TextMessage) consumer.receive(1000);
+        assertNotNull("we got our message", answer1);
+
+        consumer.close();
+    }
+
+    private MessageConsumer createMessageConsumer() throws JMSException {
+        LOG.info("creating durable subscriber");
+        return session.createDurableSubscriber(topic,
+                TOPIC_NAME,
+                "name='value'",
+                false);
+    }
+
+    private void initClient() throws JMSException {
+        LOG.info("Initializing client");
+
+        initConnection();
+        initSession();
+    }
+
+    private void shutdownClient()
+            throws JMSException {
+        LOG.info("Closing session and connection");
+        session.close();
+        connection.close();
+        session = null;
+        connection = null;
+    }
+
+    private void sendMessages()
+            throws JMSException {
+        initConnection();
+
+        initSession();
+
+        LOG.info("Creating producer");
+        producer = session.createProducer(topic);
+
+        sendMessageThatFailsSelection();
+
+        sendMessage(textOfSelectedMsg, "value");
+    }
+
+    private void initSession() throws JMSException {
+        LOG.info("Initializing session");
+        session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    private void sendMessageThatFailsSelection() throws JMSException {
+        for (int i = 0; i < 5; i++) {
+            String textOfNotSelectedMsg = "Msg_" + i;
+            sendMessage(textOfNotSelectedMsg, "not_value");
+            LOG.info("#");
+        }
+    }
+
+    private void sendMessage(
+            String msgText,
+            String propertyValue) throws JMSException {
+        LOG.info("Creating message: " + msgText);
+        TextMessage messageToSelect = session.createTextMessage(msgText);
+        messageToSelect.setStringProperty("name", propertyValue);
+        LOG.info("Sending message");
+        producer.send(messageToSelect);
+    }
+
+    protected void initConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory activeMqConnectionFactory = createActiveMqConnectionFactory();
+        connectionFactory = activeMqConnectionFactory;
+    }
+
+
+    private ActiveMQConnectionFactory createActiveMqConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory(
+                "failover:" + service.getTransportConnectors().get(0).getConnectUri().toString());
+        activeMqConnectionFactory.setWatchTopicAdvisories(false);
+        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+        prefetchPolicy.setDurableTopicPrefetch(2);
+        prefetchPolicy.setOptimizeDurableTopicPrefetch(2);
+        activeMqConnectionFactory.setPrefetchPolicy(prefetchPolicy);
+        activeMqConnectionFactory.setClientID(CLIENT_ID);
+        return activeMqConnectionFactory;
+    }
+
+    private void initDurableBroker() throws Exception {
+        service = new BrokerService();
+        setDefaultPersistenceAdapter(service);
+        service.setDeleteAllMessagesOnStartup(true);
+        service.setAdvisorySupport(false);
+        service.setTransportConnectorURIs(new String[]{"tcp://localhost:0"});
+        service.setPersistent(true);
+        service.setUseJmx(false);
+        service.start();
+
+    }
+
+    private void initTopic() throws JMSException {
+        topicConnection = (TopicConnection) connectionFactory.createConnection();
+        TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+        topic = topicSession.createTopic(TOPIC_NAME);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=967134&r1=967133&r2=967134&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java Fri Jul 23 15:31:10 2010
@@ -16,35 +16,10 @@
  */
 package org.apache.activemq.transport.failover;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Vector;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.ServerSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.TransactionRolledBackException;
-
+import junit.framework.Test;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.AutoFailTestSupport;
+import org.apache.activemq.TestSupport;
 import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerPluginSupport;
 import org.apache.activemq.broker.BrokerService;
@@ -54,100 +29,111 @@ import org.apache.activemq.broker.Produc
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.store.amq.AMQPersistenceAdapter;
-import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.util.SocketProxy;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.junit.After;
-import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.ServerSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.TransactionRolledBackException;
+import java.net.URI;
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 // see https://issues.apache.org/activemq/browse/AMQ-2473
+
 // https://issues.apache.org/activemq/browse/AMQ-2590
-public class FailoverTransactionTest {
-	
+public class FailoverTransactionTest extends TestSupport {
+
     private static final Log LOG = LogFactory.getLog(FailoverTransactionTest.class);
-	private static final String QUEUE_NAME = "FailoverWithTx";
-	private String url = "tcp://localhost:61616";
-	BrokerService broker;
-	
-	public void startCleanBroker() throws Exception {
-	    startBroker(true);
-	}
-	
-	@After
-	public void stopBroker() throws Exception {
-	    if (broker != null) {
-	        broker.stop();
-	    }
-	}
-	
-	public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
-	    broker = createBroker(deleteAllMessagesOnStartup);
-        broker.start();
-	}
-
-	public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {   
-	    broker = new BrokerService();
-	    broker.setUseJmx(false);
-	    broker.setAdvisorySupport(false);
-	    broker.addConnector(url);
-	    broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
-	    return broker;
-	}
-
-	@Test
-	public void testFailoverProducerCloseBeforeTransaction() throws Exception {
-	    startCleanBroker();
-		ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
-		Connection connection = cf.createConnection();
-		connection.start();
-		Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-		Queue destination = session.createQueue(QUEUE_NAME);
+    private static final String QUEUE_NAME = "FailoverWithTx";
+    private String url = "tcp://localhost:61616";
+    BrokerService broker;
+
+    public static Test suite() {
+        return suite(FailoverTransactionTest.class);
+    }
+
+    public void startCleanBroker() throws Exception {
+        startBroker(true);
+    }
+
+    public void tearDown() throws Exception {
+        stopBroker();
+    }
+
+    public void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
+        broker = createBroker(deleteAllMessagesOnStartup);
+        broker.start();
+    }
+
+    public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
+        broker = new BrokerService();
+        broker.setUseJmx(false);
+        broker.setAdvisorySupport(false);
+        broker.addConnector(url);
+        broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
+        return broker;
+    }
+
+    public void testFailoverProducerCloseBeforeTransaction() throws Exception {
+        startCleanBroker();
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        Connection connection = cf.createConnection();
+        connection.start();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        Queue destination = session.createQueue(QUEUE_NAME);
 
         MessageConsumer consumer = session.createConsumer(destination);
-		produceMessage(session, destination);
-		
-		// restart to force failover and connection state recovery before the commit
-		broker.stop();
-		startBroker(false);
-
-		session.commit();
-		assertNotNull("we got the message", consumer.receive(20000));
-		session.commit();	
-		connection.close();
-	}
-	
-    @Test
-    public void testFailoverCommitReplyLostAMQ() throws Exception {
-        doTestFailoverCommitReplyLost(0);
-    }  
-    
-    @Test
-    public void testFailoverCommitReplyLostJdbc() throws Exception {
-        doTestFailoverCommitReplyLost(1);
-    }
-    
-    @Test
-    public void testFailoverCommitReplyLostKahaDB() throws Exception {
-        doTestFailoverCommitReplyLost(2);
-    }
-    
-    public void doTestFailoverCommitReplyLost(final int adapter) throws Exception {
-        
+        produceMessage(session, destination);
+
+        // restart to force failover and connection state recovery before the commit
+        broker.stop();
+        startBroker(false);
+
+        session.commit();
+        assertNotNull("we got the message", consumer.receive(20000));
+        session.commit();
+        connection.close();
+    }
+
+    public void initCombosForTestFailoverCommitReplyLost() {
+        addCombinationValues("defaultPersistenceAdapter", PersistenceAdapterChoice.values());
+    }
+
+    public void testFailoverCommitReplyLost() throws Exception {
+
         broker = createBroker(true);
-        setPersistenceAdapter(adapter);
-            
-        broker.setPlugins(new BrokerPlugin[] {
+        setDefaultPersistenceAdapter(broker);
+
+        broker.setPlugins(new BrokerPlugin[]{
                 new BrokerPluginSupport() {
                     @Override
                     public void commitTransaction(ConnectionContext context,
-                            TransactionId xid, boolean onePhase) throws Exception {
+                                                  TransactionId xid, boolean onePhase) throws Exception {
                         super.commitTransaction(context, xid, onePhase);
                         // so commit will hang as if reply is lost
                         context.setDontSendReponse(true);
-                        Executors.newSingleThreadExecutor().execute(new Runnable() {   
+                        Executors.newSingleThreadExecutor().execute(new Runnable() {
                             public void run() {
                                 LOG.info("Stopping broker post commit...");
                                 try {
@@ -157,11 +143,11 @@ public class FailoverTransactionTest {
                                 }
                             }
                         });
-                   }   
+                    }
                 }
         });
         broker.start();
-        
+
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
         Connection connection = cf.createConnection();
         connection.start();
@@ -170,10 +156,10 @@ public class FailoverTransactionTest {
 
         MessageConsumer consumer = session.createConsumer(destination);
         produceMessage(session, destination);
-        
+
         final CountDownLatch commitDoneLatch = new CountDownLatch(1);
         // broker will die on commit reply so this will hang till restart
-        Executors.newSingleThreadExecutor().execute(new Runnable() {   
+        Executors.newSingleThreadExecutor().execute(new Runnable() {
             public void run() {
                 LOG.info("doing async commit...");
                 try {
@@ -186,15 +172,15 @@ public class FailoverTransactionTest {
                 LOG.info("done async commit");
             }
         });
-       
+
         // will be stopped by the plugin
         broker.waitUntilStopped();
         broker = createBroker(false);
-        setPersistenceAdapter(adapter);
+        setDefaultPersistenceAdapter(broker);
         broker.start();
 
         assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
-        
+
         // new transaction
         Message msg = consumer.receive(20000);
         LOG.info("Received: " + msg);
@@ -203,16 +189,16 @@ public class FailoverTransactionTest {
         session.commit();
         consumer.close();
         connection.close();
-        
+
         // ensure no dangling messages with fresh broker etc
         broker.stop();
         broker.waitUntilStopped();
-        
+
         LOG.info("Checking for remaining/hung messages..");
         broker = createBroker(false);
-        setPersistenceAdapter(adapter);
+        setDefaultPersistenceAdapter(broker);
         broker.start();
-        
+
         // after restart, ensure no dangling messages
         cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
         connection = cf.createConnection();
@@ -228,37 +214,30 @@ public class FailoverTransactionTest {
         connection.close();
     }
 
-    
-    //@Test not implemented
-    public void testFailoverSendReplyLostAMQ() throws Exception {
-        doTestFailoverSendReplyLost(0);
-    }  
-    
-    @Test
-    public void testFailoverSendReplyLostJdbc() throws Exception {
-        doTestFailoverSendReplyLost(1);
-    }
-    
-    @Test
-    public void testFailoverSendReplyLostKahaDB() throws Exception {
-        doTestFailoverSendReplyLost(2);
-    }
-    
-    public void doTestFailoverSendReplyLost(final int adapter) throws Exception {
-        
+
+    public void initCombosForTestFailoverSendReplyLost() {
+        addCombinationValues("defaultPersistenceAdapter",
+                new Object[]{PersistenceAdapterChoice.KahaDB,
+                        PersistenceAdapterChoice.JDBC
+                        // not implemented for AMQ store
+                });
+    }
+
+    public void testFailoverSendReplyLost() throws Exception {
+
         broker = createBroker(true);
-        setPersistenceAdapter(adapter);
-            
-        broker.setPlugins(new BrokerPlugin[] {
+        setDefaultPersistenceAdapter(broker);
+
+        broker.setPlugins(new BrokerPlugin[]{
                 new BrokerPluginSupport() {
                     @Override
                     public void send(ProducerBrokerExchange producerExchange,
-                            org.apache.activemq.command.Message messageSend)
+                                     org.apache.activemq.command.Message messageSend)
                             throws Exception {
                         // so send will hang as if reply is lost
                         super.send(producerExchange, messageSend);
                         producerExchange.getConnectionContext().setDontSendReponse(true);
-                        Executors.newSingleThreadExecutor().execute(new Runnable() {   
+                        Executors.newSingleThreadExecutor().execute(new Runnable() {
                             public void run() {
                                 LOG.info("Stopping broker post send...");
                                 try {
@@ -272,7 +251,7 @@ public class FailoverTransactionTest {
                 }
         });
         broker.start();
-        
+
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.watchTopicAdvisories=false");
         Connection connection = cf.createConnection();
         connection.start();
@@ -282,7 +261,7 @@ public class FailoverTransactionTest {
         MessageConsumer consumer = session.createConsumer(destination);
         final CountDownLatch sendDoneLatch = new CountDownLatch(1);
         // broker will die on send reply so this will hang till restart
-        Executors.newSingleThreadExecutor().execute(new Runnable() {   
+        Executors.newSingleThreadExecutor().execute(new Runnable() {
             public void run() {
                 LOG.info("doing async send...");
                 try {
@@ -296,16 +275,16 @@ public class FailoverTransactionTest {
                 LOG.info("done async send");
             }
         });
-       
+
         // will be stopped by the plugin
         broker.waitUntilStopped();
         broker = createBroker(false);
-        setPersistenceAdapter(adapter);
+        setDefaultPersistenceAdapter(broker);
         LOG.info("restarting....");
         broker.start();
 
         assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
-        
+
         // new transaction
         Message msg = consumer.receive(20000);
         LOG.info("Received: " + msg);
@@ -313,20 +292,20 @@ public class FailoverTransactionTest {
         assertNull("we got just one message", consumer.receive(2000));
         consumer.close();
         connection.close();
-        
+
         // verify stats
-        assertEquals("no newly queued messages", 0, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
-        assertEquals("1 dequeue", 1, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount());
-        
+        assertEquals("no newly queued messages", 0, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
+        assertEquals("1 dequeue", 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount());
+
         // ensure no dangling messages with fresh broker etc
         broker.stop();
         broker.waitUntilStopped();
-        
+
         LOG.info("Checking for remaining/hung messages with second restart..");
         broker = createBroker(false);
-        setPersistenceAdapter(adapter);
+        setDefaultPersistenceAdapter(broker);
         broker.start();
-        
+
         // after restart, ensure no dangling messages
         cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
         connection = cf.createConnection();
@@ -342,43 +321,45 @@ public class FailoverTransactionTest {
         connection.close();
     }
 
-    // not implemented.. @Test
-    public void testFailoverConnectionSendReplyLostAMQ() throws Exception {
-        doTestFailoverConnectionSendReplyLost(0);
-    }  
-    
-    @Test
-    public void testFailoverConnectionSendReplyLostJdbc() throws Exception {
-        doTestFailoverConnectionSendReplyLost(1);
-    }
-    
-    @Test
-    public void testFailoverConnectionSendReplyLostKahaDB() throws Exception {
-        doTestFailoverConnectionSendReplyLost(2);
-    }
-    
-    public void doTestFailoverConnectionSendReplyLost(final int adapter) throws Exception {
-        
+
+    public void initCombosForTestFailoverConnectionSendReplyLost() {
+        addCombinationValues("defaultPersistenceAdapter",
+                new Object[]{PersistenceAdapterChoice.KahaDB,
+                        PersistenceAdapterChoice.JDBC
+                        // last producer message id store feature not implemented for AMQ store
+                });
+    }
+
+    public void testFailoverConnectionSendReplyLost() throws Exception {
+
         broker = createBroker(true);
-        setPersistenceAdapter(adapter);
-        
+        PersistenceAdapter store = setDefaultPersistenceAdapter(broker);
+        if (store instanceof KahaDBPersistenceAdapter) {
+            // duplicate checker not updated on canceled tasks, even it
+            // it was, recovery of the audit would fail as the message is
+            // not recorded in the store and the audit may not be up to date.
+            // So if duplicate messages are a absolute no no after restarts,
+            // ConcurrentStoreAndDispatchQueues must be disabled
+            ((KahaDBPersistenceAdapter) store).setConcurrentStoreAndDispatchQueues(false);
+        }
+
         final SocketProxy proxy = new SocketProxy();
 
-        broker.setPlugins(new BrokerPlugin[] {
+        broker.setPlugins(new BrokerPlugin[]{
                 new BrokerPluginSupport() {
                     private boolean firstSend = true;
 
                     @Override
                     public void send(ProducerBrokerExchange producerExchange,
-                            org.apache.activemq.command.Message messageSend)
+                                     org.apache.activemq.command.Message messageSend)
                             throws Exception {
                         // so send will hang as if reply is lost
                         super.send(producerExchange, messageSend);
                         if (firstSend) {
                             firstSend = false;
-                        
+
                             producerExchange.getConnectionContext().setDontSendReponse(true);
-                            Executors.newSingleThreadExecutor().execute(new Runnable() {   
+                            Executors.newSingleThreadExecutor().execute(new Runnable() {
                                 public void run() {
                                     LOG.info("Stopping connection post send...");
                                     try {
@@ -386,17 +367,17 @@ public class FailoverTransactionTest {
                                     } catch (Exception e) {
                                         e.printStackTrace();
                                     }
-                                }   
+                                }
                             });
                         }
                     }
                 }
         });
         broker.start();
-        
+
         proxy.setTarget(new URI(url));
         proxy.open();
-        
+
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl().toASCIIString() + ")?jms.watchTopicAdvisories=false");
         Connection connection = cf.createConnection();
         connection.start();
@@ -406,7 +387,7 @@ public class FailoverTransactionTest {
         MessageConsumer consumer = session.createConsumer(destination);
         final CountDownLatch sendDoneLatch = new CountDownLatch(1);
         // proxy connection will die on send reply so this will hang on failover reconnect till open
-        Executors.newSingleThreadExecutor().execute(new Runnable() {   
+        Executors.newSingleThreadExecutor().execute(new Runnable() {
             public void run() {
                 LOG.info("doing async send...");
                 try {
@@ -419,33 +400,33 @@ public class FailoverTransactionTest {
                 LOG.info("done async send");
             }
         });
-       
+
         // will be closed by the plugin
         assertTrue("proxy was closed", proxy.waitUntilClosed(30));
         LOG.info("restarting proxy");
         proxy.open();
 
         assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
-        
+
         Message msg = consumer.receive(20000);
         LOG.info("Received: " + msg);
         assertNotNull("we got the message", msg);
         assertNull("we got just one message", consumer.receive(2000));
         consumer.close();
         connection.close();
-        
+
         // verify stats, connection dup suppression means dups don't get to broker
-        assertEquals("one queued message", 1, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
-        
+        assertEquals("one queued message", 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
+
         // ensure no dangling messages with fresh broker etc
         broker.stop();
         broker.waitUntilStopped();
-        
+
         LOG.info("Checking for remaining/hung messages with restart..");
         broker = createBroker(false);
-        setPersistenceAdapter(adapter);
+        setDefaultPersistenceAdapter(broker);
         broker.start();
-        
+
         // after restart, ensure no dangling messages
         cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
         connection = cf.createConnection();
@@ -460,95 +441,68 @@ public class FailoverTransactionTest {
         assertNull("no messges left dangling but got: " + msg, msg);
         connection.close();
     }
-    
-    
-    
-    private void setPersistenceAdapter(int adapter) throws IOException {
-        switch (adapter) {
-        case 0:
-            broker.setPersistenceAdapter(new AMQPersistenceAdapter());
-            break;
-        case 1:
-            broker.setPersistenceAdapter(new JDBCPersistenceAdapter());
-            break;
-        case 2:
-            KahaDBPersistenceAdapter store = new KahaDBPersistenceAdapter();
-            // duplicate checker not updated on canceled tasks, even it
-            // it was, reovery of the audit would fail as the message is
-            // not recorded in the store and the audit may not be up to date.
-            // So if duplicate are a nono (w.r.t stats), this must be disabled
-            store.setConcurrentStoreAndDispatchQueues(false);
-            store.setMaxFailoverProducersToTrack(10);
-            store.setDirectory(new File("target/activemq-data/kahadb/FailoverTransactionTest"));
-            broker.setPersistenceAdapter(store);
-            break;
-        }
-    }
-
-	@Test
-	public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
-	    startCleanBroker();        
-	    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?trackTransactionProducers=false");
-	    Connection connection = cf.createConnection();
-	    connection.start();
-	    Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-	    Queue destination = session.createQueue(QUEUE_NAME);
-	    
-	    MessageConsumer consumer = session.createConsumer(destination);
-	    produceMessage(session, destination);
-	    
-	    // restart to force failover and connection state recovery before the commit
-	    broker.stop();
-	    startBroker(false);
-	    
-	    session.commit();
-	    
-	    // without tracking producers, message will not be replayed on recovery
-	    assertNull("we got the message", consumer.receive(5000));
-	    session.commit();   
-	    connection.close();
-	}
-	
-	@Test
-	public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception {
-	    startCleanBroker();	        
-	    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
-	    Connection connection = cf.createConnection();
-	    connection.start();
-	    Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-	    Queue destination = session.createQueue(QUEUE_NAME);
-	    
-	    MessageConsumer consumer = session.createConsumer(destination);
-	    MessageProducer producer;
-	    TextMessage message;
-	    final int count = 10;
-	    for (int i=0; i<count; i++) {
-	        producer = session.createProducer(destination);	        
-	        message = session.createTextMessage("Test message: " + count);
-	        producer.send(message);
-	        producer.close();
-	    }
-	    
-	    // restart to force failover and connection state recovery before the commit
-	    broker.stop();
-	    startBroker(false);
-	    
-	    session.commit();
-	    for (int i=0; i<count; i++) {
-	        assertNotNull("we got all the message: " + count, consumer.receive(20000));
-	    }
-	    session.commit();
-	    connection.close();
-	}
-	
-    @Test
+
+    public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
+        startCleanBroker();
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?trackTransactionProducers=false");
+        Connection connection = cf.createConnection();
+        connection.start();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        Queue destination = session.createQueue(QUEUE_NAME);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        produceMessage(session, destination);
+
+        // restart to force failover and connection state recovery before the commit
+        broker.stop();
+        startBroker(false);
+
+        session.commit();
+
+        // without tracking producers, message will not be replayed on recovery
+        assertNull("we got the message", consumer.receive(5000));
+        session.commit();
+        connection.close();
+    }
+
+    public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception {
+        startCleanBroker();
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        Connection connection = cf.createConnection();
+        connection.start();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        Queue destination = session.createQueue(QUEUE_NAME);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer;
+        TextMessage message;
+        final int count = 10;
+        for (int i = 0; i < count; i++) {
+            producer = session.createProducer(destination);
+            message = session.createTextMessage("Test message: " + count);
+            producer.send(message);
+            producer.close();
+        }
+
+        // restart to force failover and connection state recovery before the commit
+        broker.stop();
+        startBroker(false);
+
+        session.commit();
+        for (int i = 0; i < count; i++) {
+            assertNotNull("we got all the message: " + count, consumer.receive(20000));
+        }
+        session.commit();
+        connection.close();
+    }
+
     // https://issues.apache.org/activemq/browse/AMQ-2772
     public void testFailoverWithConnectionConsumer() throws Exception {
-        startCleanBroker();         
+        startCleanBroker();
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
         Connection connection = cf.createConnection();
         connection.start();
-        
+
         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
         Queue destination = session.createQueue(QUEUE_NAME);
 
@@ -560,6 +514,7 @@ public class FailoverTransactionTest {
                     public Session getSession() throws JMSException {
                         return poolSession;
                     }
+
                     public void start() throws JMSException {
                         connectionConsumerGotOne.countDown();
                         poolSession.run();
@@ -572,31 +527,30 @@ public class FailoverTransactionTest {
         MessageProducer producer;
         TextMessage message;
         final int count = 10;
-        for (int i=0; i<count; i++) {
-            producer = session.createProducer(destination);         
+        for (int i = 0; i < count; i++) {
+            producer = session.createProducer(destination);
             message = session.createTextMessage("Test message: " + count);
             producer.send(message);
             producer.close();
         }
-        
+
         // restart to force failover and connection state recovery before the commit
         broker.stop();
         startBroker(false);
-        
+
         session.commit();
-        for (int i=0; i<count-1; i++) {
+        for (int i = 0; i < count - 1; i++) {
             assertNotNull("we got all the message: " + count, consumer.receive(20000));
         }
         session.commit();
         connection.close();
-        
+
         assertTrue("connectionconsumer got a message", connectionConsumerGotOne.await(10, TimeUnit.SECONDS));
     }
-	
-    @Test
+
     public void testFailoverConsumerAckLost() throws Exception {
         // as failure depends on hash order of state tracker recovery, do a few times
-        for (int i=0; i<3; i++) {
+        for (int i = 0; i < 3; i++) {
             try {
                 doTestFailoverConsumerAckLost(i);
             } finally {
@@ -604,13 +558,12 @@ public class FailoverTransactionTest {
             }
         }
     }
-    
+
     public void doTestFailoverConsumerAckLost(final int pauseSeconds) throws Exception {
-        final int adapter = 0;
         broker = createBroker(true);
-        setPersistenceAdapter(adapter);
-            
-        broker.setPlugins(new BrokerPlugin[] {
+        setDefaultPersistenceAdapter(broker);
+
+        broker.setPlugins(new BrokerPlugin[]{
                 new BrokerPluginSupport() {
 
                     // broker is killed on delivered ack as prefetch is 1
@@ -618,11 +571,11 @@ public class FailoverTransactionTest {
                     public void acknowledge(
                             ConsumerBrokerExchange consumerExchange,
                             final MessageAck ack) throws Exception {
-                        
+
                         consumerExchange.getConnectionContext().setDontSendReponse(true);
-                        Executors.newSingleThreadExecutor().execute(new Runnable() {   
+                        Executors.newSingleThreadExecutor().execute(new Runnable() {
                             public void run() {
-                                LOG.info("Stopping broker on ack: "  + ack);
+                                LOG.info("Stopping broker on ack: " + ack);
                                 try {
                                     broker.stop();
                                 } catch (Exception e) {
@@ -634,7 +587,7 @@ public class FailoverTransactionTest {
                 }
         });
         broker.start();
-        
+
         Vector<Connection> connections = new Vector<Connection>();
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
         Connection connection = cf.createConnection();
@@ -642,41 +595,41 @@ public class FailoverTransactionTest {
         connections.add(connection);
         final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
-           
+
         connection = cf.createConnection();
         connection.start();
         connections.add(connection);
         final Session consumerSession1 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-        
+
         connection = cf.createConnection();
         connection.start();
         connections.add(connection);
         final Session consumerSession2 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-        
+
         final MessageConsumer consumer1 = consumerSession1.createConsumer(destination);
         final MessageConsumer consumer2 = consumerSession2.createConsumer(destination);
-        
+
         produceMessage(producerSession, destination);
         produceMessage(producerSession, destination);
-        
+
         final Vector<Message> receivedMessages = new Vector<Message>();
         final CountDownLatch commitDoneLatch = new CountDownLatch(1);
         final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false);
-        Executors.newSingleThreadExecutor().execute(new Runnable() {   
+        Executors.newSingleThreadExecutor().execute(new Runnable() {
             public void run() {
                 LOG.info("doing async commit after consume...");
                 try {
                     Message msg = consumer1.receive(20000);
                     LOG.info("consumer1 first attempt got message: " + msg);
                     receivedMessages.add(msg);
-                    
+
                     // give some variance to the runs
                     TimeUnit.SECONDS.sleep(pauseSeconds * 2);
-                    
+
                     // should not get a second message as there are two messages and two consumers
                     // and prefetch=1, but with failover and unordered connection restore it can get the second
                     // message.
-                    
+
                     // For the transaction to complete it needs to get the same one or two messages
                     // again so that the acks line up.
                     // If redelivery order is different, the commit should fail with an ex
@@ -686,7 +639,7 @@ public class FailoverTransactionTest {
                     if (msg != null) {
                         receivedMessages.add(msg);
                     }
-                    
+
                     LOG.info("committing consumer1 session: " + receivedMessages.size() + " messsage(s)");
                     try {
                         consumerSession1.commit();
@@ -698,7 +651,7 @@ public class FailoverTransactionTest {
                         } else {
                             throw expectedSometimes;
                         }
-                        
+
                     }
                     commitDoneLatch.countDown();
                     LOG.info("done async commit");
@@ -707,18 +660,18 @@ public class FailoverTransactionTest {
                 }
             }
         });
-        
-               
+
+
         // will be stopped by the plugin
         broker.waitUntilStopped();
         broker = createBroker(false);
-        setPersistenceAdapter(adapter);
+        setDefaultPersistenceAdapter(broker);
         broker.start();
 
         assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
-        
+
         LOG.info("received message count: " + receivedMessages.size());
-        
+
         // new transaction
         Message msg = consumer1.receive(gotTransactionRolledBackException.get() ? 5000 : 20000);
         LOG.info("post: from consumer1 received: " + msg);
@@ -728,7 +681,7 @@ public class FailoverTransactionTest {
             assertNull("should be nothing left for consumer as recieve should have committed", msg);
         }
         consumerSession1.commit();
-        
+
         if (gotTransactionRolledBackException.get() ||
                 !gotTransactionRolledBackException.get() && receivedMessages.size() == 1) {
             // just one message successfully consumed or none consumed
@@ -738,20 +691,20 @@ public class FailoverTransactionTest {
             assertNotNull("got second message on consumer2", msg);
             consumerSession2.commit();
         }
-        
-        for (Connection c: connections) {
+
+        for (Connection c : connections) {
             c.close();
         }
-        
+
         // ensure no dangling messages with fresh broker etc
         broker.stop();
         broker.waitUntilStopped();
-        
+
         LOG.info("Checking for remaining/hung messages..");
         broker = createBroker(false);
-        setPersistenceAdapter(adapter);
+        setDefaultPersistenceAdapter(broker);
         broker.start();
-        
+
         // after restart, ensure no dangling messages
         cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
         connection = cf.createConnection();
@@ -767,7 +720,6 @@ public class FailoverTransactionTest {
         connection.close();
     }
 
-    @Test
     public void testAutoRollbackWithMissingRedeliveries() throws Exception {
         broker = createBroker(true);
         broker.start();
@@ -778,34 +730,33 @@ public class FailoverTransactionTest {
         final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
         final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
         MessageConsumer consumer = consumerSession.createConsumer(destination);
-        
+
         produceMessage(producerSession, destination);
-        
+
         Message msg = consumer.receive(20000);
         assertNotNull(msg);
-        
+
         broker.stop();
         broker = createBroker(false);
         // use empty jdbc store so that default wait(0) for redeliveries will timeout after failover
-        setPersistenceAdapter(1);
+        setPersistenceAdapter(broker, PersistenceAdapterChoice.JDBC);
         broker.start();
-        
+
         try {
             consumerSession.commit();
             fail("expected transaciton rolledback ex");
         } catch (TransactionRolledBackException expected) {
         }
-        
-        broker.stop(); 
+
+        broker.stop();
         broker = createBroker(false);
         broker.start();
-        
+
         assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000));
         connection.close();
     }
 
- 
-    @Test
+
     public void testWaitForMissingRedeliveries() throws Exception {
         LOG.info("testWaitForMissingRedeliveries()");
         broker = createBroker(true);
@@ -817,23 +768,23 @@ public class FailoverTransactionTest {
         final Queue destination = producerSession.createQueue(QUEUE_NAME);
         final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
         MessageConsumer consumer = consumerSession.createConsumer(destination);
-        
+
         produceMessage(producerSession, destination);
         Message msg = consumer.receive(20000);
         if (msg == null) {
             AutoFailTestSupport.dumpAllThreads("missing-");
         }
         assertNotNull("got message just produced", msg);
-        
+
         broker.stop();
         broker = createBroker(false);
         // use empty jdbc store so that wait for re-deliveries occur when failover resumes
-        setPersistenceAdapter(1);
+        setPersistenceAdapter(broker, PersistenceAdapterChoice.JDBC);
         broker.start();
 
         final CountDownLatch commitDone = new CountDownLatch(1);
         // will block pending re-deliveries
-        Executors.newSingleThreadExecutor().execute(new Runnable() {   
+        Executors.newSingleThreadExecutor().execute(new Runnable() {
             public void run() {
                 LOG.info("doing async commit...");
                 try {
@@ -843,19 +794,18 @@ public class FailoverTransactionTest {
                 }
             }
         });
-        
-        broker.stop(); 
+
+        broker.stop();
         broker = createBroker(false);
         broker.start();
-        
-        assertTrue("commit was successfull", commitDone.await(30, TimeUnit.SECONDS));
-        
+
+        assertTrue("commit was successful", commitDone.await(30, TimeUnit.SECONDS));
+
         assertNull("should not get committed message", consumer.receive(5000));
         connection.close();
     }
 
-    
-    @Test
+
     public void testPoisonOnDeliveryWhilePending() throws Exception {
         LOG.info("testWaitForMissingRedeliveries()");
         broker = createBroker(true);
@@ -867,25 +817,25 @@ public class FailoverTransactionTest {
         final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=0");
         final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
         MessageConsumer consumer = consumerSession.createConsumer(destination);
-        
+
         produceMessage(producerSession, destination);
         Message msg = consumer.receive(20000);
         if (msg == null) {
             AutoFailTestSupport.dumpAllThreads("missing-");
         }
         assertNotNull("got message just produced", msg);
-        
-        broker.stop(); 
+
+        broker.stop();
         broker = createBroker(false);
         broker.start();
 
         final CountDownLatch commitDone = new CountDownLatch(1);
-        
+
 
         // with prefetch=0, it will not get redelivered as there will not be another receive
         // for this consumer. so it will block till it timeout with an exception
         // will block pending re-deliveries
-        Executors.newSingleThreadExecutor().execute(new Runnable() {   
+        Executors.newSingleThreadExecutor().execute(new Runnable() {
             public void run() {
                 LOG.info("doing async commit...");
                 try {
@@ -895,30 +845,30 @@ public class FailoverTransactionTest {
                 }
             }
         });
-        
+
         // pull the pending message to this consumer where it will be poison as it is a duplicate without a tx
         MessageConsumer consumer2 = consumerSession.createConsumer(consumerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1"));
         assertNull("consumer2 not get a message while pending to 1", consumer2.receive(2000));
-        
+
         assertTrue("commit completed with ex", commitDone.await(15, TimeUnit.SECONDS));
         assertNull("consumer should not get rolledback and non redelivered message", consumer.receive(5000));
-        
+
         // message should be in dlq
         MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ"));
         TextMessage dlqMessage = (TextMessage) dlqConsumer.receive(5000);
         assertNotNull("found message in dlq", dlqMessage);
         assertEquals("text matches", "Test message", dlqMessage.getText());
         consumerSession.commit();
-        
+
         connection.close();
     }
 
     private void produceMessage(final Session producerSession, Queue destination)
             throws JMSException {
-        MessageProducer producer = producerSession.createProducer(destination);      
+        MessageProducer producer = producerSession.createProducer(destination);
         TextMessage message = producerSession.createTextMessage("Test message");
         producer.send(message);
         producer.close();
     }
-	
+
 }



Mime
View raw message