activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [18/47] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
Date Mon, 08 Feb 2016 17:11:05 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
index eb5bc61..c129791 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
@@ -16,29 +16,23 @@
  */
 package org.apache.activemq.transport.failover;
 
-import junit.framework.Test;
-
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQMessageConsumer;
 import org.apache.activemq.AutoFailTestSupport;
-import org.apache.activemq.TestSupport;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerPluginSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.ConsumerBrokerExchange;
-import org.apache.activemq.broker.ProducerBrokerExchange;
-import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.util.DestinationPathSeparatorBroker;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.util.SocketProxy;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,28 +63,30 @@ import java.util.concurrent.atomic.AtomicBoolean;
 // see https://issues.apache.org/activemq/browse/AMQ-2473
 
 // https://issues.apache.org/activemq/browse/AMQ-2590
-public class FailoverTransactionTest extends TestSupport {
+@RunWith(BMUnitRunner.class)
+public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
 
    private static final Logger LOG = LoggerFactory.getLogger(FailoverTransactionTest.class);
    private static final String QUEUE_NAME = "Failover.WithTx";
-   private static final String TRANSPORT_URI = "tcp://localhost:0";
-   private String url;
-   BrokerService broker;
+   private String url = newURI(0);
 
-   public static Test suite() {
-      return suite(FailoverTransactionTest.class);
-   }
+   private static final AtomicBoolean doByteman = new AtomicBoolean(false);
+   private static CountDownLatch brokerStopLatch;
+
+   private static SocketProxy proxy;
+   private static boolean firstSend;
+   private static int count;
+
+   private static EmbeddedJMS broker;
 
-   @Override
+   @Before
    public void setUp() throws Exception {
-      super.setMaxTestTime(2 * 60 * 1000); // some boxes can be real slow
-      super.setAutoFail(true);
-      super.setUp();
+      doByteman.set(false);
+      brokerStopLatch = new CountDownLatch(1);
    }
 
-   @Override
+   @After
    public void tearDown() throws Exception {
-      super.tearDown();
       stopBroker();
    }
 
@@ -101,39 +97,19 @@ public class FailoverTransactionTest extends TestSupport {
    }
 
    private void startCleanBroker() throws Exception {
-      startBroker(true);
-   }
-
-   public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
-      broker = createBroker(deleteAllMessagesOnStartup);
-      broker.start();
+      startBroker();
    }
 
-   public void startBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
-      broker = createBroker(deleteAllMessagesOnStartup, bindAddress);
+   public void startBroker() throws Exception {
+      broker = createBroker();
       broker.start();
    }
 
-   public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
-      return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI);
-   }
-
-   public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
-      broker = new BrokerService();
-      broker.setUseJmx(false);
-      broker.setAdvisorySupport(false);
-      broker.addConnector(bindAddress);
-      broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
-
-      url = broker.getTransportConnectors().get(0).getConnectUri().toString();
-
-      return broker;
-   }
-
    public void configureConnectionFactory(ActiveMQConnectionFactory factory) {
       // nothing to do
    }
 
+   @Test
    public void testFailoverProducerCloseBeforeTransaction() throws Exception {
       startCleanBroker();
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
@@ -148,55 +124,31 @@ public class FailoverTransactionTest extends TestSupport {
 
       // restart to force failover and connection state recovery before the commit
       broker.stop();
-      startBroker(false, url);
+      startBroker();
 
       session.commit();
-      assertNotNull("we got the message", consumer.receive(20000));
+      Assert.assertNotNull("we got the message", consumer.receive(20000));
       session.commit();
       connection.close();
    }
 
-   public void initCombosForTestFailoverCommitReplyLost() {
-      String osName = System.getProperty("os.name");
-      Object[] persistenceAdapters;
-      if (!osName.equalsIgnoreCase("AIX") && !osName.equalsIgnoreCase("SunOS")) {
-         persistenceAdapters = new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC};
-      }
-      else {
-         persistenceAdapters = new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC};
-      }
-      addCombinationValues("defaultPersistenceAdapter", persistenceAdapters);
-   }
-
-   @SuppressWarnings("unchecked")
+   @Test
+   @BMRules(
+           rules = {
+                   @BMRule(
+                           name = "set no return response and stop the broker",
+                           targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection",
+                           targetMethod = "processCommitTransactionOnePhase",
+                           targetLocation = "EXIT",
+                           binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+                           action = "org.apache.activemq.transport.failover.FailoverTransactionTest.holdResponseAndStopBroker(context)")
+           }
+   )
    public void testFailoverCommitReplyLost() throws Exception {
 
-      broker = createBroker(true);
-      setDefaultPersistenceAdapter(broker);
-
-      broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
-         @Override
-         public void commitTransaction(ConnectionContext context,
-                                       TransactionId xid,
-                                       boolean onePhase) throws Exception {
-            super.commitTransaction(context, xid, onePhase);
-            // so commit will hang as if reply is lost
-            context.setDontSendReponse(true);
-            Executors.newSingleThreadExecutor().execute(new Runnable() {
-               @Override
-               public void run() {
-                  LOG.info("Stopping broker post commit...");
-                  try {
-                     broker.stop();
-                  }
-                  catch (Exception e) {
-                     e.printStackTrace();
-                  }
-               }
-            });
-         }
-      }});
-      broker.start();
+      broker = createBroker();
+      startBrokerWithDurableQueue();
+      doByteman.set(true);
 
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
       configureConnectionFactory(cf);
