activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [06/39] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
Date Wed, 17 Feb 2016 02:03:41 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f25b6e30/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
index 5016e30..dc91873 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
@@ -16,22 +16,41 @@
  */
 package org.apache.activemq.transport.failover;
 
-public class TwoBrokerFailoverClusterTest extends FailoverClusterTestSupport {
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 
-   private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616";
-   private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617";
-   private static final String BROKER_A_NOB_TC_ADDRESS = "tcp://127.0.0.1:61626";
-   private static final String BROKER_B_NOB_TC_ADDRESS = "tcp://127.0.0.1:61627";
-   private static final String BROKER_A_NAME = "BROKERA";
-   private static final String BROKER_B_NAME = "BROKERB";
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
+public class TwoBrokerFailoverClusterTest extends OpenwireArtemisBaseTest {
+
+   private static final int NUMBER_OF_CLIENTS = 30;
+   private final List<ActiveMQConnection> connections = new ArrayList<ActiveMQConnection>();
+   private EmbeddedJMS server0;
+   private EmbeddedJMS server1;
+   private String clientUrl;
+
+   @Test
    public void testTwoBrokersRestart() throws Exception {
-      createBrokerA(false, "", null, null);
-      createBrokerB(false, "", null, null);
-      getBroker(BROKER_B_NAME).waitUntilStarted();
 
       Thread.sleep(2000);
-      setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
       createClients();
 
       Thread.sleep(5000);
@@ -39,59 +58,106 @@ public class TwoBrokerFailoverClusterTest extends FailoverClusterTestSupport {
       assertClientsConnectedToTwoBrokers();
       assertClientsConnectionsEvenlyDistributed(.35);
 
-      getBroker(BROKER_A_NAME).stop();
-      getBroker(BROKER_A_NAME).waitUntilStopped();
-      removeBroker(BROKER_A_NAME);
+      server0.stop();
+      Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 1));
 
       Thread.sleep(1000);
 
-      assertAllConnectedTo(BROKER_B_CLIENT_TC_ADDRESS);
+      assertAllConnectedTo(newURI("127.0.0.1", 1));
 
       Thread.sleep(5000);
 
-      createBrokerA(false, "", null, null);
-      getBroker(BROKER_A_NAME).waitUntilStarted();
+      server0.start();
+      Assert.assertTrue(server0.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+      Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
       Thread.sleep(5000);
 
+      //need update-cluster-clients, -on-remove and rebalance set to true.
       assertClientsConnectedToTwoBrokers();
       assertClientsConnectionsEvenlyDistributed(.35);
    }
 
-   private void createBrokerA(boolean multi,
-                              String params,
-                              String clusterFilter,
-                              String destinationFilter) throws Exception {
-      final String tcParams = (params == null) ? "" : params;
-      if (getBroker(BROKER_A_NAME) == null) {
-         addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
-         addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS + tcParams, true);
-         if (multi) {
-            addTransportConnector(getBroker(BROKER_A_NAME), "network", BROKER_A_NOB_TC_ADDRESS + tcParams, false);
-            addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
-         }
-         else {
-            addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
-         }
-         getBroker(BROKER_A_NAME).start();
+
+   @Before
+   public void setUp() throws Exception {
+      Configuration config0 = createConfig("127.0.0.1", 0);
+      Configuration config1 = createConfig("127.0.0.1", 1);
+
+      deployClusterConfiguration(config0, 1);
+      deployClusterConfiguration(config1, 0);
+
+      server0 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+      server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+
+      clientUrl = "failover://(" + newURI("127.0.0.1", 0) + "," + newURI("127.0.0.1", 1) + ")";
+
+      server0.start();
+      server1.start();
+      Assert.assertTrue(server0.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+      Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+   }
+
+   @After
+   public void tearDown() throws Exception {
+      server0.stop();
+      server1.stop();
+   }
+
+   protected void createClients() throws Exception {
+      createClients(NUMBER_OF_CLIENTS);
+   }
+
+   protected void createClients(int numOfClients) throws Exception {
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl);
+      for (int i = 0; i < numOfClients; i++) {
+         ActiveMQConnection c = (ActiveMQConnection) factory.createConnection();
+         c.start();
+         Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = s.createQueue(getClass().getName());
+         MessageConsumer consumer = s.createConsumer(queue);
+         connections.add(c);
       }
    }
 
-   private void createBrokerB(boolean multi,
-                              String params,
-                              String clusterFilter,
-                              String destinationFilter) throws Exception {
-      final String tcParams = (params == null) ? "" : params;
-      if (getBroker(BROKER_B_NAME) == null) {
-         addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
-         addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS + tcParams, true);
-         if (multi) {
-            addTransportConnector(getBroker(BROKER_B_NAME), "network", BROKER_B_NOB_TC_ADDRESS + tcParams, false);
-            addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
+   protected void assertClientsConnectedToTwoBrokers() {
+      Set<String> set = new HashSet<String>();
+      for (ActiveMQConnection c : connections) {
+         if (c.getTransportChannel().getRemoteAddress() != null) {
+            set.add(c.getTransportChannel().getRemoteAddress());
          }
-         else {
-            addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
+      }
+      Assert.assertTrue("Only 2 connections should be found: " + set, set.size() == 2);
+   }
+
+   protected void assertClientsConnectionsEvenlyDistributed(double minimumPercentage) {
+      Map<String, Double> clientConnectionCounts = new HashMap<String, Double>();
+      int total = 0;
+      for (ActiveMQConnection c : connections) {
+         String key = c.getTransportChannel().getRemoteAddress();
+         if (key != null) {
+            total++;
+            if (clientConnectionCounts.containsKey(key)) {
+               double count = clientConnectionCounts.get(key);
+               count += 1.0;
+               clientConnectionCounts.put(key, count);
+            }
+            else {
+               clientConnectionCounts.put(key, 1.0);
+            }
          }
-         getBroker(BROKER_B_NAME).start();
+      }
+      Set<String> keys = clientConnectionCounts.keySet();
+      for (String key : keys) {
+         double count = clientConnectionCounts.get(key);
+         double percentage = count / total;
+         System.out.println(count + " of " + total + " connections for " + key + " = " + percentage);
+         Assert.assertTrue("Connections distribution expected to be >= than " + minimumPercentage + ".  Actuall distribution was " + percentage + " for connection " + key, percentage >= minimumPercentage);
+      }
+   }
+
+   protected void assertAllConnectedTo(String url) throws Exception {
+      for (ActiveMQConnection c : connections) {
+         Assert.assertEquals(url, c.getTransportChannel().getRemoteAddress());
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f25b6e30/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java
index dc369be..a372b79 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java
@@ -26,45 +26,42 @@ import javax.jms.Session;
 import junit.framework.TestCase;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.apache.activemq.util.MessageIdList;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
-public class FanoutTest extends TestCase {
+public class FanoutTest extends OpenwireArtemisBaseTest {
 
-   BrokerService broker1;
-   BrokerService broker2;
+   EmbeddedJMS[] servers = new EmbeddedJMS[2];
 
    ActiveMQConnectionFactory producerFactory = new ActiveMQConnectionFactory("fanout:(static:(tcp://localhost:61616,tcp://localhost:61617))?fanOutQueues=true");
    Connection producerConnection;
    Session producerSession;
    int messageCount = 100;
 
-   @Override
+   @Before
    public void setUp() throws Exception {
-      broker1 = BrokerFactory.createBroker("broker:(tcp://localhost:61616)/brokerA?persistent=false&useJmx=false");
-      broker2 = BrokerFactory.createBroker("broker:(tcp://localhost:61617)/brokerB?persistent=false&useJmx=false");
-
-      broker1.start();
-      broker2.start();
-
-      broker1.waitUntilStarted();
-      broker2.waitUntilStarted();
+      setUpNonClusterServers(servers);
 
       producerConnection = producerFactory.createConnection();
       producerConnection.start();
       producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }
 
-   @Override
+   @After
    public void tearDown() throws Exception {
       producerSession.close();
       producerConnection.close();
 
-      broker1.stop();
-      broker2.stop();
+      shutDownNonClusterServers(servers);
    }
 
+   @Test
    public void testSendReceive() throws Exception {
 
       MessageProducer prod = createProducer();
@@ -76,7 +73,6 @@ public class FanoutTest extends TestCase {
 
       assertMessagesReceived("tcp://localhost:61616");
       assertMessagesReceived("tcp://localhost:61617");
-
    }
 
    protected MessageProducer createProducer() throws Exception {
@@ -95,7 +91,7 @@ public class FanoutTest extends TestCase {
       listener.assertMessagesReceived(messageCount);
 
       consumer.close();
-      consumerConnection.close();
       consumerSession.close();
+      consumerConnection.close();
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f25b6e30/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java
index 7e52f13..2cfc136 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java
@@ -18,51 +18,111 @@ package org.apache.activemq.transport.fanout;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.DeliveryMode;
+import javax.jms.MessageNotWriteableException;
 
-import junit.framework.Test;
-
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
 import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.network.NetworkTestSupport;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.transport.mock.MockTransport;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class FanoutTransportBrokerTest extends NetworkTestSupport {
+@RunWith(Parameterized.class)
+public class FanoutTransportBrokerTest extends OpenwireArtemisBaseTest {
+   public static final boolean FAST_NO_MESSAGE_LEFT_ASSERT = System.getProperty("FAST_NO_MESSAGE_LEFT_ASSERT", "true").equals("true");
+
+   protected ArrayList<StubConnection> connections = new ArrayList<StubConnection>();
+   protected long idGenerator;
+   protected int msgIdGenerator;
+   protected int maxWait = 10000;
 
    private static final Logger LOG = LoggerFactory.getLogger(FanoutTransportBrokerTest.class);
 
-   public ActiveMQDestination destination;
-   public int deliveryMode;
+   private EmbeddedJMS server;
+   private EmbeddedJMS remoteServer;
+
+   private ActiveMQDestination destination;
+   private int deliveryMode;
 
-   public static Test suite() {
-      return suite(FanoutTransportBrokerTest.class);
+   @Parameterized.Parameters(name="test-{index}")
+   public static Collection<Object[]> getParams()
+   {
+      return Arrays.asList(new Object[][]{
+              {Integer.valueOf(DeliveryMode.NON_PERSISTENT), new ActiveMQQueue("TEST")},
+              {Integer.valueOf(DeliveryMode.NON_PERSISTENT), new ActiveMQTopic("TEST")},
+              {Integer.valueOf(DeliveryMode.PERSISTENT), new ActiveMQQueue("TEST")},
+              {Integer.valueOf(DeliveryMode.PERSISTENT), new ActiveMQTopic("TEST")}
+      });
    }
 
-   public static void main(String[] args) {
-      junit.textui.TestRunner.run(suite());
+   public FanoutTransportBrokerTest(int deliveryMode, ActiveMQDestination destination) {
+      this.deliveryMode = deliveryMode;
+      this.destination = destination;
    }
 
-   public void initCombosForTestPublisherFansout() {
-      addCombinationValues("deliveryMode", new Object[]{Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
-      addCombinationValues("destination", new Object[]{new ActiveMQTopic("TEST"), new ActiveMQQueue("TEST")});
+   @Before
+   public void setUp() throws Exception {
+      Configuration config0 = createConfig(0);
+      server = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+      Configuration config1 = createConfig(1);
+      remoteServer = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+      server.start();
+      remoteServer.start();
+
+   }
+   @After
+   public void tearDown() throws Exception {
+      for (StubConnection conn : connections) {
+         try {
+            conn.stop();
+         }
+         catch (Exception e) {
+         }
+      }
+      try {
+         remoteServer.stop();
+      }
+      catch (Exception e) {
+      }
+      try {
+         server.stop();
+      }
+      catch (Exception e) {
+      }
    }
 
+   @Test
    public void testPublisherFansout() throws Exception {
-
       // Start a normal consumer on the local broker
       StubConnection connection1 = createConnection();
       ConnectionInfo connectionInfo1 = createConnectionInfo();
@@ -94,21 +154,28 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
       // Send the message using the fail over publisher.
       connection3.request(createMessage(producerInfo3, destination, deliveryMode));
 
-      assertNotNull(receiveMessage(connection1));
+      Assert.assertNotNull(receiveMessage(connection1));
       assertNoMessagesLeft(connection1);
 
-      assertNotNull(receiveMessage(connection2));
+      Assert.assertNotNull(receiveMessage(connection2));
       assertNoMessagesLeft(connection2);
 
    }
 
+   /*
    public void initCombosForTestPublisherWaitsForServerToBeUp() {
       addCombinationValues("deliveryMode", new Object[]{Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
       addCombinationValues("destination", new Object[]{new ActiveMQTopic("TEST")});
    }
+*/
 
+   @Test
    public void testPublisherWaitsForServerToBeUp() throws Exception {
 
+      if (name.getMethodName().contains("test-0") || name.getMethodName().contains("test-2")) {
+         System.out.println("Discarding invalid test: " + name.getMethodName());
+         return;
+      }
       // Start a normal consumer on the local broker
       StubConnection connection1 = createConnection();
       ConnectionInfo connectionInfo1 = createConnectionInfo();
@@ -140,19 +207,18 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
       // Send the message using the fail over publisher.
       connection3.request(createMessage(producerInfo3, destination, deliveryMode));
 
-      assertNotNull(receiveMessage(connection1));
+      Assert.assertNotNull(receiveMessage(connection1));
       assertNoMessagesLeft(connection1);
 
-      assertNotNull(receiveMessage(connection2));
+      Assert.assertNotNull(receiveMessage(connection2));
       assertNoMessagesLeft(connection2);
 
       final CountDownLatch publishDone = new CountDownLatch(1);
 
       // The MockTransport is on the remote connection.
       // Slip in a new transport filter after the MockTransport
-      MockTransport mt = connection3.getTransport().narrow(MockTransport.class);
+      MockTransport mt = (MockTransport) connection3.getTransport().narrow(MockTransport.class);
       mt.install(new TransportFilter(mt.getNext()) {
-         @Override
          public void oneway(Object command) throws IOException {
             LOG.info("Dropping: " + command);
             // just eat it! to simulate a recent failure.
@@ -161,7 +227,6 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
 
       // Send a message (async) as this will block
       new Thread() {
-         @Override
          public void run() {
             // Send the message using the fail over publisher.
             try {
@@ -175,7 +240,7 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
       }.start();
 
       // Assert that we block:
-      assertFalse(publishDone.await(3, TimeUnit.SECONDS));
+      Assert.assertFalse(publishDone.await(3, TimeUnit.SECONDS));
 
       // Restart the remote server. State should be re-played and the publish
       // should continue.
@@ -184,26 +249,127 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
       LOG.info("Broker Restarted");
 
       // This should reconnect, and resend
-      assertTrue(publishDone.await(20, TimeUnit.SECONDS));
+      Assert.assertTrue(publishDone.await(20, TimeUnit.SECONDS));
 
    }
 
-   @Override
    protected String getLocalURI() {
       return "tcp://localhost:61616";
    }
 
-   @Override
    protected String getRemoteURI() {
       return "tcp://localhost:61617";
    }
 
    protected StubConnection createFanoutConnection() throws Exception {
-      URI fanoutURI = new URI("fanout://(static://(" + connector.getServer().getConnectURI() + "," + "mock://" + remoteConnector.getServer().getConnectURI() + "))?fanOutQueues=true");
+      URI fanoutURI = new URI("fanout://(static://(" + newURI(0) + "," + "mock://" + newURI(1) + "))?fanOutQueues=true");
       Transport transport = TransportFactory.connect(fanoutURI);
       StubConnection connection = new StubConnection(transport);
       connections.add(connection);
       return connection;
    }
 
+
+   protected StubConnection createConnection() throws Exception {
+      Transport transport = TransportFactory.connect(new URI(newURI(0)));
+      StubConnection connection = new StubConnection(transport);
+      connections.add(connection);
+      return connection;
+   }
+
+   protected StubConnection createRemoteConnection() throws Exception {
+      Transport transport = TransportFactory.connect(new URI(newURI(1)));
+      StubConnection connection = new StubConnection(transport);
+      connections.add(connection);
+      return connection;
+   }
+
+   protected ConnectionInfo createConnectionInfo() throws Exception {
+      ConnectionInfo info = new ConnectionInfo();
+      info.setConnectionId(new ConnectionId("connection:" + (++idGenerator)));
+      info.setClientId(info.getConnectionId().getValue());
+      return info;
+   }
+
+   protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception {
+      SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator);
+      return info;
+   }
+
+   protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo,
+                                             ActiveMQDestination destination) throws Exception {
+      ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
+      info.setBrowser(false);
+      info.setDestination(destination);
+      info.setPrefetchSize(1000);
+      info.setDispatchAsync(false);
+      return info;
+   }
+
+   protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception {
+      ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator);
+      return info;
+   }
+
+   protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode) {
+      Message message = createMessage(producerInfo, destination);
+      message.setPersistent(deliveryMode == DeliveryMode.PERSISTENT);
+      return message;
+   }
+
+   protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) {
+      ActiveMQTextMessage message = new ActiveMQTextMessage();
+      message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
+      message.setDestination(destination);
+      message.setPersistent(false);
+      try {
+         message.setText("Test Message Payload.");
+      }
+      catch (MessageNotWriteableException e) {
+      }
+      return message;
+   }
+
+   public Message receiveMessage(StubConnection connection) throws InterruptedException {
+      return receiveMessage(connection, maxWait);
+   }
+
+   public Message receiveMessage(StubConnection connection, long timeout) throws InterruptedException {
+      while (true) {
+         Object o = connection.getDispatchQueue().poll(timeout, TimeUnit.MILLISECONDS);
+
+         if (o == null) {
+            return null;
+         }
+         if (o instanceof MessageDispatch) {
+
+            MessageDispatch dispatch = (MessageDispatch) o;
+            if (dispatch.getMessage() == null) {
+               return null;
+            }
+            dispatch.setMessage(dispatch.getMessage().copy());
+            dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter());
+            return dispatch.getMessage();
+         }
+      }
+   }
+
+   protected void assertNoMessagesLeft(StubConnection connection) throws InterruptedException {
+      long wait = FAST_NO_MESSAGE_LEFT_ASSERT ? 0 : maxWait;
+      while (true) {
+         Object o = connection.getDispatchQueue().poll(wait, TimeUnit.MILLISECONDS);
+         if (o == null) {
+            return;
+         }
+         if (o instanceof MessageDispatch && ((MessageDispatch) o).getMessage() != null) {
+            Assert.fail("Received a message: " + ((MessageDispatch) o).getMessage().getMessageId());
+         }
+      }
+   }
+   protected void restartRemoteBroker() throws Exception {
+      remoteServer.stop();
+      Thread.sleep(2000);
+      remoteServer.start();
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f25b6e30/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
index 619190f..01f6963 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
@@ -173,11 +173,8 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
    }
 
    public void testClientHang() throws Exception {
-
-      //
       // Manually create a client transport so that it does not send KeepAlive
-      // packets.
-      // this should simulate a client hang.
+      // packets.  this should simulate a client hang.
       clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:" + serverPort), null);
       clientTransport.setTransportListener(new TransportListener() {
          @Override
@@ -205,9 +202,10 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
          public void transportResumed() {
          }
       });
+
       clientTransport.start();
       WireFormatInfo info = new WireFormatInfo();
-      info.setVersion(OpenWireFormat.DEFAULT_VERSION);
+      info.setVersion(OpenWireFormat.DEFAULT_LEGACY_VERSION);
       info.setMaxInactivityDuration(1000);
       clientTransport.oneway(info);
 
@@ -242,19 +240,17 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
     * @throws URISyntaxException
     */
    public void initCombosForTestNoClientHangWithServerBlock() throws Exception {
-
       startClient();
 
-      addCombinationValues("clientInactivityLimit", new Object[]{Long.valueOf(1000)});
-      addCombinationValues("serverInactivityLimit", new Object[]{Long.valueOf(1000)});
-      addCombinationValues("serverRunOnCommand", new Object[]{new Runnable() {
+      addCombinationValues("clientInactivityLimit", new Object[] {Long.valueOf(1000)});
+      addCombinationValues("serverInactivityLimit", new Object[] {Long.valueOf(1000)});
+      addCombinationValues("serverRunOnCommand", new Object[] {new Runnable() {
          @Override
          public void run() {
             try {
                LOG.info("Sleeping");
                Thread.sleep(4000);
-            }
-            catch (InterruptedException e) {
+            } catch (InterruptedException e) {
             }
          }
       }});
@@ -272,5 +268,4 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
       assertEquals(0, clientErrorCount.get());
       assertEquals(0, serverErrorCount.get());
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f25b6e30/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java
index 9ae82ac..9d3c347 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java
@@ -23,7 +23,6 @@ import junit.framework.Test;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -183,15 +182,6 @@ public class TransportUriTest extends EmbeddedBrokerTestSupport {
       super.tearDown();
    }
 
-   @Override
-   protected BrokerService createBroker() throws Exception {
-      BrokerService answer = new BrokerService();
-      answer.setUseJmx(false);
-      answer.setPersistent(isPersistent());
-      answer.addConnector(bindAddress);
-      return answer;
-   }
-
    public static Test suite() {
       return suite(TransportUriTest.class);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f25b6e30/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java
deleted file mode 100644
index 3791848..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import java.net.URI;
-
-import javax.jms.Connection;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerRegistry;
-
-public class VMTransportBrokerNameTest extends TestCase {
-
-   private static final String MY_BROKER = "myBroker";
-   final String vmUrl = "vm:(broker:(tcp://localhost:61616)/" + MY_BROKER + "?persistent=false)";
-
-   public void testBrokerName() throws Exception {
-      ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(vmUrl));
-      ActiveMQConnection c1 = (ActiveMQConnection) cf.createConnection();
-      assertTrue("Transport has name in it: " + c1.getTransport(), c1.getTransport().toString().contains(MY_BROKER));
-
-      // verify Broker is there with name
-      ActiveMQConnectionFactory cfbyName = new ActiveMQConnectionFactory(new URI("vm://" + MY_BROKER + "?create=false"));
-      Connection c2 = cfbyName.createConnection();
-
-      assertNotNull(BrokerRegistry.getInstance().lookup(MY_BROKER));
-      assertEquals(BrokerRegistry.getInstance().findFirst().getBrokerName(), MY_BROKER);
-      assertEquals(BrokerRegistry.getInstance().getBrokers().size(), 1);
-
-      c1.close();
-      c2.close();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f25b6e30/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerTest.java
deleted file mode 100644
index 52e4b88..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import junit.framework.Test;
-
-import org.apache.activemq.transport.TransportBrokerTestSupport;
-
-public class VMTransportBrokerTest extends TransportBrokerTestSupport {
-
-   @Override
-   protected String getBindLocation() {
-      return "vm://localhost";
-   }
-
-   public static Test suite() {
-      return suite(VMTransportBrokerTest.class);
-   }
-
-   public static void main(String[] args) {
-      junit.textui.TestRunner.run(suite());
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f25b6e30/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportEmbeddedBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportEmbeddedBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportEmbeddedBrokerTest.java
deleted file mode 100644
index dbc7f29..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportEmbeddedBrokerTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import javax.jms.DeliveryMode;
-
-import org.apache.activemq.broker.BrokerRegistry;
-import org.apache.activemq.broker.BrokerTestSupport;
-import org.apache.activemq.broker.StubConnection;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.util.IOExceptionSupport;
-
-/**
- * Used to see if the VM transport starts an embedded broker on demand.
- */
-public class VMTransportEmbeddedBrokerTest extends BrokerTestSupport {
-
-   public static void main(String[] args) {
-      junit.textui.TestRunner.run(VMTransportEmbeddedBrokerTest.class);
-   }
-
-   public void testConsumerPrefetchAtOne() throws Exception {
-
-      // Make sure the broker is created due to the connection being started.
-      assertNull(BrokerRegistry.getInstance().lookup("localhost"));
-      StubConnection connection = createConnection();
-      assertNotNull(BrokerRegistry.getInstance().lookup("localhost"));
-
-      // Start a producer and consumer
-      ConnectionInfo connectionInfo = createConnectionInfo();
-      SessionInfo sessionInfo = createSessionInfo(connectionInfo);
-      ProducerInfo producerInfo = createProducerInfo(sessionInfo);
-      connection.send(connectionInfo);
-      connection.send(sessionInfo);
-      connection.send(producerInfo);
-
-      ActiveMQQueue destination = new ActiveMQQueue("TEST");
-
-      ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
-      consumerInfo.setPrefetchSize(1);
-      connection.send(consumerInfo);
-
-      // Send 2 messages to the broker.
-      connection.send(createMessage(producerInfo, destination, DeliveryMode.NON_PERSISTENT));
-      connection.send(createMessage(producerInfo, destination, DeliveryMode.NON_PERSISTENT));
-
-      // Make sure only 1 message was delivered.
-      Message m = receiveMessage(connection);
-      assertNotNull(m);
-      assertNoMessagesLeft(connection);
-
-      // Make sure the broker is shutdown when the connection is stopped.
-      assertNotNull(BrokerRegistry.getInstance().lookup("localhost"));
-      connection.stop();
-      assertNull(BrokerRegistry.getInstance().lookup("localhost"));
-   }
-
-   @Override
-   protected void setUp() throws Exception {
-      // Don't call super since it manually starts up a broker.
-   }
-
-   @Override
-   protected void tearDown() throws Exception {
-      // Don't call super since it manually tears down a broker.
-   }
-
-   @Override
-   protected StubConnection createConnection() throws Exception {
-      try {
-         Transport transport = TransportFactory.connect(new URI("vm://localhost?broker.persistent=false"));
-         StubConnection connection = new StubConnection(transport);
-         return connection;
-      }
-      catch (URISyntaxException e) {
-         throw IOExceptionSupport.create(e);
-      }
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f25b6e30/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
deleted file mode 100644
index 2268048..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
+++ /dev/null
@@ -1,937 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.activemq.command.BaseCommand;
-import org.apache.activemq.command.ExceptionResponse;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.state.CommandVisitor;
-import org.apache.activemq.transport.FutureResponse;
-import org.apache.activemq.transport.MutexTransport;
-import org.apache.activemq.transport.ResponseCallback;
-import org.apache.activemq.transport.ResponseCorrelator;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportDisposedIOException;
-import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class VMTransportThreadSafeTest {
-
-   private static final Logger LOG = LoggerFactory.getLogger(VMTransportThreadSafeTest.class);
-
-   private final static String location1 = "vm://transport1";
-   private final static String location2 = "vm://transport2";
-
-   private final ConcurrentLinkedQueue<DummyCommand> localReceived = new ConcurrentLinkedQueue<>();
-   private final ConcurrentLinkedQueue<DummyCommand> remoteReceived = new ConcurrentLinkedQueue<>();
-
-   private class DummyCommand extends BaseCommand {
-
-      public final int sequenceId;
-
-      public DummyCommand() {
-         this.sequenceId = 0;
-      }
-
-      public DummyCommand(int id) {
-         this.sequenceId = id;
-      }
-
-      @Override
-      public Response visit(CommandVisitor visitor) throws Exception {
-         return null;
-      }
-
-      @Override
-      public byte getDataStructureType() {
-         return 42;
-      }
-   }
-
-   private class VMTestTransportListener implements TransportListener {
-
-      protected final Queue<DummyCommand> received;
-
-      public boolean shutdownReceived = false;
-
-      public VMTestTransportListener(Queue<DummyCommand> receiveQueue) {
-         this.received = receiveQueue;
-      }
-
-      @Override
-      public void onCommand(Object command) {
-
-         if (command instanceof ShutdownInfo) {
-            shutdownReceived = true;
-         }
-         else {
-            received.add((DummyCommand) command);
-         }
-      }
-
-      @Override
-      public void onException(IOException error) {
-      }
-
-      @Override
-      public void transportInterupted() {
-      }
-
-      @Override
-      public void transportResumed() {
-      }
-   }
-
-   private class VMResponderTransportListener implements TransportListener {
-
-      protected final Queue<DummyCommand> received;
-
-      private final Transport peer;
-
-      public VMResponderTransportListener(Queue<DummyCommand> receiveQueue, Transport peer) {
-         this.received = receiveQueue;
-         this.peer = peer;
-      }
-
-      @Override
-      public void onCommand(Object command) {
-
-         if (command instanceof ShutdownInfo) {
-            return;
-         }
-         else {
-            received.add((DummyCommand) command);
-
-            if (peer != null) {
-               try {
-                  peer.oneway(command);
-               }
-               catch (IOException e) {
-               }
-            }
-         }
-      }
-
-      @Override
-      public void onException(IOException error) {
-      }
-
-      @Override
-      public void transportInterupted() {
-      }
-
-      @Override
-      public void transportResumed() {
-      }
-   }
-
-   private class SlowVMTestTransportListener extends VMTestTransportListener {
-
-      private final TimeUnit delayUnit;
-      private final long delay;
-
-      public SlowVMTestTransportListener(Queue<DummyCommand> receiveQueue) {
-         this(receiveQueue, 10, TimeUnit.MILLISECONDS);
-      }
-
-      public SlowVMTestTransportListener(Queue<DummyCommand> receiveQueue, long delay, TimeUnit delayUnit) {
-         super(receiveQueue);
-
-         this.delay = delay;
-         this.delayUnit = delayUnit;
-      }
-
-      @Override
-      public void onCommand(Object command) {
-         super.onCommand(command);
-         try {
-            delayUnit.sleep(delay);
-         }
-         catch (InterruptedException e) {
-         }
-      }
-   }
-
-   private class GatedVMTestTransportListener extends VMTestTransportListener {
-
-      private final CountDownLatch gate;
-
-      public GatedVMTestTransportListener(Queue<DummyCommand> receiveQueue) {
-         this(receiveQueue, new CountDownLatch(1));
-      }
-
-      public GatedVMTestTransportListener(Queue<DummyCommand> receiveQueue, CountDownLatch gate) {
-         super(receiveQueue);
-
-         this.gate = gate;
-      }
-
-      @Override
-      public void onCommand(Object command) {
-         super.onCommand(command);
-         try {
-            gate.await();
-         }
-         catch (InterruptedException e) {
-         }
-      }
-   }
-
-   private void assertMessageAreOrdered(ConcurrentLinkedQueue<DummyCommand> queue) {
-      int lastSequenceId = 0;
-      for (DummyCommand command : queue) {
-         int id = command.sequenceId;
-         assertTrue("Last id: " + lastSequenceId + " should be less than current id: " + id, id > lastSequenceId);
-      }
-   }
-
-   @Before
-   public void setUp() throws Exception {
-      localReceived.clear();
-      remoteReceived.clear();
-   }
-
-   @After
-   public void tearDown() throws Exception {
-   }
-
-   @Test(timeout = 60000)
-   public void testStartWthoutListenerIOE() throws Exception {
-
-      final VMTransport local = new VMTransport(new URI(location1));
-      final VMTransport remote = new VMTransport(new URI(location2));
-
-      local.setPeer(remote);
-      remote.setPeer(local);
-
-      remote.setTransportListener(new VMTestTransportListener(localReceived));
-
-      try {
-         local.start();
-         fail("Should have thrown an IOExcoption");
-      }
-      catch (IOException e) {
-      }
-   }
-
-   @Test(timeout = 60000)
-   public void testOnewayOnStoppedTransportTDE() throws Exception {
-
-      final VMTransport local = new VMTransport(new URI(location1));
-      final VMTransport remote = new VMTransport(new URI(location2));
-
-      local.setPeer(remote);
-      remote.setPeer(local);
-
-      local.setTransportListener(new VMTestTransportListener(localReceived));
-      remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
-      local.start();
-      local.stop();
-
-      try {
-         local.oneway(new DummyCommand());
-         fail("Should have thrown a TransportDisposedException");
-      }
-      catch (TransportDisposedIOException e) {
-      }
-   }
-
-   @Test(timeout = 60000)
-   public void testStopSendsShutdownToPeer() throws Exception {
-
-      final VMTransport local = new VMTransport(new URI(location1));
-      final VMTransport remote = new VMTransport(new URI(location2));
-
-      local.setPeer(remote);
-      remote.setPeer(local);
-
-      final VMTestTransportListener remoteListener = new VMTestTransportListener(remoteReceived);
-
-      local.setTransportListener(new VMTestTransportListener(localReceived));
-      remote.setTransportListener(remoteListener);
-
-      local.start();
-      local.stop();
-
-      assertTrue(Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return remoteListener.shutdownReceived;
-         }
-      }));
-   }
-
-   @Test(timeout = 60000)
-   public void testRemoteStopSendsExceptionToPendingRequests() throws Exception {
-
-      final VMTransport local = new VMTransport(new URI(location1));
-      final VMTransport remote = new VMTransport(new URI(location2));
-
-      local.setPeer(remote);
-      remote.setPeer(local);
-
-      final VMTestTransportListener remoteListener = new VMTestTransportListener(remoteReceived);
-      remote.setTransportListener(remoteListener);
-      remote.start();
-
-      final Response[] answer = new Response[1];
-      ResponseCorrelator responseCorrelator = new ResponseCorrelator(local);
-      responseCorrelator.setTransportListener(new VMTestTransportListener(localReceived));
-      responseCorrelator.start();
-      responseCorrelator.asyncRequest(new DummyCommand(), new ResponseCallback() {
-         @Override
-         public void onCompletion(FutureResponse resp) {
-            try {
-               answer[0] = resp.getResult();
-            }
-            catch (IOException e) {
-               e.printStackTrace();
-            }
-         }
-      });
-
-      // simulate broker stop
-      remote.stop();
-
-      assertTrue(Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            LOG.info("answer: " + answer[0]);
-            return answer[0] instanceof ExceptionResponse && ((ExceptionResponse) answer[0]).getException() instanceof TransportDisposedIOException;
-         }
-      }));
-
-      local.stop();
-   }
-
-   @Test(timeout = 60000)
-   public void testMultipleStartsAndStops() throws Exception {
-
-      final VMTransport local = new VMTransport(new URI(location1));
-      final VMTransport remote = new VMTransport(new URI(location2));
-
-      local.setPeer(remote);
-      remote.setPeer(local);
-
-      local.setTransportListener(new VMTestTransportListener(localReceived));
-      remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
-      local.start();
-      remote.start();
-
-      local.start();
-      remote.start();
-
-      for (int i = 0; i < 100; ++i) {
-         local.oneway(new DummyCommand());
-      }
-
-      for (int i = 0; i < 100; ++i) {
-         remote.oneway(new DummyCommand());
-      }
-
-      local.start();
-      remote.start();
-
-      assertTrue(Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return remoteReceived.size() == 100;
-         }
-      }));
-
-      assertTrue(Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return localReceived.size() == 100;
-         }
-      }));
-
-      local.stop();
-      local.stop();
-      remote.stop();
-      remote.stop();
-   }
-
-   @Test(timeout = 60000)
-   public void testStartWithPeerNotStartedEnqueusCommandsNonAsync() throws Exception {
-      doTestStartWithPeerNotStartedEnqueusCommands(false);
-   }
-
-   private void doTestStartWithPeerNotStartedEnqueusCommands(boolean async) throws Exception {
-
-      final VMTransport local = new VMTransport(new URI(location1));
-      final VMTransport remote = new VMTransport(new URI(location2));
-
-      remote.setAsync(async);
-
-      local.setPeer(remote);
-      remote.setPeer(local);
-
-      local.setTransportListener(new VMTestTransportListener(localReceived));
-      remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
-      local.start();
-
-      for (int i = 0; i < 100; ++i) {
-         local.oneway(new DummyCommand());
-      }
-
-      assertEquals(100, remote.getMessageQueue().size());
-
-      remote.start();
-
-      assertTrue(Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return remoteReceived.size() == 100;
-         }
-      }));
-
-      local.stop();
-      remote.stop();
-   }
-
-   @Test(timeout = 60000)
-   public void testBlockedOnewayEnqeueAandStopTransportAsync() throws Exception {
-      doTestBlockedOnewayEnqeueAandStopTransport(true);
-   }
-
-   @Test(timeout = 60000)
-   public void testBlockedOnewayEnqeueAandStopTransportNonAsync() throws Exception {
-      doTestBlockedOnewayEnqeueAandStopTransport(false);
-   }
-
-   private void doTestBlockedOnewayEnqeueAandStopTransport(boolean async) throws Exception {
-
-      final VMTransport local = new VMTransport(new URI(location1));
-      final VMTransport remote = new VMTransport(new URI(location2));
-
-      final AtomicInteger sequenceId = new AtomicInteger();
-
-      remote.setAsync(async);
-      remote.setAsyncQueueDepth(99);
-
-      local.setPeer(remote);
-      remote.setPeer(local);
-
-      local.setTransportListener(new VMTestTransportListener(localReceived));
-      remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
-      local.start();
-
-      Thread t = new Thread(new Runnable() {
-
-         @Override
-         public void run() {
-            for (int i = 0; i < 100; ++i) {
-               try {
-                  local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
-               }
-               catch (Exception e) {
-               }
-            }
-
-         }
-      });
-      t.start();
-
-      LOG.debug("Started async delivery, wait for remote's queue to fill up");
-
-      assertTrue(Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return remote.getMessageQueue().remainingCapacity() == 0;
-         }
-      }));
-
-      LOG.debug("Remote messageQ is full, start it and stop all");
-
-      remote.start();
-      local.stop();
-      remote.stop();
-   }
-
-   @Test(timeout = 60000)
-   public void testBlockedOnewayEnqeueWhileStartedDetectsStop() throws Exception {
-      final VMTransport local = new VMTransport(new URI(location1));
-      final VMTransport remote = new VMTransport(new URI(location2));
-
-      final AtomicInteger sequenceId = new AtomicInteger();
-
-      remote.setAsync(true);
-      remote.setAsyncQueueDepth(2);
-
-      local.setPeer(remote);
-      remote.setPeer(local);
-
-      local.setTransportListener(new VMTestTransportListener(localReceived));
-      remote.setTransportListener(new GatedVMTestTransportListener(remoteReceived));
-
-      local.start();
-      remote.start();
-
-      Thread t = new Thread(new Runnable() {
-
-         @Override
-         public void run() {
-            for (int i = 0; i < 3; ++i) {
-               try {
-                  local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
-               }
-               catch (Exception e) {
-               }
-            }
-
-         }
-      });
-      t.start();
-
-      LOG.debug("Started async delivery, wait for remote's queue to fill up");
-      assertTrue(Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return remote.getMessageQueue().remainingCapacity() == 0;
-         }
-      }));
-
-      LOG.debug("Starting async gate open.");
-      Thread gateman = new Thread(new Runnable() {
-         @Override
-         public void run() {
-            try {
-               Thread.sleep(100);
-            }
-            catch (InterruptedException e) {
-            }
-            ((GatedVMTestTransportListener) remote.getTransportListener()).gate.countDown();
-         }
-      });
-      gateman.start();
-
-      remote.stop();
-      local.stop();
-
-      assertEquals(1, remoteReceived.size());
-      assertMessageAreOrdered(remoteReceived);
-   }
-
-   @Test(timeout = 60000)
-   public void testStopWhileStartingAsyncWithNoAsyncLimit() throws Exception {
-      // In the async case the iterate method should see that we are stopping and
-      // drop out before we dispatch all the messages but it should get at least 49 since
-      // the stop thread waits 500 mills and the listener is waiting 10 mills on each receive.
-      doTestStopWhileStartingWithNoAsyncLimit(true, 49);
-   }
-
-   @Test(timeout = 60000)
-   public void testStopWhileStartingNonAsyncWithNoAsyncLimit() throws Exception {
-      // In the non-async case the start dispatches all messages up front and then continues on
-      doTestStopWhileStartingWithNoAsyncLimit(false, 100);
-   }
-
-   private void doTestStopWhileStartingWithNoAsyncLimit(boolean async, final int expect) throws Exception {
-
-      final VMTransport local = new VMTransport(new URI(location1));
-      final VMTransport remote = new VMTransport(new URI(location2));
-
-      remote.setAsync(async);
-
-      local.setPeer(remote);
-      remote.setPeer(local);
-
-      local.setTransportListener(new VMTestTransportListener(localReceived));
-      remote.setTransportListener(new SlowVMTestTransportListener(remoteReceived));
-
-      local.start();
-
-      for (int i = 0; i < 100; ++i) {
-         local.oneway(new DummyCommand(i));
-      }
-
-      Thread t = new Thread(new Runnable() {
-
-         @Override
-         public void run() {
-            try {
-               Thread.sleep(1000);
-               remote.stop();
-            }
-            catch (Exception e) {
-            }
-         }
-      });
-
-      remote.start();
-
-      t.start();
-
-      assertTrue("Remote should receive: " + expect + ", commands but got: " + remoteReceived.size(), Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return remoteReceived.size() >= expect;
-         }
-      }));
-
-      LOG.debug("Remote listener received " + remoteReceived.size() + " messages");
-
-      local.stop();
-
-      assertTrue("Remote transport never was disposed.", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return remote.isDisposed();
-         }
-      }));
-   }
-
-   @Test(timeout = 120000)
-   public void TestTwoWayMessageThroughPutSync() throws Exception {
-
-      long totalTimes = 0;
-      final long executions = 20;
-
-      for (int i = 0; i < 20; ++i) {
-         totalTimes += doTestTwoWayMessageThroughPut(false);
-      }
-
-      LOG.info("Total time of one way sync send throughput test: " + (totalTimes / executions) + "ms");
-   }
-
-   @Test(timeout = 120000)
-   public void TestTwoWayMessageThroughPutAsnyc() throws Exception {
-
-      long totalTimes = 0;
-      final long executions = 50;
-
-      for (int i = 0; i < executions; ++i) {
-         totalTimes += doTestTwoWayMessageThroughPut(false);
-      }
-
-      LOG.info("Total time of one way async send throughput test: " + (totalTimes / executions) + "ms");
-   }
-
-   private long doTestTwoWayMessageThroughPut(boolean async) throws Exception {
-
-      final VMTransport local = new VMTransport(new URI(location1));
-      final VMTransport remote = new VMTransport(new URI(location2));
-
-      final AtomicInteger sequenceId = new AtomicInteger();
-
-      remote.setAsync(async);
-
-      local.setPeer(remote);
-      remote.setPeer(local);
-
-      local.setTransportListener(new VMTestTransportListener(localReceived));
-      remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
-      final int messageCount = 200000;
-
-      local.start();
-      remote.start();
-
-      long startTime = System.currentTimeMillis();
-
-      Thread localSend = new Thread(new Runnable() {
-
-         @Override
-         public void run() {
-            for (int i = 0; i < messageCount; ++i) {
-               try {
-                  local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
-               }
-               catch (Exception e) {
-               }
-            }
-
-         }
-      });
-
-      Thread remoteSend = new Thread(new Runnable() {
-
-         @Override
-         public void run() {
-            for (int i = 0; i < messageCount; ++i) {
-               try {
-                  remote.oneway(new DummyCommand(sequenceId.incrementAndGet()));
-               }
-               catch (Exception e) {
-               }
-            }
-
-         }
-      });
-
-      localSend.start();
-      remoteSend.start();
-
-      // Wait for both to finish and then check that each side go the correct amount
-      localSend.join();
-      remoteSend.join();
-
-      long endTime = System.currentTimeMillis();
-
-      assertTrue(Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return remoteReceived.size() == messageCount;
-         }
-      }));
-
-      assertTrue(Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return localReceived.size() == messageCount;
-         }
-      }));
-
-      LOG.debug("All messages sent,stop all");
-
-      local.stop();
-      remote.stop();
-
-      localReceived.clear();
-      remoteReceived.clear();
-
-      return endTime - startTime;
-   }
-
-   @Test(timeout = 120000)
-   public void TestOneWayMessageThroughPutSync() throws Exception {
-
-      long totalTimes = 0;
-      final long executions = 30;
-
-      for (int i = 0; i < executions; ++i) {
-         totalTimes += doTestOneWayMessageThroughPut(false);
-      }
-
-      LOG.info("Total time of one way sync send throughput test: " + (totalTimes / executions) + "ms");
-   }
-
-   @Test(timeout = 120000)
-   public void TestOneWayMessageThroughPutAsnyc() throws Exception {
-
-      long totalTimes = 0;
-      final long executions = 20;
-
-      for (int i = 0; i < 20; ++i) {
-         totalTimes += doTestOneWayMessageThroughPut(true);
-      }
-
-      LOG.info("Total time of one way async send throughput test: " + (totalTimes / executions) + "ms");
-   }
-
-   private long doTestOneWayMessageThroughPut(boolean async) throws Exception {
-
-      final VMTransport local = new VMTransport(new URI(location1));
-      final VMTransport remote = new VMTransport(new URI(location2));
-
-      final AtomicInteger sequenceId = new AtomicInteger();
-
-      remote.setAsync(async);
-
-      local.setPeer(remote);
-      remote.setPeer(local);
-
-      local.setTransportListener(new VMTestTransportListener(localReceived));
-      remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
-      final int messageCount = 100000;
-
-      local.start();
-      remote.start();
-
-      long startTime = System.currentTimeMillis();
-
-      Thread localSend = new Thread(new Runnable() {
-
-         @Override
-         public void run() {
-            for (int i = 0; i < messageCount; ++i) {
-               try {
-                  local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
-               }
-               catch (Exception e) {
-               }
-            }
-
-         }
-      });
-
-      localSend.start();
-
-      // Wait for both to finish and then check that each side go the correct amount
-      localSend.join();
-
-      long endTime = System.currentTimeMillis();
-
-      assertTrue(Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return remoteReceived.size() == messageCount;
-         }
-      }));
-
-      LOG.debug("All messages sent,stop all");
-
-      local.stop();
-      remote.stop();
-
-      localReceived.clear();
-      remoteReceived.clear();
-
-      return endTime - startTime;
-   }
-
-   @Test(timeout = 120000)
-   public void testTwoWayTrafficWithMutexTransportSync1() throws Exception {
-
-      for (int i = 0; i < 20; ++i) {
-         doTestTwoWayTrafficWithMutexTransport(false, false);
-      }
-   }
-
-   @Test(timeout = 120000)
-   public void testTwoWayTrafficWithMutexTransportSync2() throws Exception {
-
-      for (int i = 0; i < 20; ++i) {
-         doTestTwoWayTrafficWithMutexTransport(true, false);
-      }
-   }
-
-   @Test(timeout = 120000)
-   public void testTwoWayTrafficWithMutexTransportSync3() throws Exception {
-
-      for (int i = 0; i < 20; ++i) {
-         doTestTwoWayTrafficWithMutexTransport(false, true);
-      }
-   }
-
-   @Test(timeout = 120000)
-   public void testTwoWayTrafficWithMutexTransportSync4() throws Exception {
-
-      for (int i = 0; i < 20; ++i) {
-         doTestTwoWayTrafficWithMutexTransport(false, false);
-      }
-   }
-
-   public void doTestTwoWayTrafficWithMutexTransport(boolean localAsync, boolean remoteAsync) throws Exception {
-
-      final VMTransport vmlocal = new VMTransport(new URI(location1));
-      final VMTransport vmremote = new VMTransport(new URI(location2));
-
-      final MutexTransport local = new MutexTransport(vmlocal);
-      final MutexTransport remote = new MutexTransport(vmremote);
-
-      final AtomicInteger sequenceId = new AtomicInteger();
-
-      vmlocal.setAsync(localAsync);
-      vmremote.setAsync(remoteAsync);
-
-      vmlocal.setPeer(vmremote);
-      vmremote.setPeer(vmlocal);
-
-      local.setTransportListener(new VMTestTransportListener(localReceived));
-      remote.setTransportListener(new VMResponderTransportListener(remoteReceived, remote));
-
-      final int messageCount = 200000;
-
-      Thread localSend = new Thread(new Runnable() {
-
-         @Override
-         public void run() {
-            for (int i = 0; i < messageCount; ++i) {
-               try {
-                  local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
-               }
-               catch (Exception e) {
-               }
-            }
-         }
-      });
-
-      Thread remoteSend = new Thread(new Runnable() {
-
-         @Override
-         public void run() {
-            for (int i = 0; i < messageCount; ++i) {
-               try {
-                  remote.oneway(new DummyCommand(sequenceId.incrementAndGet()));
-               }
-               catch (Exception e) {
-               }
-            }
-         }
-      });
-
-      localSend.start();
-      remoteSend.start();
-
-      Thread.sleep(10);
-
-      local.start();
-      remote.start();
-
-      // Wait for both to finish and then check that each side go the correct amount
-      localSend.join();
-      remoteSend.join();
-
-      assertTrue("Remote should have received (" + messageCount + ") but got ()" + remoteReceived.size(), Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return remoteReceived.size() == messageCount;
-         }
-      }));
-
-      assertTrue("Local should have received (" + messageCount * 2 + ") but got ()" + localReceived.size(), Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return localReceived.size() == messageCount * 2;
-         }
-      }));
-
-      LOG.debug("All messages sent,stop all");
-
-      local.stop();
-      remote.stop();
-
-      localReceived.clear();
-      remoteReceived.clear();
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f25b6e30/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
deleted file mode 100644
index dd14d67..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.JMSException;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerRegistry;
-import org.apache.activemq.broker.BrokerService;
-import org.junit.After;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class VMTransportWaitForTest {
-
-   static final Logger LOG = LoggerFactory.getLogger(VMTransportWaitForTest.class);
-
-   private static final int WAIT_TIME = 20000;
-   private static final int SHORT_WAIT_TIME = 5000;
-
-   private static final String VM_BROKER_URI_NO_WAIT = "vm://localhost?broker.persistent=false&create=false";
-
-   private static final String VM_BROKER_URI_WAIT_FOR_START = VM_BROKER_URI_NO_WAIT + "&waitForStart=" + WAIT_TIME;
-
-   private static final String VM_BROKER_URI_SHORT_WAIT_FOR_START = VM_BROKER_URI_NO_WAIT + "&waitForStart=" + SHORT_WAIT_TIME;
-
-   CountDownLatch started = new CountDownLatch(1);
-   CountDownLatch gotConnection = new CountDownLatch(1);
-
-   @After
-   public void after() throws IOException {
-      BrokerRegistry.getInstance().unbind("localhost");
-   }
-
-   @Test(timeout = 90000)
-   public void testWaitFor() throws Exception {
-      try {
-         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_NO_WAIT));
-         cf.createConnection();
-         fail("expect broker not exist exception");
-      }
-      catch (JMSException expectedOnNoBrokerAndNoCreate) {
-      }
-
-      // spawn a thread that will wait for an embedded broker to start via
-      // vm://..
-      Thread t = new Thread("ClientConnectionThread") {
-         @Override
-         public void run() {
-            try {
-               started.countDown();
-               ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_WAIT_FOR_START));
-               cf.createConnection();
-               gotConnection.countDown();
-            }
-            catch (Exception e) {
-               e.printStackTrace();
-               fail("unexpected exception: " + e);
-            }
-         }
-      };
-      t.start();
-      started.await(20, TimeUnit.SECONDS);
-      Thread.yield();
-      assertFalse("has not got connection", gotConnection.await(2, TimeUnit.SECONDS));
-
-      BrokerService broker = new BrokerService();
-      broker.setPersistent(false);
-      broker.start();
-      assertTrue("has got connection", gotConnection.await(5, TimeUnit.SECONDS));
-      broker.stop();
-   }
-
-   @Test(timeout = 90000)
-   public void testWaitForNoBrokerInRegistry() throws Exception {
-
-      long startTime = System.currentTimeMillis();
-
-      try {
-         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_SHORT_WAIT_FOR_START));
-         cf.createConnection();
-         fail("expect broker not exist exception");
-      }
-      catch (JMSException expectedOnNoBrokerAndNoCreate) {
-      }
-
-      long endTime = System.currentTimeMillis();
-
-      LOG.info("Total wait time was: {}", endTime - startTime);
-      assertTrue(endTime - startTime >= SHORT_WAIT_TIME - 100);
-   }
-
-   @Test(timeout = 90000)
-   public void testWaitForNotStartedButInRegistry() throws Exception {
-
-      BrokerService broker = new BrokerService();
-      broker.setPersistent(false);
-      BrokerRegistry.getInstance().bind("localhost", broker);
-
-      long startTime = System.currentTimeMillis();
-
-      try {
-         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_SHORT_WAIT_FOR_START));
-         cf.createConnection();
-         fail("expect broker not exist exception");
-      }
-      catch (JMSException expectedOnNoBrokerAndNoCreate) {
-      }
-
-      long endTime = System.currentTimeMillis();
-
-      LOG.info("Total wait time was: {}", endTime - startTime);
-      assertTrue(endTime - startTime >= SHORT_WAIT_TIME - 100);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f25b6e30/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java
deleted file mode 100644
index 2b97cff..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.bugs.embedded.ThreadExplorer;
-import org.apache.activemq.network.NetworkConnector;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class VmTransportNetworkBrokerTest extends TestCase {
-
-   private static final Logger LOG = LoggerFactory.getLogger(VmTransportNetworkBrokerTest.class);
-
-   private static final String VM_BROKER_URI = "vm://localhost?create=false";
-
-   CountDownLatch started = new CountDownLatch(1);
-   CountDownLatch gotConnection = new CountDownLatch(1);
-
-   public void testNoThreadLeak() throws Exception {
-
-      // with VMConnection and simple discovery network connector
-      int originalThreadCount = Thread.activeCount();
-      LOG.debug(ThreadExplorer.show("threads at beginning"));
-
-      BrokerService broker = new BrokerService();
-      broker.setDedicatedTaskRunner(true);
-      broker.setPersistent(false);
-      broker.addConnector("tcp://localhost:61616");
-      NetworkConnector networkConnector = broker.addNetworkConnector("static:(tcp://wrongHostname1:61617,tcp://wrongHostname2:61618)?useExponentialBackOff=false");
-      networkConnector.setDuplex(true);
-      broker.start();
-
-      ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI));
-      Connection connection = cf.createConnection("system", "manager");
-      connection.start();
-
-      // let it settle
-      TimeUnit.SECONDS.sleep(5);
-
-      int threadCountAfterStart = Thread.activeCount();
-      TimeUnit.SECONDS.sleep(30);
-      int threadCountAfterSleep = Thread.activeCount();
-
-      assertTrue("Threads are leaking: " + ThreadExplorer.show("active sleep") + ", threadCount=" + threadCountAfterStart + " threadCountAfterSleep=" + threadCountAfterSleep, threadCountAfterSleep < threadCountAfterStart + 8);
-
-      connection.close();
-      broker.stop();
-      broker.waitUntilStopped();
-
-      // testNoDanglingThreadsAfterStop with tcp transport
-      broker = new BrokerService();
-      broker.setSchedulerSupport(true);
-      broker.setDedicatedTaskRunner(true);
-      broker.setPersistent(false);
-      broker.addConnector("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
-      broker.start();
-
-      cf = new ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
-      connection = cf.createConnection("system", "manager");
-      connection.start();
-      connection.close();
-      broker.stop();
-      broker.waitUntilStopped();
-
-      // let it settle
-      TimeUnit.SECONDS.sleep(5);
-
-      // get final threads but filter out any daemon threads that the JVM may have created.
-      Thread[] threads = filterDaemonThreads(ThreadExplorer.listThreads());
-      int threadCountAfterStop = threads.length;
-
-      // lets see the thread counts at INFO level so they are always in the test log
-      LOG.info(ThreadExplorer.show("active after stop"));
-      LOG.info("originalThreadCount=" + originalThreadCount + " threadCountAfterStop=" + threadCountAfterStop);
-
-      assertTrue("Threads are leaking: " +
-                    ThreadExplorer.show("active after stop") +
-                    ". originalThreadCount=" +
-                    originalThreadCount +
-                    " threadCountAfterStop=" +
-                    threadCountAfterStop, threadCountAfterStop <= originalThreadCount);
-   }
-
-   /**
-    * Filters any daemon threads from the thread list.
-    *
-    * Thread counts before and after the test should ideally be equal.
-    * However there is no guarantee that the JVM does not create any
-    * additional threads itself.
-    * E.g. on Mac OSX there is a JVM internal thread called
-    * "Poller SunPKCS11-Darwin" created after the test go started and
-    * under the main thread group.
-    * When debugging tests in Eclipse another so called "Reader" thread
-    * is created by Eclipse.
-    * So we cannot assume that the JVM does not create additional threads
-    * during the test. However for the time being we assume that any such
-    * additionally created threads are daemon threads.
-    *
-    * @param threads - the array of threads to parse
-    * @return a new array with any daemon threads removed
-    */
-   public Thread[] filterDaemonThreads(Thread[] threads) throws Exception {
-
-      List<Thread> threadList = new ArrayList<>(Arrays.asList(threads));
-
-      // Can't use an Iterator as it would raise a
-      // ConcurrentModificationException when trying to remove an element
-      // from the list, so using standard walk through
-      for (int i = 0; i < threadList.size(); i++) {
-
-         Thread thread = threadList.get(i);
-         LOG.debug("Inspecting thread " + thread.getName());
-         if (thread.isDaemon()) {
-            LOG.debug("Removing deamon thread.");
-            threadList.remove(thread);
-            Thread.sleep(100);
-
-         }
-      }
-      LOG.debug("Converting list back to Array");
-      return threadList.toArray(new Thread[0]);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f25b6e30/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/LockFileTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/LockFileTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/LockFileTest.java
new file mode 100644
index 0000000..03e0d2e
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/LockFileTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.File;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class LockFileTest {
+
+    @Test
+    public void testNoDeleteOnUnlockIfNotLocked() throws Exception {
+
+        File lockFile = new File(IOHelper.getDefaultDataDirectory(), "lockToTest1");
+        IOHelper.mkdirs(lockFile.getParentFile());
+        lockFile.createNewFile();
+
+        LockFile underTest = new LockFile(lockFile, true);
+
+        underTest.lock();
+
+        lockFile.delete();
+
+        assertFalse("no longer valid", underTest.keepAlive());
+
+        // a slave gets in
+        lockFile.createNewFile();
+
+        underTest.unlock();
+
+        assertTrue("file still exists after unlock when not locked", lockFile.exists());
+
+    }
+
+    @Test
+    public void testDeleteOnUnlockIfLocked() throws Exception {
+
+        File lockFile = new File(IOHelper.getDefaultDataDirectory(), "lockToTest2");
+        IOHelper.mkdirs(lockFile.getParentFile());
+        lockFile.createNewFile();
+
+        LockFile underTest = new LockFile(lockFile, true);
+
+        underTest.lock();
+
+        assertTrue("valid", underTest.keepAlive());
+
+        underTest.unlock();
+
+        assertFalse("file deleted on unlock", lockFile.exists());
+
+    }
+}


Mime
View raw message