activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [10/48] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
Date Wed, 24 Feb 2016 18:53:09 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ad1b8efd/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
index 2779f52..75c27d7 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
@@ -22,6 +22,8 @@ import java.util.Vector;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -37,97 +39,105 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQMessageConsumer;
 import org.apache.activemq.ActiveMQMessageTransformation;
 import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerPluginSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.SessionId;
 import org.apache.activemq.util.Wait;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.junit.After;
 import org.junit.Test;
 
 // see https://issues.apache.org/activemq/browse/AMQ-2573
-public class FailoverConsumerUnconsumedTest {
-
+@RunWith(BMUnitRunner.class)
+public class FailoverConsumerUnconsumedTest extends OpenwireArtemisBaseTest {
    private static final Logger LOG = LoggerFactory.getLogger(FailoverConsumerUnconsumedTest.class);
    private static final String QUEUE_NAME = "FailoverWithUnconsumed";
-   private static final String TRANSPORT_URI = "tcp://localhost:0";
-   private String url;
+   private static final AtomicBoolean doByteman = new AtomicBoolean(false);
+
+   private static int maxConsumers = 2;
+   private static AtomicInteger consumerCount = new AtomicInteger(0);
+   private static CountDownLatch brokerStopLatch = new CountDownLatch(1);
+   private static AtomicBoolean watchTopicAdvisories = new AtomicBoolean(false);
+
+   private String url = newURI(0);
    final int prefetch = 10;
-   BrokerService broker;
+   private static EmbeddedJMS broker;
 
    @After
-   public void stopBroker() throws Exception {
+   public void tearDown() throws Exception {
       if (broker != null) {
          broker.stop();
+         broker = null;
       }
    }
 
-   public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
-      broker = createBroker(deleteAllMessagesOnStartup);
-      broker.start();
-   }
-
-   public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
-      return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI);
-   }
-
-   public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
-      broker = new BrokerService();
-      broker.addConnector(bindAddress);
-      broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
-
-      this.url = broker.getTransportConnectors().get(0).getConnectUri().toString();
-
-      return broker;
+   @Before
+   public void setUp() throws Exception {
+      consumerCount.set(0);
    }
 
    @Test
+   @BMRules(
+           rules = {
+                   @BMRule(
+                           name = "set no return response and stop the broker",
+                           targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection",
+                           targetMethod = "processAddConsumer",
+                           targetLocation = "ENTRY",
+                           binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+                           action = "org.apache.activemq.transport.failover.FailoverConsumerUnconsumedTest.holdResponseAndStopBroker2(context)")
+           }
+   )
    public void testFailoverConsumerDups() throws Exception {
+      watchTopicAdvisories.set(true);
       doTestFailoverConsumerDups(true);
    }
 
    @Test
+   @BMRules(
+           rules = {
+                   @BMRule(
+                           name = "set no return response and stop the broker",
+                           targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection",
+                           targetMethod = "processAddConsumer",
+                           targetLocation = "ENTRY",
+                           binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+                           action = "org.apache.activemq.transport.failover.FailoverConsumerUnconsumedTest.holdResponseAndStopBroker2(context)")
+           }
+   )
    public void testFailoverConsumerDupsNoAdvisoryWatch() throws Exception {
+      watchTopicAdvisories.set(false);
       doTestFailoverConsumerDups(false);
    }
 
    @SuppressWarnings("unchecked")
    @Test