@@ -211,14 +163,13 @@ public class FailoverTransactionTest extends TestSupport {
       final CountDownLatch commitDoneLatch = new CountDownLatch(1);
       // broker will die on commit reply so this will hang till restart
       Executors.newSingleThreadExecutor().execute(new Runnable() {
-         @Override
          public void run() {
             LOG.info("doing async commit...");
             try {
                session.commit();
             }
             catch (JMSException e) {
-               assertTrue(e instanceof TransactionRolledBackException);
+               Assert.assertTrue(e instanceof TransactionRolledBackException);
                LOG.info("got commit exception: ", e);
             }
             commitDoneLatch.countDown();
@@ -227,29 +178,27 @@ public class FailoverTransactionTest extends TestSupport {
       });
 
       // will be stopped by the plugin
-      broker.waitUntilStopped();
-      broker = createBroker(false, url);
-      setDefaultPersistenceAdapter(broker);
+      brokerStopLatch.await();
+      doByteman.set(false);
+      broker = createBroker();
       broker.start();
 
-      assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
+      Assert.assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
 
       // new transaction
       Message msg = consumer.receive(20000);
       LOG.info("Received: " + msg);
-      assertNotNull("we got the message", msg);
-      assertNull("we got just one message", consumer.receive(2000));
+      Assert.assertNotNull("we got the message", msg);
+      Assert.assertNull("we got just one message", consumer.receive(2000));
       session.commit();
       consumer.close();
       connection.close();
 
       // ensure no dangling messages with fresh broker etc
       broker.stop();
-      broker.waitUntilStopped();
 
       LOG.info("Checking for remaining/hung messages..");
-      broker = createBroker(false, url);
-      setDefaultPersistenceAdapter(broker);
+      broker = createBroker();
       broker.start();
 
       // after restart, ensure no dangling messages
@@ -264,152 +213,38 @@ public class FailoverTransactionTest extends TestSupport {
          msg = consumer.receive(5000);
       }
       LOG.info("Received: " + msg);
-      assertNull("no messges left dangling but got: " + msg, msg);
+      Assert.assertNull("no messges left dangling but got: " + msg, msg);
       connection.close();
    }
 
    @SuppressWarnings("unchecked")
+   @Test
    public void testFailoverCommitReplyLostWithDestinationPathSeparator() throws Exception {
-
-      broker = createBroker(true);
-      setDefaultPersistenceAdapter(broker);
-
-      broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker(), new BrokerPluginSupport() {
-         @Override
-         public void commitTransaction(ConnectionContext context,
-                                       TransactionId xid,
-                                       boolean onePhase) throws Exception {
-            super.commitTransaction(context, xid, onePhase);
-            // so commit will hang as if reply is lost
-            context.setDontSendReponse(true);
-            Executors.newSingleThreadExecutor().execute(new Runnable() {
-               @Override
-               public void run() {
-                  LOG.info("Stopping broker post commit...");
-                  try {
-                     broker.stop();
-                  }
-                  catch (Exception e) {
-                     e.printStackTrace();
-                  }
-               }
-            });
-         }
-      }});
-      broker.start();
-
-      ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
-      configureConnectionFactory(cf);
-      Connection connection = cf.createConnection();
-      connection.start();
-      final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-      Queue destination = session.createQueue(QUEUE_NAME.replace('.', '/') + "?consumer.prefetchSize=0");
-
-      MessageConsumer consumer = session.createConsumer(destination);
-      produceMessage(session, destination);
-
-      final CountDownLatch commitDoneLatch = new CountDownLatch(1);
-      // broker will die on commit reply so this will hang till restart
-      Executors.newSingleThreadExecutor().execute(new Runnable() {
-         @Override
-         public void run() {
-            LOG.info("doing async commit...");
-            try {
-               session.commit();
-            }
-            catch (JMSException e) {
-               assertTrue(e instanceof TransactionRolledBackException);
-               LOG.info("got commit exception: ", e);
-            }
-            commitDoneLatch.countDown();
-            LOG.info("done async commit");
-         }
-      });
-
-      // will be stopped by the plugin
-      broker.waitUntilStopped();
-      broker = createBroker(false, url);
-      setDefaultPersistenceAdapter(broker);
-      broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker()});
-      broker.start();
-
-      assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
-
-      // new transaction
-      Message msg = consumer.receive(20000);
-      LOG.info("Received: " + msg);
-      assertNotNull("we got the message", msg);
-      assertNull("we got just one message", consumer.receive(2000));
-      session.commit();
-      consumer.close();
-      connection.close();
-
-      // ensure no dangling messages with fresh broker etc
-      broker.stop();
-      broker.waitUntilStopped();
-
-      LOG.info("Checking for remaining/hung messages..");
-      broker = createBroker(false, url);
-      setDefaultPersistenceAdapter(broker);
-      broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker()});
-      broker.start();
-
-      // after restart, ensure no dangling messages
-      cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
-      configureConnectionFactory(cf);
-      connection = cf.createConnection();
-      connection.start();
-      Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      consumer = session2.createConsumer(destination);
-      msg = consumer.receive(1000);
-      if (msg == null) {
-         msg = consumer.receive(5000);
-      }
-      LOG.info("Received: " + msg);
-      assertNull("no messges left dangling but got: " + msg, msg);
-      connection.close();
-
-      ActiveMQDestination[] destinations = broker.getRegionBroker().getDestinations();
-      for (ActiveMQDestination dest : destinations) {
-         LOG.info("Destinations list: " + dest);
-      }
-      assertEquals("Only one destination", 1, broker.getRegionBroker().getDestinations().length);
-   }
-
-   public void initCombosForTestFailoverSendReplyLost() {
-      addCombinationValues("defaultPersistenceAdapter", new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC
-                              // not implemented for AMQ store or PersistenceAdapterChoice.LevelDB
-                           });
+      //the original test validates destinations using forward slash (/) as
+      //separators instead of dot (.). The broker internally uses a plugin
+      //called DestinationPathSeparatorBroker to convert every occurrence of
+      // "/" into "." inside the server.
+      //Artemis doesn't support "/" so far and this test doesn't make sense therefore.
    }
 
    @SuppressWarnings("unchecked")
+   @Test
+   @BMRules(
+           rules = {
+                   @BMRule(
+                           name = "set no return response and stop the broker",
+                           targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection",
+                           targetMethod = "processMessage",
+                           targetLocation = "EXIT",
+                           binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+                           action = "org.apache.activemq.transport.failover.FailoverTransactionTest.holdResponseAndStopBroker(context)")
+           }
+   )
    public void testFailoverSendReplyLost() throws Exception {
 
-      broker = createBroker(true);
-      setDefaultPersistenceAdapter(broker);
-
-      broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
-         @Override
-         public void send(ProducerBrokerExchange producerExchange,
-                          org.apache.activemq.command.Message messageSend) throws Exception {
-            // so send will hang as if reply is lost
-            super.send(producerExchange, messageSend);
-            producerExchange.getConnectionContext().setDontSendReponse(true);
-            Executors.newSingleThreadExecutor().execute(new Runnable() {
-               @Override
-               public void run() {
-                  LOG.info("Stopping broker post send...");
-                  try {
-                     broker.stop();
-                  }
-                  catch (Exception e) {
-                     e.printStackTrace();
-                  }
-               }
-            });
-         }
-      }});
-      broker.start();
+      broker = createBroker();
+      startBrokerWithDurableQueue();
+      doByteman.set(true);
 
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.watchTopicAdvisories=false");
       configureConnectionFactory(cf);
