activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [09/39] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
Date Wed, 17 Feb 2016 02:03:44 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f25b6e30/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
index f80b09a..3d75905 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -34,22 +33,26 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.TransactionRolledBackException;
-
-import junit.framework.TestCase;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.Queue;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 
 /**
  * TestCase showing the message-destroying described in AMQ-1925
  */
-public class AMQ1925Test extends TestCase implements ExceptionListener {
+public class AMQ1925Test extends OpenwireArtemisBaseTest implements ExceptionListener {
 
    private static final Logger log = Logger.getLogger(AMQ1925Test.class);
 
@@ -57,7 +60,7 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
    private static final String PROPERTY_MSG_NUMBER = "NUMBER";
    private static final int MESSAGE_COUNT = 10000;
 
-   private BrokerService bs;
+   private EmbeddedJMS bs;
    private URI tcpUri;
    private ActiveMQConnectionFactory cf;
 
@@ -74,17 +77,13 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
       final CountDownLatch starter = new CountDownLatch(1);
       final AtomicBoolean restarted = new AtomicBoolean();
       new Thread(new Runnable() {
-         @Override
          public void run() {
             try {
                starter.await();
 
                // Simulate broker failure & restart
                bs.stop();
-               bs = new BrokerService();
-               bs.setPersistent(true);
-               bs.setUseJmx(true);
-               bs.addConnector(tcpUri);
+               bs = createNewServer();
                bs.start();
 
                restarted.set(true);
@@ -97,21 +96,21 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
 
       for (int i = 0; i < MESSAGE_COUNT; i++) {
          Message message = consumer.receive(500);
-         assertNotNull("No Message " + i + " found", message);
+         Assert.assertNotNull("No Message " + i + " found", message);
 
          if (i < 10)
-            assertFalse("Timing problem, restarted too soon", restarted.get());
+            Assert.assertFalse("Timing problem, restarted too soon", restarted.get());
          if (i == 10) {
             starter.countDown();
          }
          if (i > MESSAGE_COUNT - 100) {
-            assertTrue("Timing problem, restarted too late", restarted.get());
+            Assert.assertTrue("Timing problem, restarted too late", restarted.get());
          }
 
-         assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
+         Assert.assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
          session.commit();
       }
-      assertNull(consumer.receive(500));
+      Assert.assertNull(consumer.receive(500));
 
       consumer.close();
       session.close();
@@ -133,17 +132,13 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
       final CountDownLatch starter = new CountDownLatch(1);
       final AtomicBoolean restarted = new AtomicBoolean();
       new Thread(new Runnable() {
-         @Override
          public void run() {
             try {
                starter.await();
 
                // Simulate broker failure & restart
                bs.stop();
-               bs = new BrokerService();
-               bs.setPersistent(true);
-               bs.setUseJmx(true);
-               bs.addConnector(tcpUri);
+               bs = createNewServer();
                bs.start();
 
                restarted.set(true);
@@ -172,12 +167,12 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
          }
 
          if (i < 10)
-            assertFalse("Timing problem, restarted too soon", restarted.get());
+            Assert.assertFalse("Timing problem, restarted too soon", restarted.get());
          if (i == 10) {
             starter.countDown();
          }
          if (i > MESSAGE_COUNT - 50) {
-            assertTrue("Timing problem, restarted too late", restarted.get());
+            Assert.assertTrue("Timing problem, restarted too late", restarted.get());
          }
 
          if (message1 != null) {
@@ -189,8 +184,8 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
             session2.commit();
          }
       }
-      assertNull(consumer1.receive(500));
-      assertNull(consumer2.receive(500));
+      Assert.assertNull(consumer1.receive(500));
+      Assert.assertNull(consumer2.receive(500));
 
       consumer1.close();
       session1.close();
@@ -203,7 +198,7 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
          foundMissingMessages = tryToFetchMissingMessages();
       }
       for (int i = 0; i < MESSAGE_COUNT; i++) {
-         assertTrue("Message-Nr " + i + " not found (" + results.size() + " total, " + foundMissingMessages + " have been found 'lingering' in the queue)", results.contains(i));
+         Assert.assertTrue("Message-Nr " + i + " not found (" + results.size() + " total, " + foundMissingMessages + " have been found 'lingering' in the queue)", results.contains(i));
       }
       assertQueueEmpty();
    }
@@ -231,6 +226,7 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
       return count;
    }
 
+   @Test
    public void testAMQ1925_TXBegin() throws Exception {
       Connection connection = cf.createConnection();
       connection.start();
@@ -241,20 +237,17 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
       boolean restartDone = false;
       for (int i = 0; i < MESSAGE_COUNT; i++) {
          Message message = consumer.receive(5000);
-         assertNotNull(message);
+         Assert.assertNotNull(message);
 
          if (i == 222 && !restartDone) {
             // Simulate broker failure & restart
             bs.stop();
-            bs = new BrokerService();
-            bs.setPersistent(true);
-            bs.setUseJmx(true);
-            bs.addConnector(tcpUri);
+            bs = createNewServer();
             bs.start();
             restartDone = true;
          }
 
-         assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
+         Assert.assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
          try {
             session.commit();
          }
@@ -263,16 +256,17 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
             i--;
          }
       }
-      assertNull(consumer.receive(500));
+      Assert.assertNull(consumer.receive(500));
 
       consumer.close();
       session.close();
       connection.close();
 
       assertQueueEmpty();
-      assertNull("no exception on connection listener: " + exception, exception);
+      Assert.assertNull("no exception on connection listener: " + exception, exception);
    }
 
+   @Test
    public void testAMQ1925_TXCommited() throws Exception {
       Connection connection = cf.createConnection();
       connection.start();
@@ -281,22 +275,19 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
 
       for (int i = 0; i < MESSAGE_COUNT; i++) {
          Message message = consumer.receive(5000);
-         assertNotNull(message);
+         Assert.assertNotNull(message);
 
-         assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
+         Assert.assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
          session.commit();
 
          if (i == 222) {
             // Simulate broker failure & restart
             bs.stop();
-            bs = new BrokerService();
-            bs.setPersistent(true);
-            bs.setUseJmx(true);
-            bs.addConnector(tcpUri);
+            bs = createNewServer();
             bs.start();
          }
       }
-      assertNull(consumer.receive(500));
+      Assert.assertNull(consumer.receive(500));
 
       consumer.close();
       session.close();
@@ -313,7 +304,7 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
 
       Message msg = consumer.receive(500);
       if (msg != null) {
-         fail(msg.toString());
+         Assert.fail(msg.toString());
       }
 
       consumer.close();
@@ -324,9 +315,12 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
    }
 
    private void assertQueueLength(int len) throws Exception, IOException {
-      Set<Destination> destinations = bs.getBroker().getDestinations(new ActiveMQQueue(QUEUE_NAME));
-      Queue queue = (Queue) destinations.iterator().next();
-      assertEquals(len, queue.getMessageStore().getMessageCount());
+      QueueImpl queue = (QueueImpl) bs.getActiveMQServer().getPostOffice().getBinding(new SimpleString("jms.queue." + QUEUE_NAME)).getBindable();
+      if (len > queue.getMessageCount()) {
+         // wait a moment, as the tx might still be in its afterCommit stage (async op)
+         Thread.sleep(5000);
+      }
+      Assert.assertEquals(len, queue.getMessageCount());
    }
 
    private void sendMessagesToQueue() throws Exception {
@@ -349,30 +343,41 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
       assertQueueLength(MESSAGE_COUNT);
    }
 
-   @Override
-   protected void setUp() throws Exception {
+   @Before
+   public void setUp() throws Exception {
       exception = null;
-      bs = new BrokerService();
-      bs.setDeleteAllMessagesOnStartup(true);
-      bs.setPersistent(true);
-      bs.setUseJmx(true);
-      TransportConnector connector = bs.addConnector("tcp://localhost:0");
+      bs = createNewServer();
       bs.start();
-      tcpUri = connector.getConnectUri();
+      // an auto-created queue can't survive a restart, so we create the queue explicitly
+      bs.getJMSServerManager().createQueue(false, QUEUE_NAME, null, true, QUEUE_NAME);
+
+      tcpUri = new URI(newURI(0));
 
       cf = new ActiveMQConnectionFactory("failover://(" + tcpUri + ")");
 
       sendMessagesToQueue();
    }
 
-   @Override
-   protected void tearDown() throws Exception {
-      new ServiceStopper().stop(bs);
+   @After
+   public void tearDown() throws Exception {
+      try {
+         if (bs != null) {
+            bs.stop();
+            bs = null;
+         }
+      } catch (Exception e) {
+         log.error(e);
+      }
+
    }
 
-   @Override
    public void onException(JMSException exception) {
       this.exception = exception;
    }
 
+   private EmbeddedJMS createNewServer() throws Exception {
+      Configuration config = createConfig("localhost", 0);
+      EmbeddedJMS server = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
+      return server;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f25b6e30/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java
deleted file mode 100644
index 8cac09a..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.failover;
-
-import java.io.IOException;
-import java.net.URI;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.transport.TransportListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- */
-public class BadConnectionTest extends TestCase {
-
-   private static final Logger LOG = LoggerFactory.getLogger(BadConnectionTest.class);
-
-   protected Transport transport;
-
-   public void testConnectingToUnavailableServer() throws Exception {
-      try {
-         transport.asyncRequest(new ActiveMQMessage(), null);
-         fail("This should never succeed");
-      }
-      catch (IOException e) {
-         LOG.info("Caught expected exception: " + e, e);
-      }
-   }
-
-   protected Transport createTransport() throws Exception {
-      return TransportFactory.connect(new URI("failover://(tcp://doesNotExist:1234)?useExponentialBackOff=false&maxReconnectAttempts=3&initialReconnectDelay=100"));
-   }
-
-   @Override
-   protected void setUp() throws Exception {
-      transport = createTransport();
-      transport.setTransportListener(new TransportListener() {
-
-         @Override
-         public void onCommand(Object command) {
-         }
-
-         @Override
-         public void onException(IOException error) {
-         }
-
-         @Override
-         public void transportInterupted() {
-         }
-
-         @Override
-         public void transportResumed() {
-         }
-      });
-      transport.start();
-   }
-
-   @Override
-   protected void tearDown() throws Exception {
-      if (transport != null) {
-         transport.stop();
-      }
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f25b6e30/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ClusterUtil.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ClusterUtil.java
new file mode 100644
index 0000000..42f199f
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ClusterUtil.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport.failover;
+
+/**
+ * Utilities to create broker clusters
+ */
+public class ClusterUtil {
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f25b6e30/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ConnectionHangOnStartupTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ConnectionHangOnStartupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ConnectionHangOnStartupTest.java
index 110e2fc..f40ee4f 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ConnectionHangOnStartupTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ConnectionHangOnStartupTest.java
@@ -22,18 +22,19 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.jms.Connection;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.junit.After;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.core.io.ClassPathResource;
 
 /**
  * Tests for AMQ-3719
  */
-public class ConnectionHangOnStartupTest {
+public class ConnectionHangOnStartupTest extends OpenwireArtemisBaseTest {
 
    private static final Logger LOG = LoggerFactory.getLogger(ConnectionHangOnStartupTest.class);
 
@@ -41,13 +42,13 @@ public class ConnectionHangOnStartupTest {
    // maxReconnectDelay so that the test runs faster (because it will retry
    // connection sooner)
    protected String uriString = "failover://(tcp://localhost:62001?wireFormat.maxInactivityDurationInitalDelay=1,tcp://localhost:62002?wireFormat.maxInactivityDurationInitalDelay=1)?randomize=false&maxReconnectDelay=200";
-   protected BrokerService master = null;
-   protected AtomicReference<BrokerService> slave = new AtomicReference<>();
+   protected EmbeddedJMS master = null;
+   protected AtomicReference<EmbeddedJMS> slave = new AtomicReference<EmbeddedJMS>();
 
    @After
    public void tearDown() throws Exception {
 
-      BrokerService brokerService = slave.get();
+      EmbeddedJMS brokerService = slave.get();
       if (brokerService != null) {
          brokerService.stop();
       }
@@ -60,28 +61,18 @@ public class ConnectionHangOnStartupTest {
    }
 
    protected void createMaster() throws Exception {
-      BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getMasterXml()));
-      brokerFactory.afterPropertiesSet();
-      master = brokerFactory.getBroker();
+      Configuration config = createConfig("localhost", 0, 62001);
+      master = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
       master.start();
    }
 
    protected void createSlave() throws Exception {
-      BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getSlaveXml()));
-      brokerFactory.afterPropertiesSet();
-      BrokerService broker = brokerFactory.getBroker();
+      Configuration config = createConfig("localhost", 1, 62002);
+      EmbeddedJMS broker = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
       broker.start();
       slave.set(broker);
    }
 
-   protected String getSlaveXml() {
-      return "org/apache/activemq/broker/ft/sharedFileSlave.xml";
-   }
-
-   protected String getMasterXml() {
-      return "org/apache/activemq/broker/ft/sharedFileMaster.xml";
-   }
-
    @Test(timeout = 60000)
    public void testInitialWireFormatNegotiationTimeout() throws Exception {
       final AtomicReference<Connection> conn = new AtomicReference<>();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f25b6e30/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverBackupLeakTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverBackupLeakTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverBackupLeakTest.java
index cf1d43d..0875a61 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverBackupLeakTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverBackupLeakTest.java
@@ -22,58 +22,59 @@ import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
 import javax.jms.Session;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.artemis.api.jms.management.JMSServerControl;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.management.ManagementService;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.apache.activemq.util.Wait;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * Ensures connections aren't leaked when when we use backup=true and randomize=false
  */
-public class FailoverBackupLeakTest {
+public class FailoverBackupLeakTest extends OpenwireArtemisBaseTest {
+
+   private EmbeddedJMS s1, s2;
+
+   @Before
+   public void setUp() throws Exception {
 
-   private static BrokerService s1, s2;
+      Configuration config0 = createConfig("127.0.0.1", 0);
+      Configuration config1 = createConfig("127.0.0.1", 1);
 
-   @BeforeClass
-   public static void setUp() throws Exception {
-      s1 = buildBroker("broker1");
-      s2 = buildBroker("broker2");
+      deployClusterConfiguration(config0, 1);
+      deployClusterConfiguration(config1, 0);
 
+      s1 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+      s2 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
       s1.start();
-      s1.waitUntilStarted();
       s2.start();
-      s2.waitUntilStarted();
+
+      Assert.assertTrue(s1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+      Assert.assertTrue(s2.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
    }
 
-   @AfterClass
-   public static void tearDown() throws Exception {
+   @After
+   public void tearDown() throws Exception {
       if (s2 != null) {
          s2.stop();
-         s2.waitUntilStopped();
       }
       if (s1 != null) {
          s1.stop();
-         s1.waitUntilStopped();
       }
    }
 
-   private static String getConnectString(BrokerService service) throws Exception {
-      return service.getTransportConnectors().get(0).getPublishableConnectString();
-   }
-
-   private static BrokerService buildBroker(String brokerName) throws Exception {
-      BrokerService service = new BrokerService();
-      service.setBrokerName(brokerName);
-      service.setUseJmx(false);
-      service.setPersistent(false);
-      service.setUseShutdownHook(false);
-      service.addConnector("tcp://0.0.0.0:0?transport.closeAsync=false");
-      return service;
-   }
-
    @Test
    public void backupNoRandomize() throws Exception {
       check("backup=true&randomize=false");
@@ -85,9 +86,12 @@ public class FailoverBackupLeakTest {
    }
 
    private void check(String connectionProperties) throws Exception {
-      String s1URL = getConnectString(s1), s2URL = getConnectString(s2);
+      String s1URL = newURI(0), s2URL = newURI(1);
       String uri = "failover://(" + s1URL + "," + s2URL + ")?" + connectionProperties;
       ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
+      final int initCount1 = getConnectionCount(s1);
+      final int initCount2 = getConnectionCount(s2);
+
       for (int i = 0; i < 10; i++) {
          buildConnection(factory);
       }
@@ -96,7 +100,7 @@ public class FailoverBackupLeakTest {
 
          @Override
          public boolean isSatisified() throws Exception {
-            return getConnectionCount(s1) == 0;
+            return getConnectionCount(s1) == initCount1;
          }
       }));
 
@@ -104,16 +108,22 @@ public class FailoverBackupLeakTest {
 
          @Override
          public boolean isSatisified() throws Exception {
-            return getConnectionCount(s2) == 0;
+            return getConnectionCount(s2) == initCount2;
          }
       }));
    }
 
-   private int getConnectionCount(BrokerService service) {
-      return service.getTransportConnectors().get(0).getConnections().size();
+   private int getConnectionCount(EmbeddedJMS server) throws Exception {
+      ManagementService managementService = server.getActiveMQServer().getManagementService();
+      JMSServerControl jmsControl = (JMSServerControl) managementService.getResource("jms.server");
+      String[] ids = jmsControl.listConnectionIDs();
+      if (ids != null) {
+         return ids.length;
+      }
+      return 0;
    }
 
-   private void buildConnection(ConnectionFactory local) throws JMSException {
+   private void buildConnection(ConnectionFactory local) throws Exception {
       Connection conn = null;
       Session sess = null;
       try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f25b6e30/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
index c0c529d..bf43caa 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
@@ -16,146 +16,127 @@
  */
 package org.apache.activemq.transport.failover;
 
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 
-public class FailoverClusterTest extends TestCase {
+public class FailoverClusterTest extends OpenwireArtemisBaseTest {
 
    private static final int NUMBER = 10;
-   private static final String BROKER_BIND_ADDRESS = "tcp://0.0.0.0:0";
-   private static final String BROKER_A_NAME = "BROKERA";
-   private static final String BROKER_B_NAME = "BROKERB";
-   private BrokerService brokerA;
-   private BrokerService brokerB;
    private String clientUrl;
 
    private final List<ActiveMQConnection> connections = new ArrayList<>();
+   EmbeddedJMS server1;
+   EmbeddedJMS server2;
+
+
+   @Before
+   public void setUp() throws Exception {
+      Configuration config1 = createConfig(1);
+      Configuration config2 = createConfig(2);
+
+      deployClusterConfiguration(config1, 2);
+      deployClusterConfiguration(config2, 1);
+
+      server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+      server2 = new EmbeddedJMS().setConfiguration(config2).setJmsConfiguration(new JMSConfigurationImpl());
 
+      clientUrl = "failover://(" + newURI(1) + "," + newURI(2) + ")";
+   }
+
+   @After
+   public void tearDown() throws Exception {
+      for (Connection c : connections) {
+         c.close();
+      }
+      server1.stop();
+      server2.stop();
+   }
+
+   @Test
    public void testClusterConnectedAfterClients() throws Exception {
+      server1.start();
       createClients();
-      if (brokerB == null) {
-         brokerB = createBrokerB(BROKER_BIND_ADDRESS);
-      }
-      Thread.sleep(3000);
       Set<String> set = new HashSet<>();
+      server2.start();
+      Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+      Assert.assertTrue(server2.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+
+      Thread.sleep(3000);
+
       for (ActiveMQConnection c : connections) {
+         System.out.println("======> adding address: " + c.getTransportChannel().getRemoteAddress());
          set.add(c.getTransportChannel().getRemoteAddress());
       }
-      assertTrue(set.size() > 1);
+      System.out.println("============final size: " + set.size());
+      Assert.assertTrue(set.size() > 1);
    }
 
+   //this test seems the same as the one above as far as the artemis broker
+   //is concerned.
+   @Test
    public void testClusterURIOptionsStrip() throws Exception {
+      server1.start();
+
       createClients();
-      if (brokerB == null) {
-         // add in server side only url param, should not be propagated
-         brokerB = createBrokerB(BROKER_BIND_ADDRESS + "?transport.closeAsync=false");
-      }
+      server2.start();
+      Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+      Assert.assertTrue(server2.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+
       Thread.sleep(3000);
-      Set<String> set = new HashSet<>();
+
+      Set<String> set = new HashSet<String>();
       for (ActiveMQConnection c : connections) {
          set.add(c.getTransportChannel().getRemoteAddress());
       }
-      assertTrue(set.size() > 1);
+      Assert.assertTrue(set.size() > 1);
    }
 
+   @Test
    public void testClusterConnectedBeforeClients() throws Exception {
 
-      if (brokerB == null) {
-         brokerB = createBrokerB(BROKER_BIND_ADDRESS);
-      }
-      Thread.sleep(5000);
+      server1.start();
+      server2.start();
+      Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+      Assert.assertTrue(server2.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+
       createClients();
-      Thread.sleep(2000);
-      brokerA.stop();
-      Thread.sleep(2000);
+      server1.stop();
+      Thread.sleep(1000);
 
-      URI brokerBURI = new URI(brokerB.getTransportConnectors().get(0).getPublishableConnectString());
+      URI brokerBURI = new URI(newURI(2));
       for (ActiveMQConnection c : connections) {
          String addr = c.getTransportChannel().getRemoteAddress();
-         assertTrue(addr.indexOf("" + brokerBURI.getPort()) > 0);
+         Assert.assertTrue(addr.indexOf("" + brokerBURI.getPort()) > 0);
       }
    }
 
-   @Override
-   protected void setUp() throws Exception {
-      if (brokerA == null) {
-         brokerA = createBrokerA(BROKER_BIND_ADDRESS + "?transport.closeAsync=false");
-         clientUrl = "failover://(" + brokerA.getTransportConnectors().get(0).getPublishableConnectString() + ")";
-      }
-   }
-
-   @Override
-   protected void tearDown() throws Exception {
-      for (Connection c : connections) {
-         c.close();
-      }
-      if (brokerB != null) {
-         brokerB.stop();
-         brokerB = null;
-      }
-      if (brokerA != null) {
-         brokerA.stop();
-         brokerA = null;
-      }
-   }
-
-   protected BrokerService createBrokerA(String uri) throws Exception {
-      BrokerService answer = new BrokerService();
-      answer.setUseJmx(false);
-      configureConsumerBroker(answer, uri);
-      answer.start();
-      return answer;
-   }
-
-   protected void configureConsumerBroker(BrokerService answer, String uri) throws Exception {
-      answer.setBrokerName(BROKER_A_NAME);
-      answer.setPersistent(false);
-      TransportConnector connector = answer.addConnector(uri);
-      connector.setRebalanceClusterClients(true);
-      connector.setUpdateClusterClients(true);
-      answer.setUseShutdownHook(false);
-   }
-
-   protected BrokerService createBrokerB(String uri) throws Exception {
-      BrokerService answer = new BrokerService();
-      answer.setUseJmx(false);
-      configureNetwork(answer, uri);
-      answer.start();
-      return answer;
-   }
-
-   protected void configureNetwork(BrokerService answer, String uri) throws Exception {
-      answer.setBrokerName(BROKER_B_NAME);
-      answer.setPersistent(false);
-      NetworkConnector network = answer.addNetworkConnector("static://" + brokerA.getTransportConnectors().get(0).getPublishableConnectString());
-      network.setDuplex(true);
-      TransportConnector connector = answer.addConnector(uri);
-      connector.setRebalanceClusterClients(true);
-      connector.setUpdateClusterClients(true);
-      answer.setUseShutdownHook(false);
-   }
-
-   @SuppressWarnings("unused")
    protected void createClients() throws Exception {
       ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl);
       for (int i = 0; i < NUMBER; i++) {
+         System.out.println("*****create connection using url: " + clientUrl);
          ActiveMQConnection c = (ActiveMQConnection) factory.createConnection();
+         System.out.println("got connection, starting it ...");
          c.start();
+         System.out.println("******Started");
          Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
          Queue queue = s.createQueue(getClass().getName());
          MessageConsumer consumer = s.createConsumer(queue);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f25b6e30/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
index 53f0689..1d902e3 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
@@ -16,7 +16,29 @@
  */
 package org.apache.activemq.transport.failover;
 
-import org.apache.activemq.broker.TransportConnector;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 
 /**
  * Complex cluster test that will exercise the dynamic failover capabilities of
@@ -25,36 +47,71 @@ import org.apache.activemq.broker.TransportConnector;
  * connections on the client should start with 3, then have two after the 3rd
  * broker is removed and then show 3 after the 3rd broker is reintroduced.
  */
-public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
+public class FailoverComplexClusterTest extends OpenwireArtemisBaseTest {
 
    private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616";
    private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617";
-   private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61618";
-   private static final String BROKER_A_NOB_TC_ADDRESS = "tcp://127.0.0.1:61626";
-   private static final String BROKER_B_NOB_TC_ADDRESS = "tcp://127.0.0.1:61627";
-   private static final String BROKER_C_NOB_TC_ADDRESS = "tcp://127.0.0.1:61628";
-   private static final String BROKER_A_NAME = "BROKERA";
-   private static final String BROKER_B_NAME = "BROKERB";
-   private static final String BROKER_C_NAME = "BROKERC";
+
+   private String clientUrl;
+   private EmbeddedJMS[] servers = new EmbeddedJMS[3];
+
+   private static final int NUMBER_OF_CLIENTS = 30;
+   private final List<ActiveMQConnection> connections = new ArrayList<ActiveMQConnection>();
+
+
+   @Before
+   public void setUp() throws Exception {
+   }
+
+   //default setup for most tests
+   private void commonSetup() throws Exception {
+      Configuration config0 = createConfig(0);
+      Configuration config1 = createConfig(1);
+      Configuration config2 = createConfig(2);
+
+      deployClusterConfiguration(config0, 1, 2);
+      deployClusterConfiguration(config1, 0, 2);
+      deployClusterConfiguration(config2, 0, 1);
+
+      servers[0] = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+      servers[1] = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+      servers[2] = new EmbeddedJMS().setConfiguration(config2).setJmsConfiguration(new JMSConfigurationImpl());
+
+      servers[0].start();
+      servers[1].start();
+      servers[2].start();
+
+      Assert.assertTrue(servers[0].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 3));
+      Assert.assertTrue(servers[1].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 3));
+      Assert.assertTrue(servers[2].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 3));
+   }
+
+   @After
+   public void tearDown() throws Exception {
+      shutdownClients();
+      for (EmbeddedJMS server : servers) {
+         if (server != null) {
+            server.stop();
+         }
+      }
+   }
 
    /**
     * Basic dynamic failover 3 broker test
     *
     * @throws Exception
     */
+   @Test
    public void testThreeBrokerClusterSingleConnectorBasic() throws Exception {
-
-      initSingleTcBroker("", null, null);
-
-      Thread.sleep(2000);
-
+      commonSetup();
       setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
       createClients();
-      Thread.sleep(2000);
 
+      Thread.sleep(3000);
       runTests(false, null, null, null);
    }
 
+
    /**
     * Tests a 3 broker configuration to ensure that the backup is random and
     * supported in a cluster. useExponentialBackOff is set to false and
@@ -63,10 +120,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
     *
     * @throws Exception
     */
+   @Test
    public void testThreeBrokerClusterSingleConnectorBackupFailoverConfig() throws Exception {
-
-      initSingleTcBroker("", null, null);
-
+      commonSetup();
       Thread.sleep(2000);
 
       setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?backup=true&backupPoolSize=2&useExponentialBackOff=false&initialReconnectDelay=500");
@@ -84,10 +140,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
     *
     * @throws Exception
     */
+   @Test
    public void testThreeBrokerClusterSingleConnectorWithParams() throws Exception {
-
-      initSingleTcBroker("?transport.closeAsync=false", null, null);
-
+      commonSetup();
       Thread.sleep(2000);
       setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
       createClients();
@@ -101,10 +156,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
     *
     * @throws Exception
     */
+   @Test
    public void testThreeBrokerClusterWithClusterFilter() throws Exception {
-
-      initSingleTcBroker("?transport.closeAsync=false", null, null);
-
+      commonSetup();
       Thread.sleep(2000);
       setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
       createClients();
@@ -118,10 +172,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
     *
     * @throws Exception
     */
+   @Test
    public void testThreeBrokerClusterMultipleConnectorBasic() throws Exception {
-
-      initMultiTcCluster("", null);
-
+      commonSetup();
       Thread.sleep(2000);
 
       setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
@@ -136,9 +189,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
     *
     * @throws Exception
     */
+   @Test
    public void testOriginalBrokerRestart() throws Exception {
-      initSingleTcBroker("", null, null);
-
+      commonSetup();
       Thread.sleep(2000);
 
       setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
@@ -147,16 +200,13 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
 
       assertClientsConnectedToThreeBrokers();
 
-      getBroker(BROKER_A_NAME).stop();
-      getBroker(BROKER_A_NAME).waitUntilStopped();
-      removeBroker(BROKER_A_NAME);
+      stopServer(0);
 
       Thread.sleep(5000);
 
       assertClientsConnectedToTwoBrokers();
 
-      createBrokerA(false, null, null, null);
-      getBroker(BROKER_A_NAME).waitUntilStarted();
+      restartServer(0);
       Thread.sleep(5000);
 
       assertClientsConnectedToThreeBrokers();
@@ -168,10 +218,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
     *
     * @throws Exception
     */
+   @Test
    public void testThreeBrokerClusterClientDistributions() throws Exception {
-
-      initSingleTcBroker("", null, null);
-
+      commonSetup();
       Thread.sleep(2000);
       setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false&initialReconnectDelay=500");
       createClients(100);
@@ -186,10 +235,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
     *
     * @throws Exception
     */
+   @Test
    public void testThreeBrokerClusterDestinationFilter() throws Exception {
-
-      initSingleTcBroker("", null, null);
-
+      commonSetup();
       Thread.sleep(2000);
       setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
       createClients();
@@ -197,28 +245,25 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
       runTests(false, null, null, "Queue.TEST.FOO.>");
    }
 
+   @Test
    public void testFailOverWithUpdateClientsOnRemove() throws Exception {
       // Broker A
-      addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
-      TransportConnector connectorA = getBroker(BROKER_A_NAME).addConnector(BROKER_A_CLIENT_TC_ADDRESS);
-      connectorA.setName("openwire");
-      connectorA.setRebalanceClusterClients(true);
-      connectorA.setUpdateClusterClients(true);
-      connectorA.setUpdateClusterClientsOnRemove(true); //If set to false the test succeeds.
-      addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
-      getBroker(BROKER_A_NAME).start();
-
+      Configuration config0 = createConfig(0, "?rebalance-cluster-client=true&update-cluster-clients=true&update-cluster-clients-on-remove=true");
       // Broker B
-      addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
-      TransportConnector connectorB = getBroker(BROKER_B_NAME).addConnector(BROKER_B_CLIENT_TC_ADDRESS);
-      connectorB.setName("openwire");
-      connectorB.setRebalanceClusterClients(true);
-      connectorB.setUpdateClusterClients(true);
-      connectorB.setUpdateClusterClientsOnRemove(true); //If set to false the test succeeds.
-      addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
-      getBroker(BROKER_B_NAME).start();
-
-      getBroker(BROKER_B_NAME).waitUntilStarted();
+      Configuration config1 = createConfig(1, "?rebalance-cluster-client=true&update-cluster-clients=true&update-cluster-clients-on-remove=true");
+
+      deployClusterConfiguration(config0, 1);
+      deployClusterConfiguration(config1, 0);
+
+      servers[0] = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+      servers[0].start();
+
+      servers[1] = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+      servers[1].start();
+
+      servers[0].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2);
+      servers[1].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2);
+
       Thread.sleep(1000);
 
       // create client connecting only to A. It should receive broker B address whet it connects to A.
@@ -227,9 +272,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
       Thread.sleep(5000);
 
       // We stop broker A.
-      logger.info("Stopping broker A whose address is: {}", BROKER_A_CLIENT_TC_ADDRESS);
-      getBroker(BROKER_A_NAME).stop();
-      getBroker(BROKER_A_NAME).waitUntilStopped();
+      servers[0].stop();
+      servers[1].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 1);
+
       Thread.sleep(5000);
 
       // Client should failover to B.
@@ -258,138 +303,150 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
                          String destinationFilter) throws Exception, InterruptedException {
       assertClientsConnectedToThreeBrokers();
 
-      getBroker(BROKER_C_NAME).stop();
-      getBroker(BROKER_C_NAME).waitUntilStopped();
-      removeBroker(BROKER_C_NAME);
+      stopServer(2);
 
       Thread.sleep(5000);
 
       assertClientsConnectedToTwoBrokers();
 
-      createBrokerC(multi, tcParams, clusterFilter, destinationFilter);
-      getBroker(BROKER_C_NAME).waitUntilStarted();
+      restartServer(2);
+
       Thread.sleep(5000);
 
       assertClientsConnectedToThreeBrokers();
    }
 
-   /**
-    * @param multi
-    * @param tcParams
-    * @param clusterFilter
-    * @param destinationFilter
-    * @throws Exception
-    * @throws InterruptedException
-    */
+   public void setClientUrl(String clientUrl) {
+      this.clientUrl = clientUrl;
+   }
+
+   protected void createClients() throws Exception {
+      createClients(NUMBER_OF_CLIENTS);
+   }
+
+   protected void createClients(int numOfClients) throws Exception {
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl);
+      for (int i = 0; i < numOfClients; i++) {
+         ActiveMQConnection c = (ActiveMQConnection) factory.createConnection();
+         c.start();
+         Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = s.createQueue(getClass().getName());
+         MessageConsumer consumer = s.createConsumer(queue);
+         connections.add(c);
+      }
+   }
+
+   protected void shutdownClients() throws JMSException {
+      for (Connection c : connections) {
+         c.close();
+      }
+   }
+
+   protected void assertClientsConnectedToThreeBrokers() {
+      Set<String> set = new HashSet<String>();
+      for (ActiveMQConnection c : connections) {
+         if (c.getTransportChannel().getRemoteAddress() != null) {
+            set.add(c.getTransportChannel().getRemoteAddress());
+         }
+      }
+      Assert.assertTrue("Only 3 connections should be found: " + set, set.size() == 3);
+   }
+
+   protected void assertClientsConnectedToTwoBrokers() {
+      Set<String> set = new HashSet<String>();
+      for (ActiveMQConnection c : connections) {
+         if (c.getTransportChannel().getRemoteAddress() != null) {
+            set.add(c.getTransportChannel().getRemoteAddress());
+         }
+      }
+      Assert.assertTrue("Only 2 connections should be found: " + set, set.size() == 2);
+   }
+
+   private void stopServer(int serverID) throws Exception {
+      servers[serverID].stop();
+      for (int i = 0; i < servers.length; i++) {
+         if (i != serverID) {
+            Assert.assertTrue(servers[i].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, servers.length - 1));
+         }
+      }
+   }
+
+   private void restartServer(int serverID) throws Exception {
+      servers[serverID].start();
+
+      for (int i = 0; i < servers.length; i++) {
+         Assert.assertTrue(servers[i].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, servers.length));
+      }
+   }
+
    private void runClientDistributionTests(boolean multi,
                                            String tcParams,
                                            String clusterFilter,
                                            String destinationFilter) throws Exception, InterruptedException {
       assertClientsConnectedToThreeBrokers();
-      assertClientsConnectionsEvenlyDistributed(.25);
+      //if more than 67% (about 2/3) of the total connections go to one node, we consider it wrong
+      //if less than 25% (1/4) of the total connections go to one node, we consider it wrong
+      assertClientsConnectionsEvenlyDistributed(.25, .67);
 
-      getBroker(BROKER_C_NAME).stop();
-      getBroker(BROKER_C_NAME).waitUntilStopped();
-      removeBroker(BROKER_C_NAME);
+      stopServer(2);
 
       Thread.sleep(5000);
 
       assertClientsConnectedToTwoBrokers();
-      assertClientsConnectionsEvenlyDistributed(.35);
+      //now there are only 2 nodes
+      //if more than 67% (about 2/3) of the total connections go to either node, we consider it wrong
+      //if less than 34% (about 1/3) of the total connections go to either node, we consider it wrong
+      assertClientsConnectionsEvenlyDistributed(.34, .67);
 
-      createBrokerC(multi, tcParams, clusterFilter, destinationFilter);
-      getBroker(BROKER_C_NAME).waitUntilStarted();
+      restartServer(2);
       Thread.sleep(5000);
 
       assertClientsConnectedToThreeBrokers();
-      assertClientsConnectionsEvenlyDistributed(.20);
-   }
-
-   @Override
-   protected void setUp() throws Exception {
+      //now back to 3 nodes. We assume every node (including the restarted one)
+      //will have at least 1/10 of the total connections, and no node's
+      //connections will exceed 50%
+      assertClientsConnectionsEvenlyDistributed(.10, .50);
    }
 
-   @Override
-   protected void tearDown() throws Exception {
-      shutdownClients();
-      Thread.sleep(2000);
-      destroyBrokerCluster();
-   }
-
-   private void initSingleTcBroker(String params, String clusterFilter, String destinationFilter) throws Exception {
-      createBrokerA(false, params, clusterFilter, null);
-      createBrokerB(false, params, clusterFilter, null);
-      createBrokerC(false, params, clusterFilter, null);
-      getBroker(BROKER_C_NAME).waitUntilStarted();
-   }
-
-   private void initMultiTcCluster(String params, String clusterFilter) throws Exception {
-      createBrokerA(true, params, clusterFilter, null);
-      createBrokerB(true, params, clusterFilter, null);
-      createBrokerC(true, params, clusterFilter, null);
-      getBroker(BROKER_C_NAME).waitUntilStarted();
-   }
-
-   private void createBrokerA(boolean multi,
-                              String params,
-                              String clusterFilter,
-                              String destinationFilter) throws Exception {
-      final String tcParams = (params == null) ? "" : params;
-      if (getBroker(BROKER_A_NAME) == null) {
-         addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
-         addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS + tcParams, true);
-         if (multi) {
-            addTransportConnector(getBroker(BROKER_A_NAME), "network", BROKER_A_NOB_TC_ADDRESS + tcParams, false);
-            addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
-            addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", "static://(" + BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
-         }
-         else {
-            addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
-            addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+   protected void assertClientsConnectionsEvenlyDistributed(double minimumPercentage, double maximumPercentage) {
+      Map<String, Double> clientConnectionCounts = new HashMap<String, Double>();
+      int total = 0;
+      for (ActiveMQConnection c : connections) {
+         String key = c.getTransportChannel().getRemoteAddress();
+         if (key != null) {
+            total++;
+            if (clientConnectionCounts.containsKey(key)) {
+               double count = clientConnectionCounts.get(key);
+               count += 1.0;
+               clientConnectionCounts.put(key, count);
+            }
+            else {
+               clientConnectionCounts.put(key, 1.0);
+            }
          }
-         getBroker(BROKER_A_NAME).start();
       }
-   }
-
-   private void createBrokerB(boolean multi,
-                              String params,
-                              String clusterFilter,
-                              String destinationFilter) throws Exception {
-      final String tcParams = (params == null) ? "" : params;
-      if (getBroker(BROKER_B_NAME) == null) {
-         addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
-         addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS + tcParams, true);
-         if (multi) {
-            addTransportConnector(getBroker(BROKER_B_NAME), "network", BROKER_B_NOB_TC_ADDRESS + tcParams, false);
-            addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
-            addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", "static://(" + BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+      Set<String> keys = clientConnectionCounts.keySet();
+      List<String> errorMsgs = new ArrayList<String>();
+      for (String key : keys) {
+         double count = clientConnectionCounts.get(key);
+         double percentage = count / total;
+         if (percentage < minimumPercentage || percentage > maximumPercentage) {
+            errorMsgs.add("Connections distribution expected to be within range [ " + minimumPercentage
+                    + ", " + maximumPercentage + "].  Actuall distribution was " + percentage + " for connection " + key);
          }
-         else {
-            addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
-            addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+         if (errorMsgs.size() > 0) {
+            for (String err : errorMsgs) {
+               System.err.println(err);
+            }
+            Assert.fail("Test failed. Please see the log message for details");
          }
-         getBroker(BROKER_B_NAME).start();
       }
    }
 
-   private void createBrokerC(boolean multi,
-                              String params,
-                              String clusterFilter,
-                              String destinationFilter) throws Exception {
-      final String tcParams = (params == null) ? "" : params;
-      if (getBroker(BROKER_C_NAME) == null) {
-         addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME));
-         addTransportConnector(getBroker(BROKER_C_NAME), "openwire", BROKER_C_CLIENT_TC_ADDRESS + tcParams, true);
-         if (multi) {
-            addTransportConnector(getBroker(BROKER_C_NAME), "network", BROKER_C_NOB_TC_ADDRESS + tcParams, false);
-            addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", "static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
-            addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", "static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
-         }
-         else {
-            addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
-            addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
-         }
-         getBroker(BROKER_C_NAME).start();
+   protected void assertAllConnectedTo(String url) throws Exception {
+      for (ActiveMQConnection c : connections) {
+         Assert.assertEquals(url, c.getTransportChannel().getRemoteAddress());
       }
    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f25b6e30/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
index e33e7ea..78a8a0b 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
@@ -39,63 +39,61 @@ import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerPluginSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.junit.After;
 import org.junit.Test;
 
-public class FailoverConsumerOutstandingCommitTest {
-
+@RunWith(BMUnitRunner.class)
+public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTest {
    private static final Logger LOG = LoggerFactory.getLogger(FailoverConsumerOutstandingCommitTest.class);
    private static final String QUEUE_NAME = "FailoverWithOutstandingCommit";
    private static final String MESSAGE_TEXT = "Test message ";
-   private static final String TRANSPORT_URI = "tcp://localhost:0";
-   private String url;
+   private static final String url = newURI(0);
    final int prefetch = 10;
-   BrokerService broker;
+   private static EmbeddedJMS server;
+   private static final AtomicBoolean doByteman = new AtomicBoolean(false);
+   private static CountDownLatch brokerStopLatch = new CountDownLatch(1);
 
    @After
    public void stopBroker() throws Exception {
-      if (broker != null) {
-         broker.stop();
+      if (server != null) {
+         server.stop();
       }
    }
 
-   public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
-      broker = createBroker(deleteAllMessagesOnStartup);
-      broker.start();
-   }
-
-   public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
-      return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI);
-   }
-
-   public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
-      broker = new BrokerService();
-      broker.addConnector(bindAddress);
-      broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
-      PolicyMap policyMap = new PolicyMap();
-      PolicyEntry defaultEntry = new PolicyEntry();
-
-      // optimizedDispatche and sync dispatch ensure that the dispatch happens
-      // before the commit reply that the consumer.clearDispatchList is waiting for.
-      defaultEntry.setOptimizedDispatch(true);
-      policyMap.setDefaultEntry(defaultEntry);
-      broker.setDestinationPolicy(policyMap);
-
-      url = broker.getTransportConnectors().get(0).getConnectUri().toString();
-
-      return broker;
+   public void startServer() throws Exception {
+      server = createBroker();
+      server.start();
    }
 
    @Test
+   @BMRules(
+      rules = {
+              @BMRule(
+                      name = "set no return response",
+                      targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection",
+                      targetMethod = "processCommitTransactionOnePhase",
+                      targetLocation = "ENTRY",
+                      binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+                      action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.holdResponse(context)"),
+              @BMRule(
+                      name = "stop broker before commit",
+                      targetClass = "org.apache.activemq.artemis.core.server.impl.ServerSessionImpl",
+                      targetMethod = "commit",
+                      targetLocation = "ENTRY",
+                      action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction()"),
+      }
+   )
    public void testFailoverConsumerDups() throws Exception {
       doTestFailoverConsumerDups(true);
    }
@@ -103,30 +101,9 @@ public class FailoverConsumerOutstandingCommitTest {
    @SuppressWarnings("unchecked")
    public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception {
 
-      broker = createBroker(true);
-
-      broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
-         @Override
-         public void commitTransaction(ConnectionContext context,
-                                       TransactionId xid,
-                                       boolean onePhase) throws Exception {
-            // so commit will hang as if reply is lost
-            context.setDontSendReponse(true);
-            Executors.newSingleThreadExecutor().execute(new Runnable() {
-               @Override
-               public void run() {
-                  LOG.info("Stopping broker before commit...");
-                  try {
-                     broker.stop();
-                  }
-                  catch (Exception e) {
-                     e.printStackTrace();
-                  }
-               }
-            });
-         }
-      }});
-      broker.start();
+      server = createBroker();
+      server.start();
+      brokerStopLatch = new CountDownLatch(1);
 
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
       cf.setWatchTopicAdvisories(watchTopicAdvisories);
@@ -144,9 +121,9 @@ public class FailoverConsumerOutstandingCommitTest {
       final CountDownLatch messagesReceived = new CountDownLatch(2);
 
       final MessageConsumer testConsumer = consumerSession.createConsumer(destination);
+      doByteman.set(true);
       testConsumer.setMessageListener(new MessageListener() {
 
-         @Override
          public void onMessage(Message message) {
             LOG.info("consume one and commit");
 
@@ -166,7 +143,6 @@ public class FailoverConsumerOutstandingCommitTest {
 
       // may block if broker shutodwn happens quickly
       Executors.newSingleThreadExecutor().execute(new Runnable() {
-         @Override
          public void run() {
             LOG.info("producer started");
             try {
@@ -183,9 +159,11 @@ public class FailoverConsumerOutstandingCommitTest {
       });
 
       // will be stopped by the plugin
-      broker.waitUntilStopped();
-      broker = createBroker(false, url);
-      broker.start();
+      brokerStopLatch.await();
+      server.stop();
+      server = createBroker();
+      doByteman.set(false);
+      server.start();
 
       assertTrue("consumer added through failover", commitDoneLatch.await(20, TimeUnit.SECONDS));
       assertTrue("another message was received after failover", messagesReceived.await(20, TimeUnit.SECONDS));
@@ -194,11 +172,41 @@ public class FailoverConsumerOutstandingCommitTest {
    }
 
    @Test
+   @BMRules(
+      rules = {
+              @BMRule(
+                      name = "set no return response",
+                      targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection",
+                      targetMethod = "processCommitTransactionOnePhase",
+                      targetLocation = "ENTRY",
+                      binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+                      action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.holdResponse(context)"),
+              @BMRule(
+                      name = "stop broker before commit",
+                      targetClass = "org.apache.activemq.artemis.core.server.impl.ServerSessionImpl",
+                      targetMethod = "commit",
+                      targetLocation = "ENTRY",
+                      action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction();return")})
    public void TestFailoverConsumerOutstandingSendTxIncomplete() throws Exception {
       doTestFailoverConsumerOutstandingSendTx(false);
    }
 
    @Test
+   @BMRules(
+      rules = {
+              @BMRule(
+                      name = "set no return response",
+                      targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection",
+                      targetMethod = "processCommitTransactionOnePhase",
+                      targetLocation = "ENTRY",
+                      binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+                      action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.holdResponse(context)"),
+              @BMRule(
+                      name = "stop broker after commit",
+                      targetClass = "org.apache.activemq.artemis.core.server.impl.ServerSessionImpl",
+                      targetMethod = "commit",
+                      targetLocation = "AT EXIT",
+                      action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction()")})
    public void TestFailoverConsumerOutstandingSendTxComplete() throws Exception {
       doTestFailoverConsumerOutstandingSendTx(true);
    }
@@ -206,36 +214,9 @@ public class FailoverConsumerOutstandingCommitTest {
    @SuppressWarnings("unchecked")
    public void doTestFailoverConsumerOutstandingSendTx(final boolean doActualBrokerCommit) throws Exception {
       final boolean watchTopicAdvisories = true;
-      broker = createBroker(true);
-
-      broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
-         @Override
-         public void commitTransaction(ConnectionContext context,
-                                       TransactionId xid,
-                                       boolean onePhase) throws Exception {
-            // from the consumer perspective whether the commit completed on the broker or
-            // not is irrelevant, the transaction is still in doubt in the absence of a reply
-            if (doActualBrokerCommit) {
-               LOG.info("doing actual broker commit...");
-               super.commitTransaction(context, xid, onePhase);
-            }
-            // so commit will hang as if reply is lost
-            context.setDontSendReponse(true);
-            Executors.newSingleThreadExecutor().execute(new Runnable() {
-               @Override
-               public void run() {
-                  LOG.info("Stopping broker before commit...");
-                  try {
-                     broker.stop();
-                  }
-                  catch (Exception e) {
-                     e.printStackTrace();
-                  }
-               }
-            });
-         }
-      }});
-      broker.start();
+      server = createBroker();
+      server.start();
+      brokerStopLatch = new CountDownLatch(1);
 
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
       cf.setWatchTopicAdvisories(watchTopicAdvisories);
@@ -254,11 +235,11 @@ public class FailoverConsumerOutstandingCommitTest {
       final CountDownLatch commitDoneLatch = new CountDownLatch(1);
       final CountDownLatch messagesReceived = new CountDownLatch(3);
       final AtomicBoolean gotCommitException = new AtomicBoolean(false);
-      final ArrayList<TextMessage> receivedMessages = new ArrayList<>();
+      final ArrayList<TextMessage> receivedMessages = new ArrayList<TextMessage>();
       final MessageConsumer testConsumer = consumerSession.createConsumer(destination);
+      doByteman.set(true);
       testConsumer.setMessageListener(new MessageListener() {
 
-         @Override
          public void onMessage(Message message) {
             LOG.info("consume one and commit: " + message);
             assertNotNull("got message", message);
@@ -279,7 +260,6 @@ public class FailoverConsumerOutstandingCommitTest {
 
       // may block if broker shutdown happens quickly
       Executors.newSingleThreadExecutor().execute(new Runnable() {
-         @Override
          public void run() {
             LOG.info("producer started");
             try {
@@ -296,9 +276,11 @@ public class FailoverConsumerOutstandingCommitTest {
       });
 
       // will be stopped by the plugin
-      broker.waitUntilStopped();
-      broker = createBroker(false, url);
-      broker.start();
+      brokerStopLatch.await();
+      server.stop();
+      doByteman.set(false);
+      server = createBroker();
+      server.start();
 
       assertTrue("commit done through failover", commitDoneLatch.await(20, TimeUnit.SECONDS));
       assertTrue("commit failed", gotCommitException.get());
@@ -313,12 +295,13 @@ public class FailoverConsumerOutstandingCommitTest {
       assertEquals("get message 1 eventually", MESSAGE_TEXT + "1", receivedMessages.get(receivedIndex++).getText());
 
       connection.close();
+      server.stop();
    }
 
    @Test
    public void testRollbackFailoverConsumerTx() throws Exception {
-      broker = createBroker(true);
-      broker.start();
+      server = createBroker();
+      server.start();
 
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
       cf.setConsumerFailoverRedeliveryWaitPeriod(10000);
@@ -340,10 +323,9 @@ public class FailoverConsumerOutstandingCommitTest {
       assertNotNull(msg);
 
       // restart with outstanding delivered message
-      broker.stop();
-      broker.waitUntilStopped();
-      broker = createBroker(false, url);
-      broker.start();
+      server.stop();
+      server = createBroker();
+      server.start();
 
       consumerSession.rollback();
 
@@ -379,4 +361,45 @@ public class FailoverConsumerOutstandingCommitTest {
       }
       producer.close();
    }
+
+   /**
+    * Byteman hook invoked by the "set no return response" rules. While
+    * {@code doByteman} is set, flags the connection context so that no
+    * response is sent, which leaves the client's commit waiting as if the
+    * reply had been lost. Does nothing when {@code doByteman} is cleared.
+    *
+    * @param context the context obtained from the OpenWire connection by the rule binding
+    */
+   public static void holdResponse(AMQConnectionContext context) {
+      if (doByteman.get()) {
+         context.setDontSendReponse(true);
+      }
+   }
+
+   /**
+    * Byteman hook invoked by the "stop broker ..." rules. While
+    * {@code doByteman} is set, stops the embedded server on a separate
+    * thread and counts down {@code brokerStopLatch} once the stop attempt
+    * has finished, whether or not it succeeded.
+    */
+   public static void stopServerInTransaction() {
+      if (doByteman.get()) {
+         Executors.newSingleThreadExecutor().execute(new Runnable() {
+            public void run() {
+               LOG.info("Stopping broker in transaction...");
+               try {
+                  server.stop();
+               }
+               catch (Exception e) {
+                  // report through the test logger so the failure shows up next to the other test output
+                  LOG.error("Failed to stop broker in transaction", e);
+               }
+               finally {
+                  // always release the test thread blocked on brokerStopLatch.await()
+                  brokerStopLatch.countDown();
+               }
+            }
+         });
+      }
+   }
 }


Mime
View raw message