+   @BMRules(
+           rules = {
+                   @BMRule(
+                           name = "set no return response and stop the broker",
+                           targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection",
+                           targetMethod = "processAddConsumer",
+                           targetLocation = "ENTRY",
+                           binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+                           action = "org.apache.activemq.transport.failover.FailoverConsumerUnconsumedTest.holdResponseAndStopBroker(context)")
+           }
+   )
    public void testFailoverClientAckMissingRedelivery() throws Exception {
-
-      final int maxConsumers = 2;
-      broker = createBroker(true);
-
-      broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
-         int consumerCount;
-
-         // broker is killed on x create consumer
-         @Override
-         public Subscription addConsumer(ConnectionContext context, final ConsumerInfo info) throws Exception {
-            if (++consumerCount == maxConsumers) {
-               context.setDontSendReponse(true);
-               Executors.newSingleThreadExecutor().execute(new Runnable() {
-                  @Override
-                  public void run() {
-                     LOG.info("Stopping broker on consumer: " + info.getConsumerId());
-                     try {
-                        broker.stop();
-                     }
-                     catch (Exception e) {
-                        e.printStackTrace();
-                     }
-                  }
-               });
-            }
-            return super.addConsumer(context, info);
-         }
-      }});
+      maxConsumers = 2;
+      brokerStopLatch = new CountDownLatch(1);
+      broker = createBroker();
       broker.start();
 
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
@@ -139,7 +149,9 @@ public class FailoverConsumerUnconsumedTest {
       final Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
       final Queue destination = consumerSession.createQueue(QUEUE_NAME + "?jms.consumer.prefetch=" + prefetch);
 
-      final Vector<TestConsumer> testConsumers = new Vector<>();
+      doByteman.set(true);
+
+      final Vector<TestConsumer> testConsumers = new Vector<TestConsumer>();
       TestConsumer testConsumer = new TestConsumer(consumerSession, destination, connection);
       testConsumer.setMessageListener(new MessageListener() {
          @Override
@@ -157,7 +169,6 @@ public class FailoverConsumerUnconsumedTest {
       produceMessage(consumerSession, destination, maxConsumers * prefetch);
 
       assertTrue("add messages are delivered", Wait.waitFor(new Wait.Condition() {
-         @Override
          public boolean isSatisified() throws Exception {
             int totalDelivered = 0;
             for (TestConsumer testConsumer : testConsumers) {
@@ -172,7 +183,6 @@ public class FailoverConsumerUnconsumedTest {
       final CountDownLatch shutdownConsumerAdded = new CountDownLatch(1);
 
       Executors.newSingleThreadExecutor().execute(new Runnable() {
-         @Override
          public void run() {
             try {
                LOG.info("add last consumer...");
@@ -198,17 +208,16 @@ public class FailoverConsumerUnconsumedTest {
          }
       });
 
-      // will be stopped by the plugin
-      broker.waitUntilStopped();
+      brokerStopLatch.await();
+      doByteman.set(false);
 
-      broker = createBroker(false, this.url);
+      broker = createBroker();
       broker.start();
 
       assertTrue("consumer added through failover", shutdownConsumerAdded.await(30, TimeUnit.SECONDS));
 
       // each should again get prefetch messages - all unacked deliveries should be rolledback
       assertTrue("after restart all messages are re dispatched", Wait.waitFor(new Wait.Condition() {
-         @Override
          public boolean isSatisified() throws Exception {
             int totalDelivered = 0;
             for (TestConsumer testConsumer : testConsumers) {
@@ -220,55 +229,19 @@ public class FailoverConsumerUnconsumedTest {
          }
       }));
 
-      assertTrue("after restart each got prefetch amount", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            for (TestConsumer testConsumer : testConsumers) {
-               long delivered = testConsumer.deliveredSize();
-               LOG.info(testConsumer.getConsumerId() + " delivered: " + delivered);
-               if (delivered != prefetch) {
-                  return false;
-               }
-            }
-            return true;
-         }
-      }));
-
       connection.close();
    }
 
    @SuppressWarnings("unchecked")
    public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception {
 
-      final int maxConsumers = 4;
-      broker = createBroker(true);
-
-      broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
-         int consumerCount;
-
-         // broker is killed on x create consumer
-         @Override
-         public Subscription addConsumer(ConnectionContext context, final ConsumerInfo info) throws Exception {
-            if (++consumerCount == maxConsumers + (watchTopicAdvisories ? 1 : 0)) {
-               context.setDontSendReponse(true);
-               Executors.newSingleThreadExecutor().execute(new Runnable() {
-                  @Override
-                  public void run() {
-                     LOG.info("Stopping broker on consumer: " + info.getConsumerId());
-                     try {
-                        broker.stop();
-                     }
-                     catch (Exception e) {
-                        e.printStackTrace();
-                     }
-                  }
-               });
-            }
-            return super.addConsumer(context, info);
-         }
-      }});
+      maxConsumers = 4;
+      broker = createBroker();
       broker.start();
 
+      brokerStopLatch = new CountDownLatch(1);
+      doByteman.set(true);
+
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
       cf.setWatchTopicAdvisories(watchTopicAdvisories);
 
@@ -283,10 +256,11 @@ public class FailoverConsumerUnconsumedTest {
          testConsumers.add(new TestConsumer(consumerSession, destination, connection));
       }
 
+      assureQueueMessages(0, new SimpleString("jms.queue." + QUEUE_NAME));
+
       produceMessage(consumerSession, destination, maxConsumers * prefetch);
 
       assertTrue("add messages are dispatched", Wait.waitFor(new Wait.Condition() {
-         @Override
          public boolean isSatisified() throws Exception {
             int totalUnconsumed = 0;
             for (TestConsumer testConsumer : testConsumers) {
@@ -301,7 +275,6 @@ public class FailoverConsumerUnconsumedTest {
       final CountDownLatch shutdownConsumerAdded = new CountDownLatch(1);
 
       Executors.newSingleThreadExecutor().execute(new Runnable() {
-         @Override
          public void run() {
             try {
                LOG.info("add last consumer...");
@@ -315,12 +288,8 @@ public class FailoverConsumerUnconsumedTest {
          }
       });
 
-      // will be stopped by the plugin
-      broker.waitUntilStopped();
-
       // verify interrupt
       assertTrue("add messages dispatched and unconsumed are cleaned up", Wait.waitFor(new Wait.Condition() {
-         @Override
          public boolean isSatisified() throws Exception {
             int totalUnconsumed = 0;
             for (TestConsumer testConsumer : testConsumers) {
@@ -332,14 +301,16 @@ public class FailoverConsumerUnconsumedTest {
          }
       }));
 
-      broker = createBroker(false, this.url);
+      brokerStopLatch.await();
+      doByteman.set(false);
+
+      broker = createBroker();
       broker.start();
 
       assertTrue("consumer added through failover", shutdownConsumerAdded.await(30, TimeUnit.SECONDS));
 
       // each should again get prefetch messages - all unconsumed deliveries should be rolledback
       assertTrue("after start all messages are re dispatched", Wait.waitFor(new Wait.Condition() {
-         @Override
          public boolean isSatisified() throws Exception {
             int totalUnconsumed = 0;
             for (TestConsumer testConsumer : testConsumers) {
@@ -354,6 +325,11 @@ public class FailoverConsumerUnconsumedTest {
       connection.close();
    }
 
+   private void assureQueueMessages(int num, SimpleString queueName) {
+      QueueImpl queue = (QueueImpl) broker.getActiveMQServer().getPostOffice().getBinding(queueName).getBindable();
+      Assert.assertEquals(num, queue.getMessageCount());
+   }
+
    private void produceMessage(final Session producerSession, Queue destination, long count) throws JMSException {
       MessageProducer producer = producerSession.createProducer(destination);
       for (int i = 0; i < count; i++) {
@@ -385,4 +361,44 @@ public class FailoverConsumerUnconsumedTest {
       idGen -= 5;
       return idGen;
    }
+
+   public static void holdResponseAndStopBroker(AMQConnectionContext context) {
+      if (doByteman.get()) {
+         if (consumerCount.incrementAndGet() == maxConsumers) {
+            context.setDontSendReponse(true);
+            Executors.newSingleThreadExecutor().execute(new Runnable() {
+               public void run() {
+                  try {
+                     broker.stop();
+                     brokerStopLatch.countDown();
+                  }
+                  catch (Exception e) {
+                     e.printStackTrace();
+                  }
+               }
+            });
+         }
+      }
+   }
+
+   public static void holdResponseAndStopBroker2(AMQConnectionContext context) {
+      if (doByteman.get()) {
+         if (consumerCount.incrementAndGet() == maxConsumers + (watchTopicAdvisories.get() ? 1 : 0)) {
+            context.setDontSendReponse(true);
+            Executors.newSingleThreadExecutor().execute(new Runnable() {
+               public void run() {
+                  try {
+                     broker.stop();
+                     Assert.assertEquals(1, brokerStopLatch.getCount());
+                     brokerStopLatch.countDown();
+                  }
+                  catch (Exception e) {
+                     e.printStackTrace();
+                  }
+               }
+            });
+         }
+      }
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ad1b8efd/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java
index cb15940..e801b3c 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java
@@ -33,25 +33,35 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.TestSupport;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerPluginSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ProducerBrokerExchange;
-import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.apache.activemq.util.Wait;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class FailoverDuplicateTest extends TestSupport {
+@RunWith(BMUnitRunner.class)
+public class FailoverDuplicateTest extends OpenwireArtemisBaseTest {
 
    private static final Logger LOG = LoggerFactory.getLogger(FailoverDuplicateTest.class);
    private static final String QUEUE_NAME = "TestQueue";
-   private static final String TRANSPORT_URI = "tcp://localhost:0";
-   private String url;
-   BrokerService broker;
 
-   @Override
+   private static final AtomicBoolean doByteman = new AtomicBoolean(false);
+   private static final AtomicBoolean first = new AtomicBoolean(false);
+   private static final CountDownLatch gotMessageLatch = new CountDownLatch(1);
+   private static final CountDownLatch producersDone = new CountDownLatch(1);
+
+   private String url = newURI(0);
+   EmbeddedJMS broker;
+
+   @After
    public void tearDown() throws Exception {
       stopBroker();
    }
@@ -63,72 +73,38 @@ public class FailoverDuplicateTest extends TestSupport {
    }
 
    public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
-      broker = createBroker(deleteAllMessagesOnStartup);
+      broker = createBroker();
       broker.start();
    }
 
-   public void startBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
-      broker = createBroker(deleteAllMessagesOnStartup, bindAddress);
+   public void startBroker() throws Exception {
+      broker = createBroker();
       broker.start();
    }
 
-   public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
-      return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI);
-   }
-
-   public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
-      broker = new BrokerService();
-      broker.setUseJmx(false);
-      broker.setAdvisorySupport(false);
-      broker.addConnector(bindAddress);
-      broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
-
-      url = broker.getTransportConnectors().get(0).getConnectUri().toString();
-
-      return broker;
-   }
-
    public void configureConnectionFactory(ActiveMQConnectionFactory factory) {
       factory.setAuditMaximumProducerNumber(2048);
       factory.setOptimizeAcknowledge(true);
    }
 
    @SuppressWarnings("unchecked")
+   @Test
+   @BMRules(
+           rules = {
+                   @BMRule(
+                           name = "set no return response and stop the broker",
+                           targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection",
+                           targetMethod = "processMessage",
+                           targetLocation = "EXIT",
+                           binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+                           action = "org.apache.activemq.transport.failover.FailoverDuplicateTest.holdResponseAndStopConn(context)")
+           }
+   )
    public void testFailoverSendReplyLost() throws Exception {
 
-      broker = createBroker(true);
-      setDefaultPersistenceAdapter(broker);
-
-      final CountDownLatch gotMessageLatch = new CountDownLatch(1);
-      final CountDownLatch producersDone = new CountDownLatch(1);
-      final AtomicBoolean first = new AtomicBoolean(false);
-      broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
-         @Override
-         public void send(final ProducerBrokerExchange producerExchange,
-                          org.apache.activemq.command.Message messageSend) throws Exception {
-            // so send will hang as if reply is lost
-            super.send(producerExchange, messageSend);
-            if (first.compareAndSet(false, true)) {
-               producerExchange.getConnectionContext().setDontSendReponse(true);
-               Executors.newSingleThreadExecutor().execute(new Runnable() {
-                  @Override
-                  public void run() {
-                     try {
-                        LOG.info("Waiting for recepit");
-                        assertTrue("message received on time", gotMessageLatch.await(60, TimeUnit.SECONDS));
-                        assertTrue("new producers done on time", producersDone.await(120, TimeUnit.SECONDS));
-                        LOG.info("Stopping connection post send and receive and multiple producers");
-                        producerExchange.getConnectionContext().getConnection().stop();
-                     }
-                     catch (Exception e) {
-                        e.printStackTrace();
-                     }
-                  }
-               });
-            }
-         }
-      }});
+      broker = createBroker();
       broker.start();
+      doByteman.set(true);
 
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.watchTopicAdvisories=false");
       configureConnectionFactory(cf);
@@ -164,14 +140,14 @@ public class FailoverDuplicateTest extends TestSupport {
             }
             catch (JMSException e) {
                LOG.error("got send exception: ", e);
-               fail("got unexpected send exception" + e);
+               Assert.fail("got unexpected send exception" + e);
             }
             sendDoneLatch.countDown();
             LOG.info("done async send");
          }
       });
 
-      assertTrue("one message got through on time", gotMessageLatch.await(20, TimeUnit.SECONDS));
+      Assert.assertTrue("one message got through on time", gotMessageLatch.await(20, TimeUnit.SECONDS));
       // send more messages, blow producer audit
       final int numProducers = 1050;
       final int numPerProducer = 2;
@@ -186,7 +162,7 @@ public class FailoverDuplicateTest extends TestSupport {
          }
       }
 
-      assertTrue("message sent complete through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
+      Assert.assertTrue("message sent complete through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
 
       Wait.waitFor(new Wait.Condition() {
          @Override
@@ -195,29 +171,16 @@ public class FailoverDuplicateTest extends TestSupport {
             return totalSent <= receivedCount.get();
          }
       });
-      assertEquals("we got all produced messages", totalSent, receivedCount.get());
+      Assert.assertEquals("we got all produced messages", totalSent, receivedCount.get());
       sendConnection.close();
       receiveConnection.close();
 
-      // verify stats
-      assertEquals("expect all messages are dequeued with one duplicate to dlq", totalSent + 2, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
-
-      Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            LOG.info("dequeues : " + ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount());
-            return totalSent + 1 <= ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount();
-         }
-      });
-      assertEquals("dequeue correct, including duplicate dispatch poisoned", totalSent + 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount());
-
       // ensure no dangling messages with fresh broker etc
       broker.stop();
-      broker.waitUntilStopped();
+      doByteman.set(false);
 
       LOG.info("Checking for remaining/hung messages with second restart..");
-      broker = createBroker(false, url);
-      setDefaultPersistenceAdapter(broker);
+      broker = createBroker();
       broker.start();
 
       // after restart, ensure no dangling messages
@@ -231,7 +194,7 @@ public class FailoverDuplicateTest extends TestSupport {
       if (msg == null) {
          msg = consumer.receive(5000);
       }
-      assertNull("no messges left dangling but got: " + msg, msg);
+      Assert.assertNull("no messges left dangling but got: " + msg, msg);
 
       sendConnection.close();
    }
@@ -247,4 +210,28 @@ public class FailoverDuplicateTest extends TestSupport {
       }
       producer.close();
    }
+
+   public static void holdResponseAndStopConn(final AMQConnectionContext context) {
+      if (doByteman.get()) {
+         if (first.compareAndSet(false, true)) {
+            context.setDontSendReponse(true);
+            Executors.newSingleThreadExecutor().execute(new Runnable() {
+               @Override
+               public void run() {
+                  try {
+                     LOG.info("Waiting for recepit");
+                     Assert.assertTrue("message received on time", gotMessageLatch.await(60, TimeUnit.SECONDS));
+                     Assert.assertTrue("new producers done on time", producersDone.await(120, TimeUnit.SECONDS));
+                     LOG.info("Stopping connection post send and receive and multiple producers");
+                     context.getConnection().fail(null, "test Failoverduplicatetest");
+                  }
+                  catch (Exception e) {
+                     e.printStackTrace();
+                  }
+               }
+            });
+         }
+      }
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ad1b8efd/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java
index 57899ba..fcb60e5 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,10 +16,6 @@
  */
 package org.apache.activemq.transport.failover;
 
-import java.util.Vector;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -27,31 +23,42 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerPluginSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.Response;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
 import org.junit.After;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.junit.Assert.assertTrue;
 
 // see: https://issues.apache.org/activemq/browse/AMQ-2877
-public class FailoverPrefetchZeroTest {
+@RunWith(BMUnitRunner.class)
+public class FailoverPrefetchZeroTest extends OpenwireArtemisBaseTest {
 
    private static final Logger LOG = LoggerFactory.getLogger(FailoverPrefetchZeroTest.class);
    private static final String QUEUE_NAME = "FailoverPrefetchZero";
-   private static final String TRANSPORT_URI = "tcp://localhost:0";
-   private String url;
+
+   private static final AtomicBoolean doByteman = new AtomicBoolean(false);
+   private static final CountDownLatch pullDone = new CountDownLatch(1);
+   private static CountDownLatch brokerStopLatch = new CountDownLatch(1);
+
+   private String url = newURI(0);
    final int prefetch = 0;
-   BrokerService broker;
+   private static EmbeddedJMS broker;
 
    @After
    public void stopBroker() throws Exception {
@@ -60,52 +67,25 @@ public class FailoverPrefetchZeroTest {
       }
    }
 
-   public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
-      broker = createBroker(deleteAllMessagesOnStartup);
+   public void startBroker() throws Exception {
+      broker = createBroker();
       broker.start();
    }
 
-   public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
-      return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI);
-   }
-
-   public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
-      broker = new BrokerService();
-      broker.addConnector(bindAddress);
-      broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
-
-      url = broker.getTransportConnectors().get(0).getConnectUri().toString();
-
-      return broker;
-   }
-
    @SuppressWarnings("unchecked")
    @Test
+   @BMRules(
+      rules = {@BMRule(
+         name = "set no return response and stop the broker",
+         targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection",
+         targetMethod = "processMessagePull",
+         targetLocation = "ENTRY",
+         binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+         action = "org.apache.activemq.transport.failover.FailoverPrefetchZeroTest.holdResponseAndStopBroker(context)")})
    public void testPrefetchZeroConsumerThroughRestart() throws Exception {
-      broker = createBroker(true);
-
-      final CountDownLatch pullDone = new CountDownLatch(1);
-      broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
-         @Override
-         public Response messagePull(ConnectionContext context, final MessagePull pull) throws Exception {
-            context.setDontSendReponse(true);
-            pullDone.countDown();
-            Executors.newSingleThreadExecutor().execute(new Runnable() {
-               @Override
-               public void run() {
-                  LOG.info("Stopping broker on pull: " + pull);
-                  try {
-                     broker.stop();
-                  }
-                  catch (Exception e) {
-                     e.printStackTrace();
-                  }
-               }
-            });
-            return null;
-         }
-      }});
+      broker = createBroker();
       broker.start();
+      doByteman.set(true);
 
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
       cf.setWatchTopicAdvisories(false);
@@ -122,7 +102,6 @@ public class FailoverPrefetchZeroTest {
       final CountDownLatch receiveDone = new CountDownLatch(1);
       final Vector<Message> received = new Vector<>();
       Executors.newSingleThreadExecutor().execute(new Runnable() {
-         @Override
          public void run() {
             try {
                LOG.info("receive one...");
@@ -141,8 +120,9 @@ public class FailoverPrefetchZeroTest {
 
       // will be stopped by the plugin
       assertTrue("pull completed on broker", pullDone.await(30, TimeUnit.SECONDS));
-      broker.waitUntilStopped();
-      broker = createBroker(false, url);
+      brokerStopLatch.await();
+      doByteman.set(false);
+      broker = createBroker();
       broker.start();
 
       assertTrue("receive completed through failover", receiveDone.await(30, TimeUnit.SECONDS));
@@ -160,4 +140,25 @@ public class FailoverPrefetchZeroTest {
       }
       producer.close();
    }
+
+   public static void holdResponseAndStopBroker(final AMQConnectionContext context) {
+      if (doByteman.get()) {
+         context.setDontSendReponse(true);
+         pullDone.countDown();
+         Executors.newSingleThreadExecutor().execute(new Runnable() {
+            public void run() {
+               try {
+                  broker.stop();
+               }
+               catch (Exception e) {
+                  e.printStackTrace();
+               }
+               finally {
+                  brokerStopLatch.countDown();
+               }
+            }
+         });
+      }
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ad1b8efd/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
index b8860a7..6e559e7 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
@@ -16,58 +16,106 @@
  */
 package org.apache.activemq.transport.failover;
 
+import java.util.ArrayList;
 import java.util.HashMap;
-
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class FailoverPriorityTest extends FailoverClusterTestSupport {
+import javax.jms.Connection;
+import javax.jms.JMSException;
+
+public class FailoverPriorityTest extends OpenwireArtemisBaseTest {
 
    protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
    private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616";
    private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617";
    private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61618";
-   private final HashMap<String, String> urls = new HashMap<>();
+   private final HashMap<Integer, String> urls = new HashMap<>();
+
+   private final List<ActiveMQConnection> connections = new ArrayList<ActiveMQConnection>();
+   private EmbeddedJMS[] servers = new EmbeddedJMS[3];
+   private String clientUrl;
 
-   @Override
+   @Before
    public void setUp() throws Exception {
-      super.setUp();
-      urls.put(BROKER_A_NAME, BROKER_A_CLIENT_TC_ADDRESS);
-      urls.put(BROKER_B_NAME, BROKER_B_CLIENT_TC_ADDRESS);
+      urls.put(0, BROKER_A_CLIENT_TC_ADDRESS);
+      urls.put(1, BROKER_B_CLIENT_TC_ADDRESS);
    }
 
-   private static final String BROKER_A_NAME = "BROKERA";
-   private static final String BROKER_B_NAME = "BROKERB";
-   private static final String BROKER_C_NAME = "BROKERC";
+   @After
+   public void tearDown() throws Exception {
+      shutdownClients();
+      for (EmbeddedJMS server : servers) {
+         if (server != null) {
+            server.stop();
+         }
+      }
+   }
 
+   @Test
    public void testPriorityBackup() throws Exception {
-      createBrokerA();
-      createBrokerB();
-      getBroker(BROKER_B_NAME).waitUntilStarted();
+      Configuration config0 = createConfig("127.0.0.1", 0);
+      Configuration config1 = createConfig("127.0.0.1", 1);
+
+      deployClusterConfiguration(config0, 1);
+      deployClusterConfiguration(config1, 0);
+
+      servers[0] = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+      servers[1] = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+      servers[0].start();
+      servers[1].start();
+
+      Assert.assertTrue(servers[0].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+      Assert.assertTrue(servers[1].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+
       Thread.sleep(1000);
 
       setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false");
       createClients(5);
 
-      assertAllConnectedTo(urls.get(BROKER_A_NAME));
+      assertAllConnectedTo(urls.get(0));
 
-      restart(false, BROKER_A_NAME, BROKER_B_NAME);
+      restart(false, 0, 1);
 
       for (int i = 0; i < 3; i++) {
-         restart(true, BROKER_A_NAME, BROKER_B_NAME);
+         restart(true, 0, 1);
       }
 
       Thread.sleep(5000);
 
-      restart(false, BROKER_A_NAME, BROKER_B_NAME);
+      restart(false, 0, 1);
 
    }
 
+   @Test
    public void testPriorityBackupList() throws Exception {
-      createBrokerA();
-      createBrokerB();
-      getBroker(BROKER_B_NAME).waitUntilStarted();
+      Configuration config0 = createConfig("127.0.0.1", 0);
+      Configuration config1 = createConfig("127.0.0.1", 1);
+
+      deployClusterConfiguration(config0, 1);
+      deployClusterConfiguration(config1, 0);
+
+      servers[0] = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+      servers[1] = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+      servers[0].start();
+      servers[1].start();
+
+      Assert.assertTrue(servers[0].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+      Assert.assertTrue(servers[1].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
       Thread.sleep(1000);
 
       setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&priorityURIs=tcp://127.0.0.1:61617&initialReconnectDelay=1000&useExponentialBackOff=false");
@@ -75,154 +123,166 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport {
 
       Thread.sleep(3000);
 
-      assertAllConnectedTo(urls.get(BROKER_B_NAME));
+      assertAllConnectedTo(urls.get(1));
 
-      restart(false, BROKER_B_NAME, BROKER_A_NAME);
+      restart(false, 1, 0);
 
       for (int i = 0; i < 3; i++) {
-         restart(true, BROKER_B_NAME, BROKER_A_NAME);
+         restart(true, 1, 0);
       }
 
-      restart(false, BROKER_B_NAME, BROKER_A_NAME);
-
+      restart(false, 1, 0);
    }
 
+   @Test
    public void testThreeBrokers() throws Exception {
-      // Broker A
-      addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
-      addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS, false);
-      addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
-      addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
-      getBroker(BROKER_A_NAME).start();
-
-      // Broker B
-      addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
-      addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS, false);
-      addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
-      addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
-      getBroker(BROKER_B_NAME).start();
-
-      // Broker C
-      addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME));
-      addTransportConnector(getBroker(BROKER_C_NAME), "openwire", BROKER_C_CLIENT_TC_ADDRESS, false);
-      addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
-      addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
-      getBroker(BROKER_C_NAME).start();
-
-      getBroker(BROKER_C_NAME).waitUntilStarted();
+      commonSetup();
       Thread.sleep(1000);
 
       setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + "," + BROKER_C_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false");
 
       createClients(5);
 
-      assertAllConnectedTo(urls.get(BROKER_A_NAME));
-
-      restart(true, BROKER_A_NAME, BROKER_B_NAME);
+      assertAllConnectedTo(urls.get(0));
 
+      restart(true, 0, 1, 3);
    }
 
+   @Test
    public void testPriorityBackupAndUpdateClients() throws Exception {
-      // Broker A
-      addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
-      addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS, true);
-      addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
-      getBroker(BROKER_A_NAME).start();
-
-      // Broker B
-      addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
-      addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS, true);
-      addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
-      getBroker(BROKER_B_NAME).start();
-
-      getBroker(BROKER_B_NAME).waitUntilStarted();
+      Configuration config0 = createConfig("127.0.0.1", 0);
+      Configuration config1 = createConfig("127.0.0.1", 1);
+
+      deployClusterConfiguration(config0, 1);
+      deployClusterConfiguration(config1, 0);
+
+      servers[0] = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+      servers[1] = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+      servers[0].start();
+      servers[1].start();
+
+      Assert.assertTrue(servers[0].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+      Assert.assertTrue(servers[1].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+
       Thread.sleep(1000);
 
       setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false");
 
-      LOG.info("Client URI will be: " + getClientUrl());
-
       createClients(5);
 
       // Let's wait a little bit longer just in case it takes a while to realize that the
       // Broker A is the one with higher priority.
       Thread.sleep(5000);
 
-      assertAllConnectedTo(urls.get(BROKER_A_NAME));
+      assertAllConnectedTo(urls.get(0));
    }
 
-   private void restart(boolean primary, String primaryName, String secondaryName) throws Exception {
+   private void restart(boolean primary, int primaryID, int secondaryID) throws Exception {
+      restart(primary, primaryID, secondaryID, 2);
+   }
+
+   private void restart(boolean primary, int primaryID, int secondaryID, int total) throws Exception {
 
       Thread.sleep(1000);
 
       if (primary) {
-         LOG.info("Stopping " + primaryName);
-         stopBroker(primaryName);
+         LOG.info("Stopping " + primaryID);
+         stopBroker(primaryID);
+         Assert.assertTrue(servers[secondaryID].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, total - 1));
       }
       else {
-         LOG.info("Stopping " + secondaryName);
-         stopBroker(secondaryName);
+         LOG.info("Stopping " + secondaryID);
+         stopBroker(secondaryID);
+         Assert.assertTrue(servers[primaryID].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, total - 1));
       }
       Thread.sleep(5000);
 
       if (primary) {
-         assertAllConnectedTo(urls.get(secondaryName));
+         assertAllConnectedTo(urls.get(secondaryID));
       }
       else {
-         assertAllConnectedTo(urls.get(primaryName));
+         assertAllConnectedTo(urls.get(primaryID));
       }
 
       if (primary) {
-         LOG.info("Starting " + primaryName);
-         createBrokerByName(primaryName);
-         getBroker(primaryName).waitUntilStarted();
+         Configuration config = createConfig("127.0.0.1", primaryID);
+
+         deployClusterConfiguration(config, secondaryID);
+
+         servers[primaryID] = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
+         servers[primaryID].start();
+
+         Assert.assertTrue(servers[primaryID].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, total));
+         Assert.assertTrue(servers[secondaryID].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, total));
       }
       else {
-         LOG.info("Starting " + secondaryName);
-         createBrokerByName(secondaryName);
-         getBroker(secondaryName).waitUntilStarted();
+         Configuration config = createConfig("127.0.0.1", secondaryID);
+
+         deployClusterConfiguration(config, primaryID);
+
+         servers[secondaryID] = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
+         servers[secondaryID].start();
+
+         Assert.assertTrue(servers[primaryID].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, total));
+         Assert.assertTrue(servers[secondaryID].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, total));
       }
 
       Thread.sleep(5000);
 
-      assertAllConnectedTo(urls.get(primaryName));
+      assertAllConnectedTo(urls.get(primaryID));
 
    }
 
-   private void createBrokerByName(String name) throws Exception {
-      if (name.equals(BROKER_A_NAME)) {
-         createBrokerA();
-      }
-      else if (name.equals(BROKER_B_NAME)) {
-         createBrokerB();
-      }
-      else {
-         throw new Exception("Unknown broker " + name);
+   private void stopBroker(int serverID) throws Exception {
+      servers[serverID].stop();
+   }
+
+   public void setClientUrl(String clientUrl) {
+      this.clientUrl = clientUrl;
+   }
+
+   protected void createClients(int numOfClients) throws Exception {
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl);
+      for (int i = 0; i < numOfClients; i++) {
+         ActiveMQConnection c = (ActiveMQConnection) factory.createConnection();
+         c.start();
+         connections.add(c);
       }
    }
 
-   private void createBrokerA() throws Exception {
-      if (getBroker(BROKER_A_NAME) == null) {
-         addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
-         addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS, false);
-         addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
-         getBroker(BROKER_A_NAME).start();
+   protected void shutdownClients() throws JMSException {
+      for (Connection c : connections) {
+         c.close();
       }
    }
 
-   private void createBrokerB() throws Exception {
-      if (getBroker(BROKER_B_NAME) == null) {
-         addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
-         addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS, false);
-         addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
-         getBroker(BROKER_B_NAME).start();
+   protected void assertAllConnectedTo(String url) throws Exception {
+      for (ActiveMQConnection c : connections) {
+         Assert.assertEquals(url, c.getTransportChannel().getRemoteAddress());
       }
    }
 
-   @Override
-   protected void tearDown() throws Exception {
-      shutdownClients();
-      destroyBrokerCluster();
+   //starts a three-broker cluster and waits for it to form (used by testThreeBrokers)
+   private void commonSetup() throws Exception {
+      Configuration config0 = createConfig("127.0.0.1", 0);
+      Configuration config1 = createConfig("127.0.0.1", 1);
+      Configuration config2 = createConfig("127.0.0.1", 2);
+
+      deployClusterConfiguration(config0, 1, 2);
+      deployClusterConfiguration(config1, 0, 2);
+      deployClusterConfiguration(config2, 0, 1);
+
+      servers[0] = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+      servers[1] = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+      servers[2] = new EmbeddedJMS().setConfiguration(config2).setJmsConfiguration(new JMSConfigurationImpl());
+
+      servers[0].start();
+      servers[1].start();
+      servers[2].start();
+
+      Assert.assertTrue(servers[0].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 3));
+      Assert.assertTrue(servers[1].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 3));
+      Assert.assertTrue(servers[2].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 3));
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ad1b8efd/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRandomTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRandomTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRandomTest.java
index 54dd3e3..80f83db 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRandomTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRandomTest.java
@@ -17,47 +17,59 @@
 
 package org.apache.activemq.transport.failover;
 
-import junit.framework.TestCase;
-
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
 
-public class FailoverRandomTest extends TestCase {
+public class FailoverRandomTest extends OpenwireArtemisBaseTest {
 
-   BrokerService brokerA, brokerB;
+   private EmbeddedJMS server0, server1;
 
-   @Override
+   @Before
    public void setUp() throws Exception {
-      brokerA = createBroker("A");
-      brokerB = createBroker("B");
-   }
+      Configuration config0 = createConfig(0);
+      Configuration config1 = createConfig(1);
 
-   @Override
-   public void tearDown() throws Exception {
-      brokerA.stop();
-      brokerB.stop();
+      deployClusterConfiguration(config0, 1);
+      deployClusterConfiguration(config1, 0);
+
+      server0 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+      server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+
+      server0.start();
+      server1.start();
+
+      server0.getActiveMQServer().setIdentity("BrokerA");
+      server1.getActiveMQServer().setIdentity("BrokerB");
+
+      Assert.assertTrue(server0.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+      Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
    }
 
-   private BrokerService createBroker(String name) throws Exception {
-      BrokerService broker = new BrokerService();
-      broker.setBrokerName("Broker" + name);
-      broker.addConnector("tcp://localhost:0");
-      broker.getManagementContext().setCreateConnector(false);
-      broker.setPersistent(false);
-      broker.setUseJmx(false);
-      broker.start();
-      return broker;
+   @After
+   public void tearDown() throws Exception {
+      server0.stop();
+      server1.stop();
    }
 
+   @Test
    public void testRandomConnections() throws Exception {
-      String failoverUrl = "failover:(" + brokerA.getTransportConnectors().get(0).getConnectUri() + "," + brokerB.getTransportConnectors().get(0).getConnectUri() + ")";
+      String failoverUrl = "failover:(" + newURI(0) + "," + newURI(1) + ")";
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(failoverUrl);
 
       ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
       connection.start();
       String brokerName1 = connection.getBrokerName();
-      assertNotNull(brokerName1);
+      Assert.assertNotNull(brokerName1);
       connection.close();
 
       String brokerName2 = brokerName1;
@@ -66,9 +78,9 @@ public class FailoverRandomTest extends TestCase {
          connection = (ActiveMQConnection) cf.createConnection();
          connection.start();
          brokerName2 = connection.getBrokerName();
-         assertNotNull(brokerName2);
+         Assert.assertNotNull(brokerName2);
          connection.close();
       }
-      assertTrue(brokerName1 + "!=" + brokerName2, !brokerName1.equals(brokerName2));
+      Assert.assertTrue(brokerName1 + "!=" + brokerName2, !brokerName1.equals(brokerName2));
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ad1b8efd/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java
index 6b7a2bb..3be2593 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java
@@ -16,19 +16,15 @@
  */
 package org.apache.activemq.transport.failover;
 
-import junit.framework.Test;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.Test;
 
 public class FailoverRedeliveryTransactionTest extends FailoverTransactionTest {
 
-   public static Test suite() {
-      return suite(FailoverRedeliveryTransactionTest.class);
-   }
-
    @Override
    public void configureConnectionFactory(ActiveMQConnectionFactory factory) {
       super.configureConnectionFactory(factory);
@@ -36,26 +32,24 @@ public class FailoverRedeliveryTransactionTest extends FailoverTransactionTest {
    }
 
    @Override
-   public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
-      BrokerService brokerService = super.createBroker(deleteAllMessagesOnStartup, bindAddress);
+   public EmbeddedJMS createBroker() throws Exception {
+      EmbeddedJMS brokerService = super.createBroker();
       PolicyMap policyMap = new PolicyMap();
       PolicyEntry defaultEntry = new PolicyEntry();
       defaultEntry.setPersistJMSRedelivered(true);
       policyMap.setDefaultEntry(defaultEntry);
-      brokerService.setDestinationPolicy(policyMap);
+      //revisit: do we support something like persistJMSRedelivered?
+      //brokerService.setDestinationPolicy(policyMap);
       return brokerService;
    }
 
    // no point rerunning these
    @Override
+   @Test
    public void testFailoverProducerCloseBeforeTransaction() throws Exception {
    }
 
    @Override
-   public void initCombosForTestFailoverCommitReplyLost() {
-   }
-
-   @Override
    public void testFailoverCommitReplyLost() throws Exception {
    }
 
@@ -64,18 +58,10 @@ public class FailoverRedeliveryTransactionTest extends FailoverTransactionTest {
    }
 
    @Override
-   public void initCombosForTestFailoverSendReplyLost() {
-   }
-
-   @Override
    public void testFailoverSendReplyLost() throws Exception {
    }
 
    @Override
-   public void initCombosForTestFailoverConnectionSendReplyLost() {
-   }
-
-   @Override
    public void testFailoverConnectionSendReplyLost() throws Exception {
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ad1b8efd/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
index 07a8436..c5ee02f 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
@@ -30,40 +30,43 @@ import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class FailoverTimeoutTest {
+public class FailoverTimeoutTest extends OpenwireArtemisBaseTest {
 
    private static final Logger LOG = LoggerFactory.getLogger(FailoverTimeoutTest.class);
 
    private static final String QUEUE_NAME = "test.failovertimeout";
-   BrokerService bs;
+   EmbeddedJMS server;
    URI tcpUri;
 
    @Before
    public void setUp() throws Exception {
-      bs = new BrokerService();
-      bs.setUseJmx(false);
-      bs.addConnector("tcp://localhost:0");
-      bs.start();
-      tcpUri = bs.getTransportConnectors().get(0).getConnectUri();
+      Configuration config = createConfig(0);
+      server = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
+      server.start();
+      tcpUri = new URI(newURI(0));
    }
 
    @After
    public void tearDown() throws Exception {
-      if (bs != null) {
-         bs.stop();
+      if (server != null) {
+         server.stop();
       }
    }
 
    @Test
    public void testTimoutDoesNotFailConnectionAttempts() throws Exception {
-      bs.stop();
+      server.stop();
       long timeout = 1000;
 
       long startTime = System.currentTimeMillis();
@@ -99,7 +102,7 @@ public class FailoverTimeoutTest {
       TextMessage message = session.createTextMessage("Test message");
       producer.send(message);
 
-      bs.stop();
+      server.stop();
 
       try {
          producer.send(message);
@@ -108,15 +111,14 @@ public class FailoverTimeoutTest {
          assertEquals("Failover timeout of " + timeout + " ms reached.", jmse.getMessage());
       }
 
-      bs = new BrokerService();
-      bs.setUseJmx(false);
-      bs.addConnector(tcpUri);
-      bs.start();
-      bs.waitUntilStarted();
+      Configuration config = createConfig(0);
+      server = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
+      server.start();
 
       producer.send(message);
 
-      bs.stop();
+      server.stop();
+      server = null;
    }
 
    @Test


Mime
View raw message