@@ -422,7 +257,6 @@ public class FailoverTransactionTest extends TestSupport {
       final CountDownLatch sendDoneLatch = new CountDownLatch(1);
       // broker will die on send reply so this will hang till restart
       Executors.newSingleThreadExecutor().execute(new Runnable() {
-         @Override
          public void run() {
             LOG.info("doing async send...");
             try {
@@ -431,7 +265,7 @@ public class FailoverTransactionTest extends TestSupport {
             catch (JMSException e) {
                //assertTrue(e instanceof TransactionRolledBackException);
                LOG.error("got send exception: ", e);
-               fail("got unexpected send exception" + e);
+               Assert.fail("got unexpected send exception" + e);
             }
             sendDoneLatch.countDown();
             LOG.info("done async send");
@@ -439,33 +273,27 @@ public class FailoverTransactionTest extends TestSupport {
       });
 
       // will be stopped by the plugin
-      broker.waitUntilStopped();
-      broker = createBroker(false, url);
-      setDefaultPersistenceAdapter(broker);
+      brokerStopLatch.await();
+      doByteman.set(false);
+      broker = createBroker();
       LOG.info("restarting....");
       broker.start();
 
-      assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
+      Assert.assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
 
       // new transaction
       Message msg = consumer.receive(20000);
       LOG.info("Received: " + msg);
-      assertNotNull("we got the message", msg);
-      assertNull("we got just one message", consumer.receive(2000));
+      Assert.assertNotNull("we got the message", msg);
+      Assert.assertNull("we got just one message", consumer.receive(2000));
       consumer.close();
       connection.close();
 
-      // verify stats
-      assertEquals("no newly queued messages", 0, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
-      assertEquals("1 dequeue", 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount());
-
       // ensure no dangling messages with fresh broker etc
       broker.stop();
-      broker.waitUntilStopped();
 
       LOG.info("Checking for remaining/hung messages with second restart..");
-      broker = createBroker(false, url);
-      setDefaultPersistenceAdapter(broker);
+      broker = createBroker();
       broker.start();
 
       // after restart, ensure no dangling messages
@@ -480,64 +308,33 @@ public class FailoverTransactionTest extends TestSupport {
          msg = consumer.receive(5000);
       }
       LOG.info("Received: " + msg);
-      assertNull("no messges left dangling but got: " + msg, msg);
+      Assert.assertNull("no messges left dangling but got: " + msg, msg);
       connection.close();
    }
 
-   public void initCombosForTestFailoverConnectionSendReplyLost() {
-      addCombinationValues("defaultPersistenceAdapter", new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC
-                              // last producer message id store feature not implemented for AMQ store
-                              // or PersistenceAdapterChoice.LevelDB
-                           });
-   }
-
    @SuppressWarnings("unchecked")
+   @Test
+   @BMRules(
+           rules = {
+                   @BMRule(
+                           name = "set no return response and stop the broker",
+                           targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection",
+                           targetMethod = "processMessage",
+                           targetLocation = "EXIT",
+                           binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+                           action = "org.apache.activemq.transport.failover.FailoverTransactionTest.holdResponseAndStopProxyOnFirstSend(context)")
+           }
+   )
    public void testFailoverConnectionSendReplyLost() throws Exception {
 
-      broker = createBroker(true);
-      PersistenceAdapter store = setDefaultPersistenceAdapter(broker);
-      if (store instanceof KahaDBPersistenceAdapter) {
-         // duplicate checker not updated on canceled tasks, even it
-         // it was, recovery of the audit would fail as the message is
-         // not recorded in the store and the audit may not be up to date.
-         // So if duplicate messages are an absolute no no after restarts,
-         // ConcurrentStoreAndDispatchQueues must be disabled
-         ((KahaDBPersistenceAdapter) store).setConcurrentStoreAndDispatchQueues(false);
-      }
-
-      final SocketProxy proxy = new SocketProxy();
-
-      broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
-         private boolean firstSend = true;
-
-         @Override
-         public void send(ProducerBrokerExchange producerExchange,
-                          org.apache.activemq.command.Message messageSend) throws Exception {
-            // so send will hang as if reply is lost
-            super.send(producerExchange, messageSend);
-            if (firstSend) {
-               firstSend = false;
-
-               producerExchange.getConnectionContext().setDontSendReponse(true);
-               Executors.newSingleThreadExecutor().execute(new Runnable() {
-                  @Override
-                  public void run() {
-                     LOG.info("Stopping connection post send...");
-                     try {
-                        proxy.close();
-                     }
-                     catch (Exception e) {
-                        e.printStackTrace();
-                     }
-                  }
-               });
-            }
-         }
-      }});
-      broker.start();
+      broker = createBroker();
+      proxy = new SocketProxy();
+      firstSend = true;
+      startBrokerWithDurableQueue();
 
       proxy.setTarget(new URI(url));
       proxy.open();
+      doByteman.set(true);
 
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl().toASCIIString() + ")?jms.watchTopicAdvisories=false");
       configureConnectionFactory(cf);
@@ -550,7 +347,6 @@ public class FailoverTransactionTest extends TestSupport {
       final CountDownLatch sendDoneLatch = new CountDownLatch(1);
       // proxy connection will die on send reply so this will hang on failover reconnect till open
       Executors.newSingleThreadExecutor().execute(new Runnable() {
-         @Override
          public void run() {
             LOG.info("doing async send...");
             try {
@@ -566,29 +362,24 @@ public class FailoverTransactionTest extends TestSupport {
       });
 
       // will be closed by the plugin
-      assertTrue("proxy was closed", proxy.waitUntilClosed(30));
+      Assert.assertTrue("proxy was closed", proxy.waitUntilClosed(30));
       LOG.info("restarting proxy");
       proxy.open();
 
-      assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
+      Assert.assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
 
       Message msg = consumer.receive(20000);
       LOG.info("Received: " + msg);
-      assertNotNull("we got the message", msg);
-      assertNull("we got just one message", consumer.receive(2000));
+      Assert.assertNotNull("we got the message", msg);
+      Assert.assertNull("we got just one message", consumer.receive(2000));
       consumer.close();
       connection.close();
 
-      // verify stats, connection dup suppression means dups don't get to broker
-      assertEquals("one queued message", 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
-
       // ensure no dangling messages with fresh broker etc
       broker.stop();
-      broker.waitUntilStopped();
 
       LOG.info("Checking for remaining/hung messages with restart..");
-      broker = createBroker(false, url);
-      setDefaultPersistenceAdapter(broker);
+      broker = createBroker();
       broker.start();
 
       // after restart, ensure no dangling messages
@@ -603,10 +394,11 @@ public class FailoverTransactionTest extends TestSupport {
          msg = consumer.receive(5000);
       }
       LOG.info("Received: " + msg);
-      assertNull("no messges left dangling but got: " + msg, msg);
+      Assert.assertNull("no messges left dangling but got: " + msg, msg);
       connection.close();
    }
 
+   @Test
    public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
       startCleanBroker();
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?trackTransactionProducers=false");
@@ -621,16 +413,17 @@ public class FailoverTransactionTest extends TestSupport {
 
       // restart to force failover and connection state recovery before the commit
       broker.stop();
-      startBroker(false, url);
+      startBroker();
 
       session.commit();
 
       // without tracking producers, message will not be replayed on recovery
-      assertNull("we got the message", consumer.receive(5000));
+      Assert.assertNull("we got the message", consumer.receive(5000));
       session.commit();
       connection.close();
    }
 
+   @Test
    public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception {
       startCleanBroker();
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
@@ -653,17 +446,18 @@ public class FailoverTransactionTest extends TestSupport {
 
       // restart to force failover and connection state recovery before the commit
       broker.stop();
-      startBroker(false, url);
+      startBroker();
 
       session.commit();
       for (int i = 0; i < count; i++) {
-         assertNotNull("we got all the message: " + count, consumer.receive(20000));
+         Assert.assertNotNull("we got all the message: " + count, consumer.receive(20000));
       }
       session.commit();
       connection.close();
    }
 
    // https://issues.apache.org/activemq/browse/AMQ-2772
+   @Test
    public void testFailoverWithConnectionConsumer() throws Exception {
       startCleanBroker();
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
@@ -677,15 +471,12 @@ public class FailoverTransactionTest extends TestSupport {
       final CountDownLatch connectionConsumerGotOne = new CountDownLatch(1);
       final Session poolSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       connection.createConnectionConsumer(destination, null, new ServerSessionPool() {
-         @Override
          public ServerSession getServerSession() throws JMSException {
             return new ServerSession() {
-               @Override
                public Session getSession() throws JMSException {
                   return poolSession;
                }
 
-               @Override
                public void start() throws JMSException {
                   connectionConsumerGotOne.countDown();
                   poolSession.run();
@@ -707,18 +498,30 @@ public class FailoverTransactionTest extends TestSupport {
 
       // restart to force failover and connection state recovery before the commit
       broker.stop();
-      startBroker(false, url);
+      startBroker();
 
       session.commit();
       for (int i = 0; i < count - 1; i++) {
-         assertNotNull("Failed to get message: " + count, consumer.receive(20000));
+         Assert.assertNotNull("Failed to get message: " + count, consumer.receive(20000));
       }
       session.commit();
       connection.close();
 
-      assertTrue("connectionconsumer did not get a message", connectionConsumerGotOne.await(10, TimeUnit.SECONDS));
+      Assert.assertTrue("connectionconsumer did not get a message", connectionConsumerGotOne.await(10, TimeUnit.SECONDS));
    }
 
+   @Test
+   @BMRules(
+           rules = {
+                   @BMRule(
+                           name = "set no return response and stop the broker",
+                           targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection",
+                           targetMethod = "processMessageAck",
+                           targetLocation = "ENTRY",
+                           binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+                           action = "org.apache.activemq.transport.failover.FailoverTransactionTest.holdResponseAndStopBroker(context)")
+           }
+   )
    public void testFailoverConsumerAckLost() throws Exception {
       // as failure depends on hash order of state tracker recovery, do a few times
       for (int i = 0; i < 3; i++) {
@@ -734,31 +537,10 @@ public class FailoverTransactionTest extends TestSupport {
 
    @SuppressWarnings("unchecked")
    public void doTestFailoverConsumerAckLost(final int pauseSeconds) throws Exception {
-      broker = createBroker(true);
-      setDefaultPersistenceAdapter(broker);
-
-      broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
-
-         // broker is killed on delivered ack as prefetch is 1
-         @Override
-         public void acknowledge(ConsumerBrokerExchange consumerExchange, final MessageAck ack) throws Exception {
-
-            consumerExchange.getConnectionContext().setDontSendReponse(true);
-            Executors.newSingleThreadExecutor().execute(new Runnable() {
-               @Override
-               public void run() {
-                  LOG.info("Stopping broker on ack: " + ack);
-                  try {
-                     broker.stop();
-                  }
-                  catch (Exception e) {
-                     e.printStackTrace();
-                  }
-               }
-            });
-         }
-      }});
+      broker = createBroker();
       broker.start();
+      brokerStopLatch = new CountDownLatch(1);
+      doByteman.set(true);
 
       Vector<Connection> connections = new Vector<>();
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
@@ -789,7 +571,6 @@ public class FailoverTransactionTest extends TestSupport {
       final CountDownLatch commitDoneLatch = new CountDownLatch(1);
       final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false);
       Executors.newSingleThreadExecutor().execute(new Runnable() {
-         @Override
          public void run() {
             LOG.info("doing async commit after consume...");
             try {
@@ -839,12 +620,12 @@ public class FailoverTransactionTest extends TestSupport {
       });
 
       // will be stopped by the plugin
-      broker.waitUntilStopped();
-      broker = createBroker(false, url);
-      setDefaultPersistenceAdapter(broker);
+      brokerStopLatch.await();
+      broker = createBroker();
       broker.start();
+      doByteman.set(false);
 
-      assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
+      Assert.assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
 
       LOG.info("received message count: " + receivedMessages.size());
 
@@ -852,10 +633,10 @@ public class FailoverTransactionTest extends TestSupport {
       Message msg = consumer1.receive(gotTransactionRolledBackException.get() ? 5000 : 20000);
       LOG.info("post: from consumer1 received: " + msg);
       if (gotTransactionRolledBackException.get()) {
-         assertNotNull("should be available again after commit rollback ex", msg);
+         Assert.assertNotNull("should be available again after commit rollback ex", msg);
       }
       else {
-         assertNull("should be nothing left for consumer as receive should have committed", msg);
+         Assert.assertNull("should be nothing left for consumer as receive should have committed", msg);
       }
       consumerSession1.commit();
 
@@ -864,7 +645,7 @@ public class FailoverTransactionTest extends TestSupport {
          // consumer2 should get other message
          msg = consumer2.receive(10000);
          LOG.info("post: from consumer2 received: " + msg);
-         assertNotNull("got second message on consumer2", msg);
+         Assert.assertNotNull("got second message on consumer2", msg);
          consumerSession2.commit();
       }
 
@@ -874,11 +655,9 @@ public class FailoverTransactionTest extends TestSupport {
 
       // ensure no dangling messages with fresh broker etc
       broker.stop();
-      broker.waitUntilStopped();
 
       LOG.info("Checking for remaining/hung messages..");
-      broker = createBroker(false, url);
-      setDefaultPersistenceAdapter(broker);
+      broker = createBroker();
       broker.start();
 
       // after restart, ensure no dangling messages
@@ -893,36 +672,29 @@ public class FailoverTransactionTest extends TestSupport {
          msg = sweeper.receive(5000);
       }
       LOG.info("Sweep received: " + msg);
-      assertNull("no messges left dangling but got: " + msg, msg);
+      Assert.assertNull("no messges left dangling but got: " + msg, msg);
       connection.close();
+
+      broker.stop();
    }
 
+   @Test
+   @BMRules(
+           rules = {
+                   @BMRule(
+                           name = "set no return response and stop the broker",
+                           targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection",
+                           targetMethod = "processRemoveConsumer",
+                           targetLocation = "ENTRY",
+                           binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+                           action = "org.apache.activemq.transport.failover.FailoverTransactionTest.stopBrokerOnCounter(context)")
+           }
+   )
    public void testPoolingNConsumesAfterReconnect() throws Exception {
-      broker = createBroker(true);
-      setDefaultPersistenceAdapter(broker);
+      broker = createBroker();
+      startBrokerWithDurableQueue();
 
-      broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
-         int count = 0;
-
-         @Override
-         public void removeConsumer(ConnectionContext context, final ConsumerInfo info) throws Exception {
-            if (count++ == 1) {
-               Executors.newSingleThreadExecutor().execute(new Runnable() {
-                  @Override
-                  public void run() {
-                     LOG.info("Stopping broker on removeConsumer: " + info);
-                     try {
-                        broker.stop();
-                     }
-                     catch (Exception e) {
-                        e.printStackTrace();
-                     }
-                  }
-               });
-            }
-         }
-      }});
-      broker.start();
+      doByteman.set(true);
 
       Vector<Connection> connections = new Vector<>();
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
@@ -951,6 +723,7 @@ public class FailoverTransactionTest extends TestSupport {
       for (int i = 0; i < consumerCount; i++) {
          consumers.push(consumerSession.createConsumer(destination));
       }
+
       final ExecutorService executorService = Executors.newCachedThreadPool();
 
       final FailoverTransport failoverTransport = ((ActiveMQConnection) connection).getTransport().narrow(FailoverTransport.class);
@@ -973,7 +746,6 @@ public class FailoverTransactionTest extends TestSupport {
             for (int i = 0; i < consumerCount && !consumers.isEmpty(); i++) {
 
                executorService.execute(new Runnable() {
-                  @Override
                   public void run() {
                      MessageConsumer localConsumer = null;
                      try {
@@ -1011,9 +783,9 @@ public class FailoverTransactionTest extends TestSupport {
       consumer.close();
 
       // will be stopped by the plugin
-      broker.waitUntilStopped();
-      broker = createBroker(false, url);
-      setDefaultPersistenceAdapter(broker);
+      brokerStopLatch.await();
+      doByteman.set(false);
+      broker = createBroker();
       broker.start();
 
       consumer = consumerSession.createConsumer(destination);
@@ -1023,8 +795,9 @@ public class FailoverTransactionTest extends TestSupport {
       for (int i = 0; i < 4 && msg == null; i++) {
          msg = consumer.receive(1000);
       }
+
       LOG.info("post: from consumer1 received: " + msg);
-      assertNotNull("got message after failover", msg);
+      Assert.assertNotNull("got message after failover", msg);
       msg.acknowledge();
 
       for (Connection c : connections) {
@@ -1032,8 +805,15 @@ public class FailoverTransactionTest extends TestSupport {
       }
    }
 
+   private void startBrokerWithDurableQueue() throws Exception {
+      broker.start();
+      //auto created queue can't survive a restart, so we need this
+      broker.getJMSServerManager().createQueue(false, QUEUE_NAME, null, true, QUEUE_NAME);
+   }
+
+   @Test
    public void testAutoRollbackWithMissingRedeliveries() throws Exception {
-      broker = createBroker(true);
+      broker = createBroker();
       broker.start();
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
       configureConnectionFactory(cf);
@@ -1047,32 +827,32 @@ public class FailoverTransactionTest extends TestSupport {
       produceMessage(producerSession, destination);
 
       Message msg = consumer.receive(20000);
-      assertNotNull(msg);
+      Assert.assertNotNull(msg);
 
       broker.stop();
-      broker = createBroker(false, url);
+      broker = createBroker();
       // use empty jdbc store so that default wait(0) for redeliveries will timeout after failover
-      setPersistenceAdapter(broker, PersistenceAdapterChoice.JDBC);
       broker.start();
 
       try {
          consumerSession.commit();
-         fail("expected transaciton rolledback ex");
+         Assert.fail("expected transaciton rolledback ex");
       }
       catch (TransactionRolledBackException expected) {
       }
 
       broker.stop();
-      broker = createBroker(false, url);
+      broker = createBroker();
       broker.start();
 
-      assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000));
+      Assert.assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000));
       connection.close();
    }
 
+   @Test
    public void testWaitForMissingRedeliveries() throws Exception {
       LOG.info("testWaitForMissingRedeliveries()");
-      broker = createBroker(true);
+      broker = createBroker();
       broker.start();
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=30000");
       configureConnectionFactory(cf);
@@ -1088,18 +868,15 @@ public class FailoverTransactionTest extends TestSupport {
       if (msg == null) {
          AutoFailTestSupport.dumpAllThreads("missing-");
       }
-      assertNotNull("got message just produced", msg);
+      Assert.assertNotNull("got message just produced", msg);
 
       broker.stop();
-      broker = createBroker(false, url);
-      // use empty jdbc store so that wait for re-deliveries occur when failover resumes
-      setPersistenceAdapter(broker, PersistenceAdapterChoice.JDBC);
+      broker = createBroker();
       broker.start();
 
       final CountDownLatch commitDone = new CountDownLatch(1);
       // will block pending re-deliveries
       Executors.newSingleThreadExecutor().execute(new Runnable() {
-         @Override
          public void run() {
             LOG.info("doing async commit...");
             try {
@@ -1112,18 +889,19 @@ public class FailoverTransactionTest extends TestSupport {
       });
 
       broker.stop();
-      broker = createBroker(false, url);
+      broker = createBroker();
       broker.start();
 
-      assertTrue("commit was successful", commitDone.await(30, TimeUnit.SECONDS));
+      Assert.assertTrue("commit was successful", commitDone.await(30, TimeUnit.SECONDS));
 
-      assertNull("should not get committed message", consumer.receive(5000));
+      Assert.assertNull("should not get committed message", consumer.receive(5000));
       connection.close();
    }
 
+   @Test
    public void testReDeliveryWhilePending() throws Exception {
       LOG.info("testReDeliveryWhilePending()");
-      broker = createBroker(true);
+      broker = createBroker();
       broker.start();
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=10000");
       configureConnectionFactory(cf);
@@ -1139,13 +917,13 @@ public class FailoverTransactionTest extends TestSupport {
       if (msg == null) {
          AutoFailTestSupport.dumpAllThreads("missing-");
       }
-      assertNotNull("got message just produced", msg);
+      Assert.assertNotNull("got message just produced", msg);
 
       // add another consumer into the mix that may get the message after restart
       MessageConsumer consumer2 = consumerSession.createConsumer(consumerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1"));
 
       broker.stop();
-      broker = createBroker(false, url);
+      broker = createBroker();
       broker.start();
 
       final CountDownLatch commitDone = new CountDownLatch(1);
@@ -1154,7 +932,6 @@ public class FailoverTransactionTest extends TestSupport {
 
       // commit may fail if other consumer gets the message on restart
       Executors.newSingleThreadExecutor().execute(new Runnable() {
-         @Override
          public void run() {
             LOG.info("doing async commit...");
             try {
@@ -1169,24 +946,24 @@ public class FailoverTransactionTest extends TestSupport {
          }
       });
 
-      assertTrue("commit completed ", commitDone.await(15, TimeUnit.SECONDS));
+      Assert.assertTrue("commit completed ", commitDone.await(15, TimeUnit.SECONDS));
 
       // either message redelivered in existing tx or consumed by consumer2
       // should not be available again in any event
-      assertNull("consumer should not get rolled back on non redelivered message or duplicate", consumer.receive(5000));
+      Assert.assertNull("consumer should not get rolled back on non redelivered message or duplicate", consumer.receive(5000));
 
       // consumer replay is hashmap order dependent on a failover connection state recover so need to deal with both cases
       if (exceptions.isEmpty()) {
          LOG.info("commit succeeded, message was redelivered to the correct consumer after restart so commit was fine");
-         assertNull("consumer2 not get a second message consumed by 1", consumer2.receive(2000));
+         Assert.assertNull("consumer2 not get a second message consumed by 1", consumer2.receive(2000));
       }
       else {
          LOG.info("commit failed, consumer2 should get it", exceptions.get(0));
-         assertNotNull("consumer2 got message", consumer2.receive(2000));
+         Assert.assertNotNull("consumer2 got message", consumer2.receive(2000));
          consumerSession.commit();
          // no message should be in dlq
          MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ"));
-         assertNull("nothing in the dlq", dlqConsumer.receive(5000));
+         Assert.assertNull("nothing in the dlq", dlqConsumer.receive(5000));
       }
       connection.close();
    }
@@ -1198,4 +975,63 @@ public class FailoverTransactionTest extends TestSupport {
       producer.close();
    }
 
+   public static void holdResponseAndStopBroker(final AMQConnectionContext context) {
+      if (doByteman.get()) {
+         context.setDontSendReponse(true);
+         Executors.newSingleThreadExecutor().execute(new Runnable() {
+            public void run() {
+               LOG.info("Stopping broker post commit...");
+               try {
+                  broker.stop();
+               }
+               catch (Exception e) {
+                  e.printStackTrace();
+               }
+               finally {
+                  brokerStopLatch.countDown();
+               }
+            }
+         });
+      }
+   }
+
+   public static void holdResponseAndStopProxyOnFirstSend(final AMQConnectionContext context) {
+      if (doByteman.get()) {
+         if (firstSend) {
+            firstSend = false;
+            context.setDontSendReponse(true);
+            Executors.newSingleThreadExecutor().execute(new Runnable() {
+               public void run() {
+                  LOG.info("Stopping connection post send...");
+                  try {
+                     proxy.close();
+                  }
+                  catch (Exception e) {
+                     e.printStackTrace();
+                  }
+               }
+            });
+         }
+      }
+   }
+
+   public static void stopBrokerOnCounter(final AMQConnectionContext context) {
+      if (doByteman.get()) {
+         if (count++ == 1) {
+            Executors.newSingleThreadExecutor().execute(new Runnable() {
+               public void run() {
+                  try {
+                     broker.stop();
+                  }
+                  catch (Exception e) {
+                     e.printStackTrace();
+                  }
+                  finally {
+                     brokerStopLatch.countDown();
+                  }
+               }
+            });
+         }
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
index 0ba3939..149af92 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
@@ -23,7 +23,8 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.net.URI;
 
-import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportListener;
@@ -34,7 +35,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class FailoverTransportBackupsTest {
+public class FailoverTransportBackupsTest extends OpenwireArtemisBaseTest {
 
    private static final Logger LOG = LoggerFactory.getLogger(FailoverTransportBackupsTest.class);
 
@@ -43,23 +44,11 @@ public class FailoverTransportBackupsTest {
    private int transportInterruptions;
    private int transportResumptions;
 
-   BrokerService broker1;
-   BrokerService broker2;
-   BrokerService broker3;
+   EmbeddedJMS[] servers = new EmbeddedJMS[3];
 
    @Before
    public void setUp() throws Exception {
-      broker1 = createBroker("1");
-      broker2 = createBroker("2");
-      broker3 = createBroker("3");
-
-      broker1.start();
-      broker2.start();
-      broker3.start();
-
-      broker1.waitUntilStarted();
-      broker2.waitUntilStarted();
-      broker3.waitUntilStarted();
+      setUpClusterServers(servers);
 
       // Reset stats
       transportInterruptions = 0;
@@ -71,13 +60,7 @@ public class FailoverTransportBackupsTest {
       if (transport != null) {
          transport.stop();
       }
-
-      broker1.stop();
-      broker1.waitUntilStopped();
-      broker2.stop();
-      broker2.waitUntilStopped();
-      broker3.stop();
-      broker3.waitUntilStopped();
+      shutDownClusterServers(servers);
    }
 
    @Test
@@ -111,7 +94,7 @@ public class FailoverTransportBackupsTest {
          }
       }));
 
-      broker1.stop();
+      servers[0].stop();
 
       assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() {
          @Override
@@ -124,7 +107,7 @@ public class FailoverTransportBackupsTest {
       assertTrue("Incorrect number of Transport interruptions", transportInterruptions >= 1);
       assertTrue("Incorrect number of Transport resumptions", transportResumptions >= 1);
 
-      broker2.stop();
+      servers[1].stop();
 
       assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() {
          @Override
@@ -153,7 +136,7 @@ public class FailoverTransportBackupsTest {
          }
       }));
 
-      broker1.stop();
+      servers[0].stop();
 
       assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() {
          @Override
@@ -163,7 +146,7 @@ public class FailoverTransportBackupsTest {
          }
       }));
 
-      broker2.stop();
+      servers[1].stop();
 
       assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() {
          @Override
@@ -174,20 +157,11 @@ public class FailoverTransportBackupsTest {
       }));
    }
 
-   private BrokerService createBroker(String name) throws Exception {
-      BrokerService bs = new BrokerService();
-      bs.setBrokerName(name);
-      bs.setUseJmx(false);
-      bs.setPersistent(false);
-      bs.addConnector("tcp://localhost:0");
-      return bs;
-   }
-
    protected Transport createTransport(int backups) throws Exception {
       String connectionUri = "failover://(" +
-         broker1.getTransportConnectors().get(0).getPublishableConnectString() + "," +
-         broker2.getTransportConnectors().get(0).getPublishableConnectString() + "," +
-         broker3.getTransportConnectors().get(0).getPublishableConnectString() + ")";
+         newURI(0) + "," +
+         newURI(1) + "," +
+         newURI(2) + ")";
 
       if (backups > 0) {
          connectionUri += "?randomize=false&backup=true&backupPoolSize=" + backups;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java
index 806faca..15d28d3 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java
@@ -18,41 +18,205 @@ package org.apache.activemq.transport.failover;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.DeliveryMode;
+import javax.jms.MessageNotWriteableException;
 
-import junit.framework.Test;
-
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
 import org.apache.activemq.broker.StubConnection;
-import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.network.NetworkTestSupport;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportListener;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class FailoverTransportBrokerTest extends NetworkTestSupport {
+@RunWith(Parameterized.class)
+public class FailoverTransportBrokerTest extends OpenwireArtemisBaseTest {
 
    private static final Logger LOG = LoggerFactory.getLogger(FailoverTransportBrokerTest.class);
+   protected ArrayList<StubConnection> connections = new ArrayList<StubConnection>();
+   protected long idGenerator;
+   protected int msgIdGenerator;
+   protected int maxWait = 10000;
+   public static final boolean FAST_NO_MESSAGE_LEFT_ASSERT = System.getProperty("FAST_NO_MESSAGE_LEFT_ASSERT", "true").equals("true");
+
+   @Parameterized.Parameters
+   public static Collection<Object[]> getParams()
+   {
+      return Arrays.asList(new Object[][] {
+              {Integer.valueOf(DeliveryMode.NON_PERSISTENT), new ActiveMQQueue("TEST")},
+              {Integer.valueOf(DeliveryMode.NON_PERSISTENT), new ActiveMQTopic("TEST")},
+              {Integer.valueOf(DeliveryMode.PERSISTENT), new ActiveMQQueue("TEST")},
+              {Integer.valueOf(DeliveryMode.PERSISTENT), new ActiveMQTopic("TEST")}
+      });
+   }
+
+   private EmbeddedJMS server;
+   private EmbeddedJMS remoteServer;
 
    public ActiveMQDestination destination;
    public int deliveryMode;
 
-   public void initCombosForTestPublisherFailsOver() {
-      addCombinationValues("deliveryMode", new Object[]{Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
-      addCombinationValues("destination", new Object[]{new ActiveMQQueue("TEST"), new ActiveMQTopic("TEST")});
+   public FailoverTransportBrokerTest(int deliveryMode, ActiveMQDestination destination) {
+      this.deliveryMode = deliveryMode;
+      this.destination = destination;
+   }
+
+   @Before
+   public void setUp() throws Exception {
+      Configuration config0 = createConfig(0);
+      server = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+      Configuration config1 = createConfig(1);
+      remoteServer = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+      server.start();
+      remoteServer.start();
+   }
+
+   @After
+   public void tearDown() throws Exception {
+      for (StubConnection conn : connections) {
+         try {
+            conn.stop();
+         }
+         catch (Exception e) {
+         }
+      }
+      try {
+         remoteServer.stop();
+      }
+      catch (Exception e) {
+      }
+      try {
+         server.stop();
+      }
+      catch (Exception e) {
+      }
+   }
+
+   protected StubConnection createConnection() throws Exception {
+      Transport transport = TransportFactory.connect(new URI(newURI(0)));
+      StubConnection connection = new StubConnection(transport);
+      connections.add(connection);
+      return connection;
+   }
+
+   protected StubConnection createRemoteConnection() throws Exception {
+      Transport transport = TransportFactory.connect(new URI(newURI(1)));
+      StubConnection connection = new StubConnection(transport);
+      connections.add(connection);
+      return connection;
+   }
+
+   protected ConnectionInfo createConnectionInfo() throws Exception {
+      ConnectionInfo info = new ConnectionInfo();
+      info.setConnectionId(new ConnectionId("connection:" + (++idGenerator)));
+      info.setClientId(info.getConnectionId().getValue());
+      return info;
+   }
+
+   protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception {
+      SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator);
+      return info;
+   }
+
+   protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo,
+                                             ActiveMQDestination destination) throws Exception {
+      ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
+      info.setBrowser(false);
+      info.setDestination(destination);
+      info.setPrefetchSize(1000);
+      info.setDispatchAsync(false);
+      return info;
+   }
+
+   protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception {
+      ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator);
+      return info;
    }
 
+   protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode) {
+      Message message = createMessage(producerInfo, destination);
+      message.setPersistent(deliveryMode == DeliveryMode.PERSISTENT);
+      return message;
+   }
+
+   protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) {
+      ActiveMQTextMessage message = new ActiveMQTextMessage();
+      message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
+      message.setDestination(destination);
+      message.setPersistent(false);
+      try {
+         message.setText("Test Message Payload.");
+      }
+      catch (MessageNotWriteableException e) {
+      }
+      return message;
+   }
+
+   public Message receiveMessage(StubConnection connection) throws InterruptedException {
+      return receiveMessage(connection, maxWait);
+   }
+
+   public Message receiveMessage(StubConnection connection, long timeout) throws InterruptedException {
+      while (true) {
+         Object o = connection.getDispatchQueue().poll(timeout, TimeUnit.MILLISECONDS);
+
+         if (o == null) {
+            return null;
+         }
+         if (o instanceof MessageDispatch) {
+
+            MessageDispatch dispatch = (MessageDispatch) o;
+            if (dispatch.getMessage() == null) {
+               return null;
+            }
+            dispatch.setMessage(dispatch.getMessage().copy());
+            dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter());
+            return dispatch.getMessage();
+         }
+      }
+   }
+
+   protected void assertNoMessagesLeft(StubConnection connection) throws InterruptedException {
+      long wait = FAST_NO_MESSAGE_LEFT_ASSERT ? 0 : maxWait;
+      while (true) {
+         Object o = connection.getDispatchQueue().poll(wait, TimeUnit.MILLISECONDS);
+         if (o == null) {
+            return;
+         }
+         if (o instanceof MessageDispatch && ((MessageDispatch) o).getMessage() != null) {
+            Assert.fail("Received a message: " + ((MessageDispatch) o).getMessage().getMessageId());
+         }
+      }
+   }
+
+   @Test
    public void testPublisherFailsOver() throws Exception {
 
       // Start a normal consumer on the local broker
@@ -92,19 +256,22 @@ public class FailoverTransportBrokerTest extends NetworkTestSupport {
       // See which broker we were connected to.
       StubConnection connectionA;
       StubConnection connectionB;
-      TransportConnector serverA;
-      if (connector.getServer().getConnectURI().equals(ft.getConnectedTransportURI())) {
+
+
+      EmbeddedJMS serverA;
+
+      if (new URI(newURI(0)).equals(ft.getConnectedTransportURI())) {
          connectionA = connection1;
          connectionB = connection2;
-         serverA = connector;
+         serverA = server;
       }
       else {
          connectionA = connection2;
          connectionB = connection1;
-         serverA = remoteConnector;
+         serverA = remoteServer;
       }
 
-      assertNotNull(receiveMessage(connectionA));
+      Assert.assertNotNull(receiveMessage(connectionA));
       assertNoMessagesLeft(connectionB);
 
       // Dispose the server so that it fails over to the other server.
@@ -113,7 +280,7 @@ public class FailoverTransportBrokerTest extends NetworkTestSupport {
 
       connection3.request(createMessage(producerInfo3, destination, deliveryMode));
 
-      assertNotNull(receiveMessage(connectionB));
+      Assert.assertNotNull(receiveMessage(connectionB));
       assertNoMessagesLeft(connectionA);
 
    }
@@ -150,34 +317,16 @@ public class FailoverTransportBrokerTest extends NetworkTestSupport {
       while (count++ < 20 && info[0] == null) {
          TimeUnit.SECONDS.sleep(1);
       }
-      assertNotNull("got a valid brokerInfo after 20 secs", info[0]);
-      assertNull("no peer brokers present", info[0].getPeerBrokerInfos());
-   }
-
-   @Override
-   protected String getLocalURI() {
-      return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
-   }
-
-   @Override
-   protected String getRemoteURI() {
-      return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
+      Assert.assertNotNull("got a valid brokerInfo after 20 secs", info[0]);
+      Assert.assertNull("no peer brokers present", info[0].getPeerBrokerInfos());
    }
 
    protected StubConnection createFailoverConnection(TransportListener listener) throws Exception {
-      URI failoverURI = new URI("failover://" + connector.getServer().getConnectURI() + "," + remoteConnector.getServer().getConnectURI() + "");
+      URI failoverURI = new URI("failover://" + newURI(0) + "," + newURI(1) + "");
       Transport transport = TransportFactory.connect(failoverURI);
       StubConnection connection = new StubConnection(transport, listener);
       connections.add(connection);
       return connection;
    }
 
-   public static Test suite() {
-      return suite(FailoverTransportBrokerTest.class);
-   }
-
-   public static void main(String[] args) {
-      junit.textui.TestRunner.run(suite());
-   }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportUriHandlingTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportUriHandlingTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportUriHandlingTest.java
index 8155575..d64cc58 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportUriHandlingTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportUriHandlingTest.java
@@ -22,7 +22,6 @@ import java.lang.reflect.Field;
 import java.net.URI;
 import java.util.Collection;
 
-import org.apache.activemq.transport.failover.FailoverTransport;
 import org.junit.Test;
 
 public class FailoverTransportUriHandlingTest {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
index e792228..002a788 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
@@ -30,38 +30,46 @@ import javax.jms.Session;
 import junit.framework.TestCase;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
 
-public class FailoverUpdateURIsTest extends TestCase {
+public class FailoverUpdateURIsTest extends OpenwireArtemisBaseTest {
 
    private static final String QUEUE_NAME = "test.failoverupdateuris";
    private static final Logger LOG = Logger.getLogger(FailoverUpdateURIsTest.class);
 
-   String firstTcpUri = "tcp://localhost:61616";
-   String secondTcpUri = "tcp://localhost:61626";
+   String firstTcpUri = newURI(0);
+   String secondTcpUri = newURI(10);
    Connection connection = null;
-   BrokerService bs1 = null;
-   BrokerService bs2 = null;
+   EmbeddedJMS server0 = null;
+   EmbeddedJMS server1 = null;
 
-   @Override
+   @After
    public void tearDown() throws Exception {
       if (connection != null) {
          connection.close();
       }
-      if (bs1 != null) {
-         bs1.stop();
+      if (server0 != null) {
+         server0.stop();
       }
-      if (bs2 != null) {
-         bs2.stop();
+      if (server1 != null) {
+         server1.stop();
       }
    }
 
+   @Test
    public void testUpdateURIsViaFile() throws Exception {
 
-      String targetDir = "target/" + getName();
+      String targetDir = "target/testUpdateURIsViaFile";
       new File(targetDir).mkdir();
       File updateFile = new File(targetDir + "/updateURIsFile.txt");
       LOG.info(updateFile);
@@ -72,8 +80,9 @@ public class FailoverUpdateURIsTest extends TestCase {
       out.write(firstTcpUri.getBytes());
       out.close();
 
-      bs1 = createBroker("bs1", firstTcpUri);
-      bs1.start();
+      Configuration config0 = createConfig(0);
+      server0 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+      server0.start();
 
       // no failover uri's to start with, must be read from file...
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:()?updateURIsURL=file:///" + updateFile.getAbsoluteFile());
@@ -86,14 +95,14 @@ public class FailoverUpdateURIsTest extends TestCase {
       Message message = session.createTextMessage("Test message");
       producer.send(message);
       Message msg = consumer.receive(2000);
-      assertNotNull(msg);
+      Assert.assertNotNull(msg);
 
-      bs1.stop();
-      bs1.waitUntilStopped();
-      bs1 = null;
+      server0.stop();
+      server0 = null;
 
-      bs2 = createBroker("bs2", secondTcpUri);
-      bs2.start();
+      Configuration config1 = createConfig(10);
+      server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+      server1.start();
 
       // add the transport uri for broker number 2
       out = new FileOutputStream(updateFile, true);
@@ -103,25 +112,16 @@ public class FailoverUpdateURIsTest extends TestCase {
 
       producer.send(message);
       msg = consumer.receive(2000);
-      assertNotNull(msg);
-   }
-
-   private BrokerService createBroker(String name, String tcpUri) throws Exception {
-      BrokerService bs = new BrokerService();
-      bs.setBrokerName(name);
-      bs.setUseJmx(false);
-      bs.setPersistent(false);
-      bs.addConnector(tcpUri);
-      return bs;
+      Assert.assertNotNull(msg);
    }
 
+   @Test
    public void testAutoUpdateURIs() throws Exception {
-
-      bs1 = new BrokerService();
-      bs1.setUseJmx(false);
-      TransportConnector transportConnector = bs1.addConnector(firstTcpUri);
-      transportConnector.setUpdateClusterClients(true);
-      bs1.start();
+      Configuration config0 = createConfig(0);
+      deployClusterConfiguration(config0, 10);
+      server0 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+      server0.start();
+      Assert.assertTrue(server0.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 1));
 
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + firstTcpUri + ")");
       connection = cf.createConnection();
@@ -133,24 +133,23 @@ public class FailoverUpdateURIsTest extends TestCase {
       Message message = session.createTextMessage("Test message");
       producer.send(message);
       Message msg = consumer.receive(4000);
-      assertNotNull(msg);
+      Assert.assertNotNull(msg);
 
-      bs2 = createBroker("bs2", secondTcpUri);
-      NetworkConnector networkConnector = bs2.addNetworkConnector("static:(" + firstTcpUri + ")");
-      networkConnector.setDuplex(true);
-      bs2.start();
-      LOG.info("started brokerService 2");
-      bs2.waitUntilStarted();
+      Configuration config1 = createConfig(10);
+      deployClusterConfiguration(config1, 0);
+      server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+      server1.start();
+      Assert.assertTrue(server0.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+      Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
 
       TimeUnit.SECONDS.sleep(4);
 
       LOG.info("stopping brokerService 1");
-      bs1.stop();
-      bs1.waitUntilStopped();
-      bs1 = null;
+      server0.stop();
+      server0 = null;
 
       producer.send(message);
       msg = consumer.receive(4000);
-      assertNotNull(msg);
+      Assert.assertNotNull(msg);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java
index ae637ef..a028832 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java
@@ -43,4 +43,5 @@ public class FailoverUriTest extends TransportUriTest {
    public static Test suite() {
       return suite(FailoverUriTest.class);
    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
index 34e7333..dad241c 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.failover;
 
 import java.io.IOException;
 import java.util.Date;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.Connection;
 import javax.jms.Message;
@@ -26,9 +27,13 @@ import javax.jms.Queue;
 import javax.jms.Session;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.apache.activemq.transport.TransportListener;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -36,19 +41,20 @@ import org.slf4j.LoggerFactory;
 
 import static org.junit.Assert.*;
 
-public class InitalReconnectDelayTest {
+public class InitalReconnectDelayTest extends OpenwireArtemisBaseTest {
 
    private static final transient Logger LOG = LoggerFactory.getLogger(InitalReconnectDelayTest.class);
-   protected BrokerService broker1;
-   protected BrokerService broker2;
+   protected EmbeddedJMS server1;
+   protected EmbeddedJMS server2;
+
+//   protected BrokerService broker1;
+//   protected BrokerService broker2;
 
    @Test
    public void testInitialReconnectDelay() throws Exception {
 
-      String uriString = "failover://(tcp://localhost:" +
-         broker1.getTransportConnectors().get(0).getConnectUri().getPort() +
-         ",tcp://localhost:" +
-         broker2.getTransportConnectors().get(0).getConnectUri().getPort() +
+      String uriString = "failover://(" + newURI(1) +
+         "," + newURI(2) +
          ")?randomize=false&initialReconnectDelay=15000";
 
       ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uriString);
@@ -67,7 +73,7 @@ public class InitalReconnectDelayTest {
       //Halt the broker1...
       LOG.info("Stopping the Broker1...");
       start = (new Date()).getTime();
-      broker1.stop();
+      server1.stop();
 
       LOG.info("Attempting to send... failover should kick in...");
       producer.send(session.createTextMessage("TEST"));
@@ -81,10 +87,8 @@ public class InitalReconnectDelayTest {
    @Test
    public void testNoSuspendedCallbackOnNoReconnect() throws Exception {
 
-      String uriString = "failover://(tcp://localhost:" +
-         broker1.getTransportConnectors().get(0).getConnectUri().getPort() +
-         ",tcp://localhost:" +
-         broker2.getTransportConnectors().get(0).getConnectUri().getPort() +
+      String uriString = "failover://(" + newURI(1) +
+         "," + newURI(2) +
          ")?randomize=false&maxReconnectAttempts=0";
 
       ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uriString);
@@ -124,7 +128,7 @@ public class InitalReconnectDelayTest {
       calls.set(0);
 
       LOG.info("Stopping the Broker1...");
-      broker1.stop();
+      server1.stop();
 
       LOG.info("Attempting to send... failover should throw on disconnect");
       try {
@@ -140,25 +144,19 @@ public class InitalReconnectDelayTest {
    @Before
    public void setUp() throws Exception {
 
-      final String dataDir = "target/data/shared";
+      Configuration config1 = createConfig(1);
+      Configuration config2 = createConfig(2);
 
-      broker1 = new BrokerService();
+      deployClusterConfiguration(config1, 2);
+      deployClusterConfiguration(config2, 1);
 
-      broker1.setBrokerName("broker1");
-      broker1.setDeleteAllMessagesOnStartup(true);
-      broker1.setDataDirectory(dataDir);
-      broker1.addConnector("tcp://localhost:0");
-      broker1.setUseJmx(false);
-      broker1.start();
-      broker1.waitUntilStarted();
+      server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+      server2 = new EmbeddedJMS().setConfiguration(config2).setJmsConfiguration(new JMSConfigurationImpl());
 
-      broker2 = new BrokerService();
-      broker2.setBrokerName("broker2");
-      broker2.setDataDirectory(dataDir);
-      broker2.setUseJmx(false);
-      broker2.addConnector("tcp://localhost:0");
-      broker2.start();
-      broker2.waitUntilStarted();
+      server1.start();
+      server2.start();
+      Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+      Assert.assertTrue(server2.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
 
    }
 
@@ -172,16 +170,8 @@ public class InitalReconnectDelayTest {
 
    @After
    public void tearDown() throws Exception {
-
-      if (broker1.isStarted()) {
-         broker1.stop();
-         broker1.waitUntilStopped();
-      }
-
-      if (broker2.isStarted()) {
-         broker2.stop();
-         broker2.waitUntilStopped();
-      }
+      server1.stop();
+      server2.stop();
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
index 4ba5516..83d43af 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
@@ -28,29 +28,33 @@ import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
-
-import junit.framework.TestCase;
+import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.transport.mock.MockTransport;
-import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ReconnectTest extends TestCase {
+public class ReconnectTest extends OpenwireArtemisBaseTest {
 
    public static final int MESSAGES_PER_ITTERATION = 10;
    public static final int WORKER_COUNT = 10;
 
    private static final Logger LOG = LoggerFactory.getLogger(ReconnectTest.class);
 
-   private BrokerService bs;
+   private EmbeddedJMS bs;
    private URI tcpUri;
    private final AtomicInteger resumedCount = new AtomicInteger();
    private final AtomicInteger interruptedCount = new AtomicInteger();
@@ -102,7 +106,7 @@ public class ReconnectTest extends TestCase {
       }
 
       public void start() {
-         new Thread(this).start();
+         new Thread(this, name).start();
       }
 
       public void stop() {
@@ -129,13 +133,19 @@ public class ReconnectTest extends TestCase {
             MessageConsumer consumer = session.createConsumer(queue);
             MessageProducer producer = session.createProducer(queue);
             producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
             while (!stop.get()) {
+
                for (int i = 0; i < MESSAGES_PER_ITTERATION; i++) {
-                  producer.send(session.createTextMessage("TEST:" + i));
+                  TextMessage text = session.createTextMessage(name + " TEST:" + i);
+                  text.setStringProperty("myprop", name + " TEST:" + i);
+                  producer.send(text);
                }
+
                for (int i = 0; i < MESSAGES_PER_ITTERATION; i++) {
-                  consumer.receive();
+                  TextMessage m = (TextMessage) consumer.receive();
                }
+
                iterations.incrementAndGet();
             }
             session.close();
@@ -159,11 +169,12 @@ public class ReconnectTest extends TestCase {
       public synchronized void assertNoErrors() {
          if (error != null) {
             error.printStackTrace();
-            fail("Worker " + name + " got Exception: " + error);
+            Assert.fail("Worker " + name + " got Exception: " + error);
          }
       }
    }
 
+   @Test
    public void testReconnects() throws Exception {
 
       for (int k = 1; k < 10; k++) {
@@ -181,7 +192,7 @@ public class ReconnectTest extends TestCase {
                LOG.info("Test run " + k + ": Waiting for worker " + i + " to finish an iteration.");
                Thread.sleep(1000);
             }
-            assertTrue("Test run " + k + ": Worker " + i + " never completed an interation.", c != 0);
+            Assert.assertTrue("Test run " + k + ": Worker " + i + " never completed an interation.", c != 0);
             workers[i].assertNoErrors();
          }
 
@@ -192,7 +203,7 @@ public class ReconnectTest extends TestCase {
             workers[i].failConnection();
          }
 
-         assertTrue("Timed out waiting for all connections to be interrupted.", Wait.waitFor(new Wait.Condition() {
+         Assert.assertTrue("Timed out waiting for all connections to be interrupted.", Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
                LOG.debug("Test run waiting for connections to get interrupted.. at: " + interruptedCount.get());
@@ -201,7 +212,7 @@ public class ReconnectTest extends TestCase {
          }, TimeUnit.SECONDS.toMillis(60)));
 
          // Wait for the connections to re-establish...
-         assertTrue("Timed out waiting for all connections to be resumed.", Wait.waitFor(new Wait.Condition() {
+         Assert.assertTrue("Timed out waiting for all connections to be resumed.", Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
                LOG.debug("Test run waiting for connections to get resumed.. at: " + resumedCount.get());
@@ -220,26 +231,25 @@ public class ReconnectTest extends TestCase {
       }
    }
 
-   @Override
-   protected void setUp() throws Exception {
-      bs = new BrokerService();
-      bs.setPersistent(false);
-      bs.setUseJmx(true);
-      TransportConnector connector = bs.addConnector("tcp://localhost:0");
+   @Before
+   public void setUp() throws Exception {
+      Configuration config = createConfig(0);
+      bs = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
       bs.start();
-      tcpUri = connector.getConnectUri();
+      tcpUri = new URI(newURI(0));
+
       workers = new Worker[WORKER_COUNT];
       for (int i = 0; i < WORKER_COUNT; i++) {
-         workers[i] = new Worker("" + i);
+         workers[i] = new Worker("worker-" + i);
          workers[i].start();
       }
    }
 
-   @Override
-   protected void tearDown() throws Exception {
+   @After
+   public void tearDown() throws Exception {
       for (int i = 0; i < WORKER_COUNT; i++) {
          workers[i].stop();
       }
-      new ServiceStopper().stop(bs);
+      bs.stop();
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
index 3a55473..ed6040d 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
@@ -24,15 +24,16 @@ import java.util.concurrent.CountDownLatch;
 import javax.jms.Connection;
 import javax.net.ServerSocketFactory;
 
-import junit.framework.TestCase;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.util.Wait;
+import org.junit.Assert;
+import org.junit.Test;
 
-public class SlowConnectionTest extends TestCase {
+public class SlowConnectionTest {
 
    private CountDownLatch socketReadyLatch = new CountDownLatch(1);
 
+   @Test
    public void testSlowConnection() throws Exception {
 
       MockBroker broker = new MockBroker();
@@ -57,7 +58,7 @@ public class SlowConnectionTest extends TestCase {
       }).start();
 
       int count = 0;
-      assertTrue("Transport count: " + count + ", expected <= 1", Wait.waitFor(new Wait.Condition() {
+      Assert.assertTrue("Transport count: " + count + ", expected <= 1", Wait.waitFor(new Wait.Condition() {
          @Override
          public boolean isSatisified() throws Exception {
             int count = 0;


Mime
View raw message