activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [04/42] activemq-artemis git commit: ARTEMIS-463 Improvement to the openwire testsuite https://issues.apache.org/jira/browse/ARTEMIS-463
Date Mon, 04 Apr 2016 16:09:13 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
deleted file mode 100644
index 2268048..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
+++ /dev/null
@@ -1,937 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.activemq.command.BaseCommand;
-import org.apache.activemq.command.ExceptionResponse;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.state.CommandVisitor;
-import org.apache.activemq.transport.FutureResponse;
-import org.apache.activemq.transport.MutexTransport;
-import org.apache.activemq.transport.ResponseCallback;
-import org.apache.activemq.transport.ResponseCorrelator;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportDisposedIOException;
-import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class VMTransportThreadSafeTest {
-
-   private static final Logger LOG = LoggerFactory.getLogger(VMTransportThreadSafeTest.class);
-
-   private final static String location1 = "vm://transport1";
-   private final static String location2 = "vm://transport2";
-
-   private final ConcurrentLinkedQueue<DummyCommand> localReceived = new ConcurrentLinkedQueue<>();
-   private final ConcurrentLinkedQueue<DummyCommand> remoteReceived = new ConcurrentLinkedQueue<>();
-
-   private class DummyCommand extends BaseCommand {
-
-      public final int sequenceId;
-
-      public DummyCommand() {
-         this.sequenceId = 0;
-      }
-
-      public DummyCommand(int id) {
-         this.sequenceId = id;
-      }
-
-      @Override
-      public Response visit(CommandVisitor visitor) throws Exception {
-         return null;
-      }
-
-      @Override
-      public byte getDataStructureType() {
-         return 42;
-      }
-   }
-
-   private class VMTestTransportListener implements TransportListener {
-
-      protected final Queue<DummyCommand> received;
-
-      public boolean shutdownReceived = false;
-
-      public VMTestTransportListener(Queue<DummyCommand> receiveQueue) {
-         this.received = receiveQueue;
-      }
-
-      @Override
-      public void onCommand(Object command) {
-
-         if (command instanceof ShutdownInfo) {
-            shutdownReceived = true;
-         }
-         else {
-            received.add((DummyCommand) command);
-         }
-      }
-
-      @Override
-      public void onException(IOException error) {
-      }
-
-      @Override
-      public void transportInterupted() {
-      }
-
-      @Override
-      public void transportResumed() {
-      }
-   }
-
-   private class VMResponderTransportListener implements TransportListener {
-
-      protected final Queue<DummyCommand> received;
-
-      private final Transport peer;
-
-      public VMResponderTransportListener(Queue<DummyCommand> receiveQueue, Transport peer) {
-         this.received = receiveQueue;
-         this.peer = peer;
-      }
-
-      @Override
-      public void onCommand(Object command) {
-
-         if (command instanceof ShutdownInfo) {
-            return;
-         }
-         else {
-            received.add((DummyCommand) command);
-
-            if (peer != null) {
-               try {
-                  peer.oneway(command);
-               }
-               catch (IOException e) {
-               }
-            }
-         }
-      }
-
-      @Override
-      public void onException(IOException error) {
-      }
-
-      @Override
-      public void transportInterupted() {
-      }
-
-      @Override
-      public void transportResumed() {
-      }
-   }
-
-   private class SlowVMTestTransportListener extends VMTestTransportListener {
-
-      private final TimeUnit delayUnit;
-      private final long delay;
-
-      public SlowVMTestTransportListener(Queue<DummyCommand> receiveQueue) {
-         this(receiveQueue, 10, TimeUnit.MILLISECONDS);
-      }
-
-      public SlowVMTestTransportListener(Queue<DummyCommand> receiveQueue, long delay, TimeUnit delayUnit) {
-         super(receiveQueue);
-
-         this.delay = delay;
-         this.delayUnit = delayUnit;
-      }
-
-      @Override
-      public void onCommand(Object command) {
-         super.onCommand(command);
-         try {
-            delayUnit.sleep(delay);
-         }
-         catch (InterruptedException e) {
-         }
-      }
-   }
-
-   private class GatedVMTestTransportListener extends VMTestTransportListener {
-
-      private final CountDownLatch gate;
-
-      public GatedVMTestTransportListener(Queue<DummyCommand> receiveQueue) {
-         this(receiveQueue, new CountDownLatch(1));
-      }
-
-      public GatedVMTestTransportListener(Queue<DummyCommand> receiveQueue, CountDownLatch gate) {
-         super(receiveQueue);
-
-         this.gate = gate;
-      }
-
-      @Override
-      public void onCommand(Object command) {
-         super.onCommand(command);
-         try {
-            gate.await();
-         }
-         catch (InterruptedException e) {
-         }
-      }
-   }
-
-   private void assertMessageAreOrdered(ConcurrentLinkedQueue<DummyCommand> queue) {
-      int lastSequenceId = 0;
-      for (DummyCommand command : queue) {
-         int id = command.sequenceId;
-         assertTrue("Last id: " + lastSequenceId + " should be less than current id: " + id, id > lastSequenceId);
-      }
-   }
-
-   @Before
-   public void setUp() throws Exception {
-      localReceived.clear();
-      remoteReceived.clear();
-   }
-
-   @After
-   public void tearDown() throws Exception {
-   }
-
-   @Test(timeout = 60000)
-   public void testStartWthoutListenerIOE() throws Exception {
-
-      final VMTransport local = new VMTransport(new URI(location1));
-      final VMTransport remote = new VMTransport(new URI(location2));
-
-      local.setPeer(remote);
-      remote.setPeer(local);
-
-      remote.setTransportListener(new VMTestTransportListener(localReceived));
-
-      try {
-         local.start();
-         fail("Should have thrown an IOExcoption");
-      }
-      catch (IOException e) {
-      }
-   }
-
-   @Test(timeout = 60000)
-   public void testOnewayOnStoppedTransportTDE() throws Exception {
-
-      final VMTransport local = new VMTransport(new URI(location1));
-      final VMTransport remote = new VMTransport(new URI(location2));
-
-      local.setPeer(remote);
-      remote.setPeer(local);
-
-      local.setTransportListener(new VMTestTransportListener(localReceived));
-      remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
-      local.start();
-      local.stop();
-
-      try {
-         local.oneway(new DummyCommand());
-         fail("Should have thrown a TransportDisposedException");
-      }
-      catch (TransportDisposedIOException e) {
-      }
-   }
-
-   @Test(timeout = 60000)
-   public void testStopSendsShutdownToPeer() throws Exception {
-
-      final VMTransport local = new VMTransport(new URI(location1));
-      final VMTransport remote = new VMTransport(new URI(location2));
-
-      local.setPeer(remote);
-      remote.setPeer(local);
-
-      final VMTestTransportListener remoteListener = new VMTestTransportListener(remoteReceived);
-
-      local.setTransportListener(new VMTestTransportListener(localReceived));
-      remote.setTransportListener(remoteListener);
-
-      local.start();
-      local.stop();
-
-      assertTrue(Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return remoteListener.shutdownReceived;
-         }
-      }));
-   }
-
-   @Test(timeout = 60000)
-   public void testRemoteStopSendsExceptionToPendingRequests() throws Exception {
-
-      final VMTransport local = new VMTransport(new URI(location1));
-      final VMTransport remote = new VMTransport(new URI(location2));
-
-      local.setPeer(remote);
-      remote.setPeer(local);
-
-      final VMTestTransportListener remoteListener = new VMTestTransportListener(remoteReceived);
-      remote.setTransportListener(remoteListener);
-      remote.start();
-
-      final Response[] answer = new Response[1];
-      ResponseCorrelator responseCorrelator = new ResponseCorrelator(local);
-      responseCorrelator.setTransportListener(new VMTestTransportListener(localReceived));
-      responseCorrelator.start();
-      responseCorrelator.asyncRequest(new DummyCommand(), new ResponseCallback() {
-         @Override
-         public void onCompletion(FutureResponse resp) {
-            try {
-               answer[0] = resp.getResult();
-            }
-            catch (IOException e) {
-               e.printStackTrace();
-            }
-         }
-      });
-
-      // simulate broker stop
-      remote.stop();
-
-      assertTrue(Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            LOG.info("answer: " + answer[0]);
-            return answer[0] instanceof ExceptionResponse && ((ExceptionResponse) answer[0]).getException() instanceof TransportDisposedIOException;
-         }
-      }));
-
-      local.stop();
-   }
-
-   @Test(timeout = 60000)
-   public void testMultipleStartsAndStops() throws Exception {
-
-      final VMTransport local = new VMTransport(new URI(location1));
-      final VMTransport remote = new VMTransport(new URI(location2));
-
-      local.setPeer(remote);
-      remote.setPeer(local);
-
-      local.setTransportListener(new VMTestTransportListener(localReceived));
-      remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
-      local.start();
-      remote.start();
-
-      local.start();
-      remote.start();
-
-      for (int i = 0; i < 100; ++i) {
-         local.oneway(new DummyCommand());
-      }
-
-      for (int i = 0; i < 100; ++i) {
-         remote.oneway(new DummyCommand());
-      }
-
-      local.start();
-      remote.start();
-
-      assertTrue(Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return remoteReceived.size() == 100;
-         }
-      }));
-
-      assertTrue(Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return localReceived.size() == 100;
-         }
-      }));
-
-      local.stop();
-      local.stop();
-      remote.stop();
-      remote.stop();
-   }
-
-   @Test(timeout = 60000)
-   public void testStartWithPeerNotStartedEnqueusCommandsNonAsync() throws Exception {
-      doTestStartWithPeerNotStartedEnqueusCommands(false);
-   }
-
-   private void doTestStartWithPeerNotStartedEnqueusCommands(boolean async) throws Exception {
-
-      final VMTransport local = new VMTransport(new URI(location1));
-      final VMTransport remote = new VMTransport(new URI(location2));
-
-      remote.setAsync(async);
-
-      local.setPeer(remote);
-      remote.setPeer(local);
-
-      local.setTransportListener(new VMTestTransportListener(localReceived));
-      remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
-      local.start();
-
-      for (int i = 0; i < 100; ++i) {
-         local.oneway(new DummyCommand());
-      }
-
-      assertEquals(100, remote.getMessageQueue().size());
-
-      remote.start();
-
-      assertTrue(Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return remoteReceived.size() == 100;
-         }
-      }));
-
-      local.stop();
-      remote.stop();
-   }
-
-   @Test(timeout = 60000)
-   public void testBlockedOnewayEnqeueAandStopTransportAsync() throws Exception {
-      doTestBlockedOnewayEnqeueAandStopTransport(true);
-   }
-
-   @Test(timeout = 60000)
-   public void testBlockedOnewayEnqeueAandStopTransportNonAsync() throws Exception {
-      doTestBlockedOnewayEnqeueAandStopTransport(false);
-   }
-
-   private void doTestBlockedOnewayEnqeueAandStopTransport(boolean async) throws Exception {
-
-      final VMTransport local = new VMTransport(new URI(location1));
-      final VMTransport remote = new VMTransport(new URI(location2));
-
-      final AtomicInteger sequenceId = new AtomicInteger();
-
-      remote.setAsync(async);
-      remote.setAsyncQueueDepth(99);
-
-      local.setPeer(remote);
-      remote.setPeer(local);
-
-      local.setTransportListener(new VMTestTransportListener(localReceived));
-      remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
-      local.start();
-
-      Thread t = new Thread(new Runnable() {
-
-         @Override
-         public void run() {
-            for (int i = 0; i < 100; ++i) {
-               try {
-                  local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
-               }
-               catch (Exception e) {
-               }
-            }
-
-         }
-      });
-      t.start();
-
-      LOG.debug("Started async delivery, wait for remote's queue to fill up");
-
-      assertTrue(Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return remote.getMessageQueue().remainingCapacity() == 0;
-         }
-      }));
-
-      LOG.debug("Remote messageQ is full, start it and stop all");
-
-      remote.start();
-      local.stop();
-      remote.stop();
-   }
-
-   @Test(timeout = 60000)
-   public void testBlockedOnewayEnqeueWhileStartedDetectsStop() throws Exception {
-      final VMTransport local = new VMTransport(new URI(location1));
-      final VMTransport remote = new VMTransport(new URI(location2));
-
-      final AtomicInteger sequenceId = new AtomicInteger();
-
-      remote.setAsync(true);
-      remote.setAsyncQueueDepth(2);
-
-      local.setPeer(remote);
-      remote.setPeer(local);
-
-      local.setTransportListener(new VMTestTransportListener(localReceived));
-      remote.setTransportListener(new GatedVMTestTransportListener(remoteReceived));
-
-      local.start();
-      remote.start();
-
-      Thread t = new Thread(new Runnable() {
-
-         @Override
-         public void run() {
-            for (int i = 0; i < 3; ++i) {
-               try {
-                  local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
-               }
-               catch (Exception e) {
-               }
-            }
-
-         }
-      });
-      t.start();
-
-      LOG.debug("Started async delivery, wait for remote's queue to fill up");
-      assertTrue(Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return remote.getMessageQueue().remainingCapacity() == 0;
-         }
-      }));
-
-      LOG.debug("Starting async gate open.");
-      Thread gateman = new Thread(new Runnable() {
-         @Override
-         public void run() {
-            try {
-               Thread.sleep(100);
-            }
-            catch (InterruptedException e) {
-            }
-            ((GatedVMTestTransportListener) remote.getTransportListener()).gate.countDown();
-         }
-      });
-      gateman.start();
-
-      remote.stop();
-      local.stop();
-
-      assertEquals(1, remoteReceived.size());
-      assertMessageAreOrdered(remoteReceived);
-   }
-
-   @Test(timeout = 60000)
-   public void testStopWhileStartingAsyncWithNoAsyncLimit() throws Exception {
-      // In the async case the iterate method should see that we are stopping and
-      // drop out before we dispatch all the messages but it should get at least 49 since
-      // the stop thread waits 500 mills and the listener is waiting 10 mills on each receive.
-      doTestStopWhileStartingWithNoAsyncLimit(true, 49);
-   }
-
-   @Test(timeout = 60000)
-   public void testStopWhileStartingNonAsyncWithNoAsyncLimit() throws Exception {
-      // In the non-async case the start dispatches all messages up front and then continues on
-      doTestStopWhileStartingWithNoAsyncLimit(false, 100);
-   }
-
-   private void doTestStopWhileStartingWithNoAsyncLimit(boolean async, final int expect) throws Exception {
-
-      final VMTransport local = new VMTransport(new URI(location1));
-      final VMTransport remote = new VMTransport(new URI(location2));
-
-      remote.setAsync(async);
-
-      local.setPeer(remote);
-      remote.setPeer(local);
-
-      local.setTransportListener(new VMTestTransportListener(localReceived));
-      remote.setTransportListener(new SlowVMTestTransportListener(remoteReceived));
-
-      local.start();
-
-      for (int i = 0; i < 100; ++i) {
-         local.oneway(new DummyCommand(i));
-      }
-
-      Thread t = new Thread(new Runnable() {
-
-         @Override
-         public void run() {
-            try {
-               Thread.sleep(1000);
-               remote.stop();
-            }
-            catch (Exception e) {
-            }
-         }
-      });
-
-      remote.start();
-
-      t.start();
-
-      assertTrue("Remote should receive: " + expect + ", commands but got: " + remoteReceived.size(), Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return remoteReceived.size() >= expect;
-         }
-      }));
-
-      LOG.debug("Remote listener received " + remoteReceived.size() + " messages");
-
-      local.stop();
-
-      assertTrue("Remote transport never was disposed.", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return remote.isDisposed();
-         }
-      }));
-   }
-
-   @Test(timeout = 120000)
-   public void TestTwoWayMessageThroughPutSync() throws Exception {
-
-      long totalTimes = 0;
-      final long executions = 20;
-
-      for (int i = 0; i < 20; ++i) {
-         totalTimes += doTestTwoWayMessageThroughPut(false);
-      }
-
-      LOG.info("Total time of one way sync send throughput test: " + (totalTimes / executions) + "ms");
-   }
-
-   @Test(timeout = 120000)
-   public void TestTwoWayMessageThroughPutAsnyc() throws Exception {
-
-      long totalTimes = 0;
-      final long executions = 50;
-
-      for (int i = 0; i < executions; ++i) {
-         totalTimes += doTestTwoWayMessageThroughPut(false);
-      }
-
-      LOG.info("Total time of one way async send throughput test: " + (totalTimes / executions) + "ms");
-   }
-
-   private long doTestTwoWayMessageThroughPut(boolean async) throws Exception {
-
-      final VMTransport local = new VMTransport(new URI(location1));
-      final VMTransport remote = new VMTransport(new URI(location2));
-
-      final AtomicInteger sequenceId = new AtomicInteger();
-
-      remote.setAsync(async);
-
-      local.setPeer(remote);
-      remote.setPeer(local);
-
-      local.setTransportListener(new VMTestTransportListener(localReceived));
-      remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
-      final int messageCount = 200000;
-
-      local.start();
-      remote.start();
-
-      long startTime = System.currentTimeMillis();
-
-      Thread localSend = new Thread(new Runnable() {
-
-         @Override
-         public void run() {
-            for (int i = 0; i < messageCount; ++i) {
-               try {
-                  local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
-               }
-               catch (Exception e) {
-               }
-            }
-
-         }
-      });
-
-      Thread remoteSend = new Thread(new Runnable() {
-
-         @Override
-         public void run() {
-            for (int i = 0; i < messageCount; ++i) {
-               try {
-                  remote.oneway(new DummyCommand(sequenceId.incrementAndGet()));
-               }
-               catch (Exception e) {
-               }
-            }
-
-         }
-      });
-
-      localSend.start();
-      remoteSend.start();
-
-      // Wait for both to finish and then check that each side go the correct amount
-      localSend.join();
-      remoteSend.join();
-
-      long endTime = System.currentTimeMillis();
-
-      assertTrue(Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return remoteReceived.size() == messageCount;
-         }
-      }));
-
-      assertTrue(Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return localReceived.size() == messageCount;
-         }
-      }));
-
-      LOG.debug("All messages sent,stop all");
-
-      local.stop();
-      remote.stop();
-
-      localReceived.clear();
-      remoteReceived.clear();
-
-      return endTime - startTime;
-   }
-
-   @Test(timeout = 120000)
-   public void TestOneWayMessageThroughPutSync() throws Exception {
-
-      long totalTimes = 0;
-      final long executions = 30;
-
-      for (int i = 0; i < executions; ++i) {
-         totalTimes += doTestOneWayMessageThroughPut(false);
-      }
-
-      LOG.info("Total time of one way sync send throughput test: " + (totalTimes / executions) + "ms");
-   }
-
-   @Test(timeout = 120000)
-   public void TestOneWayMessageThroughPutAsnyc() throws Exception {
-
-      long totalTimes = 0;
-      final long executions = 20;
-
-      for (int i = 0; i < 20; ++i) {
-         totalTimes += doTestOneWayMessageThroughPut(true);
-      }
-
-      LOG.info("Total time of one way async send throughput test: " + (totalTimes / executions) + "ms");
-   }
-
-   private long doTestOneWayMessageThroughPut(boolean async) throws Exception {
-
-      final VMTransport local = new VMTransport(new URI(location1));
-      final VMTransport remote = new VMTransport(new URI(location2));
-
-      final AtomicInteger sequenceId = new AtomicInteger();
-
-      remote.setAsync(async);
-
-      local.setPeer(remote);
-      remote.setPeer(local);
-
-      local.setTransportListener(new VMTestTransportListener(localReceived));
-      remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
-      final int messageCount = 100000;
-
-      local.start();
-      remote.start();
-
-      long startTime = System.currentTimeMillis();
-
-      Thread localSend = new Thread(new Runnable() {
-
-         @Override
-         public void run() {
-            for (int i = 0; i < messageCount; ++i) {
-               try {
-                  local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
-               }
-               catch (Exception e) {
-               }
-            }
-
-         }
-      });
-
-      localSend.start();
-
-      // Wait for both to finish and then check that each side go the correct amount
-      localSend.join();
-
-      long endTime = System.currentTimeMillis();
-
-      assertTrue(Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return remoteReceived.size() == messageCount;
-         }
-      }));
-
-      LOG.debug("All messages sent,stop all");
-
-      local.stop();
-      remote.stop();
-
-      localReceived.clear();
-      remoteReceived.clear();
-
-      return endTime - startTime;
-   }
-
-   @Test(timeout = 120000)
-   public void testTwoWayTrafficWithMutexTransportSync1() throws Exception {
-
-      for (int i = 0; i < 20; ++i) {
-         doTestTwoWayTrafficWithMutexTransport(false, false);
-      }
-   }
-
-   @Test(timeout = 120000)
-   public void testTwoWayTrafficWithMutexTransportSync2() throws Exception {
-
-      for (int i = 0; i < 20; ++i) {
-         doTestTwoWayTrafficWithMutexTransport(true, false);
-      }
-   }
-
-   @Test(timeout = 120000)
-   public void testTwoWayTrafficWithMutexTransportSync3() throws Exception {
-
-      for (int i = 0; i < 20; ++i) {
-         doTestTwoWayTrafficWithMutexTransport(false, true);
-      }
-   }
-
-   @Test(timeout = 120000)
-   public void testTwoWayTrafficWithMutexTransportSync4() throws Exception {
-
-      for (int i = 0; i < 20; ++i) {
-         doTestTwoWayTrafficWithMutexTransport(false, false);
-      }
-   }
-
-   public void doTestTwoWayTrafficWithMutexTransport(boolean localAsync, boolean remoteAsync) throws Exception {
-
-      final VMTransport vmlocal = new VMTransport(new URI(location1));
-      final VMTransport vmremote = new VMTransport(new URI(location2));
-
-      final MutexTransport local = new MutexTransport(vmlocal);
-      final MutexTransport remote = new MutexTransport(vmremote);
-
-      final AtomicInteger sequenceId = new AtomicInteger();
-
-      vmlocal.setAsync(localAsync);
-      vmremote.setAsync(remoteAsync);
-
-      vmlocal.setPeer(vmremote);
-      vmremote.setPeer(vmlocal);
-
-      local.setTransportListener(new VMTestTransportListener(localReceived));
-      remote.setTransportListener(new VMResponderTransportListener(remoteReceived, remote));
-
-      final int messageCount = 200000;
-
-      Thread localSend = new Thread(new Runnable() {
-
-         @Override
-         public void run() {
-            for (int i = 0; i < messageCount; ++i) {
-               try {
-                  local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
-               }
-               catch (Exception e) {
-               }
-            }
-         }
-      });
-
-      Thread remoteSend = new Thread(new Runnable() {
-
-         @Override
-         public void run() {
-            for (int i = 0; i < messageCount; ++i) {
-               try {
-                  remote.oneway(new DummyCommand(sequenceId.incrementAndGet()));
-               }
-               catch (Exception e) {
-               }
-            }
-         }
-      });
-
-      localSend.start();
-      remoteSend.start();
-
-      Thread.sleep(10);
-
-      local.start();
-      remote.start();
-
-      // Wait for both to finish and then check that each side go the correct amount
-      localSend.join();
-      remoteSend.join();
-
-      assertTrue("Remote should have received (" + messageCount + ") but got ()" + remoteReceived.size(), Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return remoteReceived.size() == messageCount;
-         }
-      }));
-
-      assertTrue("Local should have received (" + messageCount * 2 + ") but got ()" + localReceived.size(), Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return localReceived.size() == messageCount * 2;
-         }
-      }));
-
-      LOG.debug("All messages sent,stop all");
-
-      local.stop();
-      remote.stop();
-
-      localReceived.clear();
-      remoteReceived.clear();
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
deleted file mode 100644
index dd14d67..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.JMSException;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerRegistry;
-import org.apache.activemq.broker.BrokerService;
-import org.junit.After;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class VMTransportWaitForTest {
-
-   static final Logger LOG = LoggerFactory.getLogger(VMTransportWaitForTest.class);
-
-   private static final int WAIT_TIME = 20000;
-   private static final int SHORT_WAIT_TIME = 5000;
-
-   private static final String VM_BROKER_URI_NO_WAIT = "vm://localhost?broker.persistent=false&create=false";
-
-   private static final String VM_BROKER_URI_WAIT_FOR_START = VM_BROKER_URI_NO_WAIT + "&waitForStart=" + WAIT_TIME;
-
-   private static final String VM_BROKER_URI_SHORT_WAIT_FOR_START = VM_BROKER_URI_NO_WAIT + "&waitForStart=" + SHORT_WAIT_TIME;
-
-   CountDownLatch started = new CountDownLatch(1);
-   CountDownLatch gotConnection = new CountDownLatch(1);
-
-   @After
-   public void after() throws IOException {
-      BrokerRegistry.getInstance().unbind("localhost");
-   }
-
-   @Test(timeout = 90000)
-   public void testWaitFor() throws Exception {
-      try {
-         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_NO_WAIT));
-         cf.createConnection();
-         fail("expect broker not exist exception");
-      }
-      catch (JMSException expectedOnNoBrokerAndNoCreate) {
-      }
-
-      // spawn a thread that will wait for an embedded broker to start via
-      // vm://..
-      Thread t = new Thread("ClientConnectionThread") {
-         @Override
-         public void run() {
-            try {
-               started.countDown();
-               ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_WAIT_FOR_START));
-               cf.createConnection();
-               gotConnection.countDown();
-            }
-            catch (Exception e) {
-               e.printStackTrace();
-               fail("unexpected exception: " + e);
-            }
-         }
-      };
-      t.start();
-      started.await(20, TimeUnit.SECONDS);
-      Thread.yield();
-      assertFalse("has not got connection", gotConnection.await(2, TimeUnit.SECONDS));
-
-      BrokerService broker = new BrokerService();
-      broker.setPersistent(false);
-      broker.start();
-      assertTrue("has got connection", gotConnection.await(5, TimeUnit.SECONDS));
-      broker.stop();
-   }
-
-   @Test(timeout = 90000)
-   public void testWaitForNoBrokerInRegistry() throws Exception {
-
-      long startTime = System.currentTimeMillis();
-
-      try {
-         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_SHORT_WAIT_FOR_START));
-         cf.createConnection();
-         fail("expect broker not exist exception");
-      }
-      catch (JMSException expectedOnNoBrokerAndNoCreate) {
-      }
-
-      long endTime = System.currentTimeMillis();
-
-      LOG.info("Total wait time was: {}", endTime - startTime);
-      assertTrue(endTime - startTime >= SHORT_WAIT_TIME - 100);
-   }
-
-   @Test(timeout = 90000)
-   public void testWaitForNotStartedButInRegistry() throws Exception {
-
-      BrokerService broker = new BrokerService();
-      broker.setPersistent(false);
-      BrokerRegistry.getInstance().bind("localhost", broker);
-
-      long startTime = System.currentTimeMillis();
-
-      try {
-         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_SHORT_WAIT_FOR_START));
-         cf.createConnection();
-         fail("expect broker not exist exception");
-      }
-      catch (JMSException expectedOnNoBrokerAndNoCreate) {
-      }
-
-      long endTime = System.currentTimeMillis();
-
-      LOG.info("Total wait time was: {}", endTime - startTime);
-      assertTrue(endTime - startTime >= SHORT_WAIT_TIME - 100);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java
deleted file mode 100644
index 2b97cff..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.bugs.embedded.ThreadExplorer;
-import org.apache.activemq.network.NetworkConnector;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class VmTransportNetworkBrokerTest extends TestCase {
-
-   private static final Logger LOG = LoggerFactory.getLogger(VmTransportNetworkBrokerTest.class);
-
-   private static final String VM_BROKER_URI = "vm://localhost?create=false";
-
-   CountDownLatch started = new CountDownLatch(1);
-   CountDownLatch gotConnection = new CountDownLatch(1);
-
-   public void testNoThreadLeak() throws Exception {
-
-      // with VMConnection and simple discovery network connector
-      int originalThreadCount = Thread.activeCount();
-      LOG.debug(ThreadExplorer.show("threads at beginning"));
-
-      BrokerService broker = new BrokerService();
-      broker.setDedicatedTaskRunner(true);
-      broker.setPersistent(false);
-      broker.addConnector("tcp://localhost:61616");
-      NetworkConnector networkConnector = broker.addNetworkConnector("static:(tcp://wrongHostname1:61617,tcp://wrongHostname2:61618)?useExponentialBackOff=false");
-      networkConnector.setDuplex(true);
-      broker.start();
-
-      ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI));
-      Connection connection = cf.createConnection("system", "manager");
-      connection.start();
-
-      // let it settle
-      TimeUnit.SECONDS.sleep(5);
-
-      int threadCountAfterStart = Thread.activeCount();
-      TimeUnit.SECONDS.sleep(30);
-      int threadCountAfterSleep = Thread.activeCount();
-
-      assertTrue("Threads are leaking: " + ThreadExplorer.show("active sleep") + ", threadCount=" + threadCountAfterStart + " threadCountAfterSleep=" + threadCountAfterSleep, threadCountAfterSleep < threadCountAfterStart + 8);
-
-      connection.close();
-      broker.stop();
-      broker.waitUntilStopped();
-
-      // testNoDanglingThreadsAfterStop with tcp transport
-      broker = new BrokerService();
-      broker.setSchedulerSupport(true);
-      broker.setDedicatedTaskRunner(true);
-      broker.setPersistent(false);
-      broker.addConnector("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
-      broker.start();
-
-      cf = new ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
-      connection = cf.createConnection("system", "manager");
-      connection.start();
-      connection.close();
-      broker.stop();
-      broker.waitUntilStopped();
-
-      // let it settle
-      TimeUnit.SECONDS.sleep(5);
-
-      // get final threads but filter out any daemon threads that the JVM may have created.
-      Thread[] threads = filterDaemonThreads(ThreadExplorer.listThreads());
-      int threadCountAfterStop = threads.length;
-
-      // lets see the thread counts at INFO level so they are always in the test log
-      LOG.info(ThreadExplorer.show("active after stop"));
-      LOG.info("originalThreadCount=" + originalThreadCount + " threadCountAfterStop=" + threadCountAfterStop);
-
-      assertTrue("Threads are leaking: " +
-                    ThreadExplorer.show("active after stop") +
-                    ". originalThreadCount=" +
-                    originalThreadCount +
-                    " threadCountAfterStop=" +
-                    threadCountAfterStop, threadCountAfterStop <= originalThreadCount);
-   }
-
-   /**
-    * Filters any daemon threads from the thread list.
-    *
-    * Thread counts before and after the test should ideally be equal.
-    * However there is no guarantee that the JVM does not create any
-    * additional threads itself.
-    * E.g. on Mac OSX there is a JVM internal thread called
-    * "Poller SunPKCS11-Darwin" created after the test go started and
-    * under the main thread group.
-    * When debugging tests in Eclipse another so called "Reader" thread
-    * is created by Eclipse.
-    * So we cannot assume that the JVM does not create additional threads
-    * during the test. However for the time being we assume that any such
-    * additionally created threads are daemon threads.
-    *
-    * @param threads - the array of threads to parse
-    * @return a new array with any daemon threads removed
-    */
-   public Thread[] filterDaemonThreads(Thread[] threads) throws Exception {
-
-      List<Thread> threadList = new ArrayList<>(Arrays.asList(threads));
-
-      // Can't use an Iterator as it would raise a
-      // ConcurrentModificationException when trying to remove an element
-      // from the list, so using standard walk through
-      for (int i = 0; i < threadList.size(); i++) {
-
-         Thread thread = threadList.get(i);
-         LOG.debug("Inspecting thread " + thread.getName());
-         if (thread.isDaemon()) {
-            LOG.debug("Removing deamon thread.");
-            threadList.remove(thread);
-            Thread.sleep(100);
-
-         }
-      }
-      LOG.debug("Converting list back to Array");
-      return threadList.toArray(new Thread[0]);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
index e78ab2f..534e68b 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
@@ -390,7 +390,7 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu
             super.send(producerExchange, messageSend);
             if (first.compareAndSet(false, true)) {
                producerExchange.getConnectionContext().setDontSendReponse(true);
-               Executors.newSingleThreadExecutor().execute(new Runnable() {
+               new Thread() {
                   @Override
                   public void run() {
                      try {
@@ -403,7 +403,7 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu
                         e.printStackTrace();
                      }
                   }
-               });
+               }.start();
             }
          }
       }});
@@ -465,7 +465,7 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu
             super.send(producerExchange, messageSend);
             if (first.compareAndSet(false, true)) {
                producerExchange.getConnectionContext().setDontSendReponse(true);
-               Executors.newSingleThreadExecutor().execute(new Runnable() {
+               new Thread() {
                   @Override
                   public void run() {
                      try {
@@ -478,7 +478,7 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu
                         e.printStackTrace();
                      }
                   }
-               });
+               }.start();
             }
          }
       }});

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/LockFileTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/LockFileTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/LockFileTest.java
new file mode 100644
index 0000000..03e0d2e
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/LockFileTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.File;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class LockFileTest {
+
+    @Test
+    public void testNoDeleteOnUnlockIfNotLocked() throws Exception {
+
+        File lockFile = new File(IOHelper.getDefaultDataDirectory(), "lockToTest1");
+        IOHelper.mkdirs(lockFile.getParentFile());
+        lockFile.createNewFile();
+
+        LockFile underTest = new LockFile(lockFile, true);
+
+        underTest.lock();
+
+        lockFile.delete();
+
+        assertFalse("no longer valid", underTest.keepAlive());
+
+        // a slave gets in
+        lockFile.createNewFile();
+
+        underTest.unlock();
+
+        assertTrue("file still exists after unlock when not locked", lockFile.exists());
+
+    }
+
+    @Test
+    public void testDeleteOnUnlockIfLocked() throws Exception {
+
+        File lockFile = new File(IOHelper.getDefaultDataDirectory(), "lockToTest2");
+        IOHelper.mkdirs(lockFile.getParentFile());
+        lockFile.createNewFile();
+
+        LockFile underTest = new LockFile(lockFile, true);
+
+        underTest.lock();
+
+        assertTrue("valid", underTest.keepAlive());
+
+        underTest.unlock();
+
+        assertFalse("file deleted on unlock", lockFile.exists());
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/SocketProxy.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/SocketProxy.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/SocketProxy.java
new file mode 100644
index 0000000..b01a4e1
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/SocketProxy.java
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.util;
+
+import javax.net.ssl.SSLServerSocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SocketProxy {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(SocketProxy.class);
+
+    public static final int ACCEPT_TIMEOUT_MILLIS = 100;
+
+    private URI proxyUrl;
+    private URI target;
+
+    private Acceptor acceptor;
+    private ServerSocket serverSocket;
+
+    private CountDownLatch closed = new CountDownLatch(1);
+
+    public final List<Bridge> connections = new LinkedList<Bridge>();
+
+    private int listenPort = 0;
+
+    private int receiveBufferSize = -1;
+
+    private boolean pauseAtStart = false;
+
+    private int acceptBacklog = 50;
+
+    public SocketProxy() throws Exception {
+    }
+
+    public SocketProxy(URI uri) throws Exception {
+        this(0, uri);
+    }
+
+    public SocketProxy(int port, URI uri) throws Exception {
+        listenPort = port;
+        target = uri;
+        open();
+    }
+
+    public void setReceiveBufferSize(int receiveBufferSize) {
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+    public void setTarget(URI tcpBrokerUri) {
+        target = tcpBrokerUri;
+    }
+
+    public void open() throws Exception {
+        serverSocket = createServerSocket(target);
+        serverSocket.setReuseAddress(true);
+        if (receiveBufferSize > 0) {
+            serverSocket.setReceiveBufferSize(receiveBufferSize);
+        }
+        if (proxyUrl == null) {
+            serverSocket.bind(new InetSocketAddress(listenPort), acceptBacklog);
+            proxyUrl = urlFromSocket(target, serverSocket);
+        } else {
+            serverSocket.bind(new InetSocketAddress(proxyUrl.getPort()));
+        }
+        acceptor = new Acceptor(serverSocket, target);
+        if (pauseAtStart) {
+            acceptor.pause();
+        }
+        new Thread(null, acceptor, "SocketProxy-Acceptor-" + serverSocket.getLocalPort()).start();
+        closed = new CountDownLatch(1);
+    }
+
+    private boolean isSsl(URI target) {
+        return "ssl".equals(target.getScheme());
+    }
+
+    private ServerSocket createServerSocket(URI target) throws Exception {
+        if (isSsl(target)) {
+            return SSLServerSocketFactory.getDefault().createServerSocket();
+        }
+        return new ServerSocket();
+    }
+
+    private Socket createSocket(URI target) throws Exception {
+        if (isSsl(target)) {
+            return SSLSocketFactory.getDefault().createSocket();
+        }
+        return new Socket();
+    }
+
+    public URI getUrl() {
+        return proxyUrl;
+    }
+
+    /*
+     * close all proxy connections and acceptor
+     */
+    public void close() {
+        List<Bridge> connections;
+        synchronized(this.connections) {
+            connections = new ArrayList<Bridge>(this.connections);
+        }
+        LOG.info("close, numConnections=" + connections.size());
+        for (Bridge con : connections) {
+            closeConnection(con);
+        }
+        acceptor.close();
+        closed.countDown();
+    }
+
+    /*
+     * close all proxy receive connections, leaving acceptor
+     * open
+     */
+    public void halfClose() {
+        List<Bridge> connections;
+        synchronized(this.connections) {
+            connections = new ArrayList<Bridge>(this.connections);
+        }
+        LOG.info("halfClose, numConnections=" + connections.size());
+        for (Bridge con : connections) {
+            halfCloseConnection(con);
+        }
+    }
+
+    public boolean waitUntilClosed(long timeoutSeconds) throws InterruptedException {
+        return closed.await(timeoutSeconds, TimeUnit.SECONDS);
+    }
+
+    /*
+     * called after a close to restart the acceptor on the same port
+     */
+    public void reopen() {
+        LOG.info("reopen");
+        try {
+            open();
+        } catch (Exception e) {
+            LOG.debug("exception on reopen url:" + getUrl(), e);
+        }
+    }
+
+    /*
+     * pause accepting new connections and data transfer through existing proxy
+     * connections. All sockets remain open
+     */
+    public void pause() {
+        synchronized(connections) {
+            LOG.info("pause, numConnections=" + connections.size());
+            acceptor.pause();
+            for (Bridge con : connections) {
+                con.pause();
+            }
+        }
+    }
+
+    /*
+     * continue after pause
+     */
+    public void goOn() {
+        synchronized(connections) {
+            LOG.info("goOn, numConnections=" + connections.size());
+            for (Bridge con : connections) {
+                con.goOn();
+            }
+        }
+        acceptor.goOn();
+    }
+
+    private void closeConnection(Bridge c) {
+        try {
+            c.close();
+        } catch (Exception e) {
+            LOG.debug("exception on close of: " + c, e);
+        }
+    }
+
+    private void halfCloseConnection(Bridge c) {
+        try {
+            c.halfClose();
+        } catch (Exception e) {
+            LOG.debug("exception on half close of: " + c, e);
+        }
+    }
+
+    public boolean isPauseAtStart() {
+        return pauseAtStart;
+    }
+
+    public void setPauseAtStart(boolean pauseAtStart) {
+        this.pauseAtStart = pauseAtStart;
+    }
+
+    public int getAcceptBacklog() {
+        return acceptBacklog;
+    }
+
+    public void setAcceptBacklog(int acceptBacklog) {
+        this.acceptBacklog = acceptBacklog;
+    }
+
+    private URI urlFromSocket(URI uri, ServerSocket serverSocket) throws Exception {
+        int listenPort = serverSocket.getLocalPort();
+
+        return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), listenPort, uri.getPath(), uri.getQuery(), uri.getFragment());
+    }
+
+    public class Bridge {
+
+        private Socket receiveSocket;
+        private Socket sendSocket;
+        private Pump requestThread;
+        private Pump responseThread;
+
+        public Bridge(Socket socket, URI target) throws Exception {
+            receiveSocket = socket;
+            sendSocket = createSocket(target);
+            if (receiveBufferSize > 0) {
+                sendSocket.setReceiveBufferSize(receiveBufferSize);
+            }
+            sendSocket.connect(new InetSocketAddress(target.getHost(), target.getPort()));
+            linkWithThreads(receiveSocket, sendSocket);
+            LOG.info("proxy connection " + sendSocket + ", receiveBufferSize=" + sendSocket.getReceiveBufferSize());
+        }
+
+        public void goOn() {
+            responseThread.goOn();
+            requestThread.goOn();
+        }
+
+        public void pause() {
+            requestThread.pause();
+            responseThread.pause();
+        }
+
+        public void close() throws Exception {
+            synchronized(connections) {
+                connections.remove(this);
+            }
+            receiveSocket.close();
+            sendSocket.close();
+        }
+
+        public void halfClose() throws Exception {
+            receiveSocket.close();
+        }
+
+        private void linkWithThreads(Socket source, Socket dest) {
+            requestThread = new Pump(source, dest);
+            requestThread.start();
+            responseThread = new Pump(dest, source);
+            responseThread.start();
+        }
+
+        public class Pump extends Thread {
+
+            protected Socket src;
+            private Socket destination;
+            private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>();
+
+            public Pump(Socket source, Socket dest) {
+                super("SocketProxy-DataTransfer-" + source.getPort() + ":" + dest.getPort());
+                src = source;
+                destination = dest;
+                pause.set(new CountDownLatch(0));
+            }
+
+            public void pause() {
+                pause.set(new CountDownLatch(1));
+            }
+
+            public void goOn() {
+                pause.get().countDown();
+            }
+
+            public void run() {
+                byte[] buf = new byte[1024];
+                try {
+                    InputStream in = src.getInputStream();
+                    OutputStream out = destination.getOutputStream();
+                    while (true) {
+                        int len = in.read(buf);
+                        if (len == -1) {
+                            LOG.debug("read eof from:" + src);
+                            break;
+                        }
+                        pause.get().await();
+                        out.write(buf, 0, len);
+                    }
+                } catch (Exception e) {
+                    LOG.debug("read/write failed, reason: " + e.getLocalizedMessage());
+                    try {
+                        if (!receiveSocket.isClosed()) {
+                            // for halfClose, on read/write failure if we close the
+                            // remote end will see a close at the same time.
+                            close();
+                        }
+                    } catch (Exception ignore) {
+                    }
+                }
+            }
+        }
+    }
+
+    public class Acceptor implements Runnable {
+
+        private ServerSocket socket;
+        private URI target;
+        private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>();
+
+
+        public Acceptor(ServerSocket serverSocket, URI uri) {
+            socket = serverSocket;
+            target = uri;
+            pause.set(new CountDownLatch(0));
+            try {
+                socket.setSoTimeout(ACCEPT_TIMEOUT_MILLIS);
+            } catch (SocketException e) {
+                e.printStackTrace();
+            }
+        }
+
+        public void pause() {
+            pause.set(new CountDownLatch(1));
+        }
+
+        public void goOn() {
+            pause.get().countDown();
+        }
+
+        public void run() {
+            try {
+                while(!socket.isClosed()) {
+                    pause.get().await();
+                    try {
+                        Socket source = socket.accept();
+                        pause.get().await();
+                        if (receiveBufferSize > 0) {
+                            source.setReceiveBufferSize(receiveBufferSize);
+                        }
+                        LOG.info("accepted " + source + ", receiveBufferSize:" + source.getReceiveBufferSize());
+                        synchronized(connections) {
+                            connections.add(new Bridge(source, target));
+                        }
+                    } catch (SocketTimeoutException expected) {
+                    }
+                }
+            } catch (Exception e) {
+                LOG.debug("acceptor: finished for reason: " + e.getLocalizedMessage());
+            }
+        }
+
+        public void close() {
+            try {
+                socket.close();
+                closed.countDown();
+                goOn();
+            } catch (IOException ignored) {
+            }
+        }
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/Wait.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/Wait.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/Wait.java
new file mode 100644
index 0000000..244db59
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/Wait.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.util;
+
+
+import java.util.concurrent.TimeUnit;
+
+public class Wait {
+
+    public static final long MAX_WAIT_MILLIS = 30*1000;
+    public static final long SLEEP_MILLIS = 1000;
+
+    public interface Condition {
+        boolean isSatisified() throws Exception;
+    }
+
+    public static boolean waitFor(Condition condition) throws Exception {
+        return waitFor(condition, MAX_WAIT_MILLIS);
+    }
+
+    public static boolean waitFor(final Condition condition, final long duration) throws Exception {
+        return waitFor(condition, duration, SLEEP_MILLIS);
+    }
+
+    public static boolean waitFor(final Condition condition, final long duration, final long sleepMillis) throws Exception {
+
+        final long expiry = System.currentTimeMillis() + duration;
+        boolean conditionSatisified = condition.isSatisified();
+        while (!conditionSatisified && System.currentTimeMillis() < expiry) {
+            TimeUnit.MILLISECONDS.sleep(sleepMillis);
+            conditionSatisified = condition.isSatisified();
+        }
+        return conditionSatisified;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/resources/org/apache/activemq/transport/tcp/n-brokers-ssl.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/resources/org/apache/activemq/transport/tcp/n-brokers-ssl.xml b/tests/activemq5-unit-tests/src/test/resources/org/apache/activemq/transport/tcp/n-brokers-ssl.xml
new file mode 100644
index 0000000..4bd5fc7
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/resources/org/apache/activemq/transport/tcp/n-brokers-ssl.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans 
+  xmlns="http://www.springframework.org/schema/beans" 
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+  <amq:broker useJmx="false" persistent="false" start="false" brokerName="dummy">
+
+    <amq:sslContext>
+      <amq:sslContext 
+      		keyStore="dummy.keystore" keyStorePassword="password"/>
+    </amq:sslContext>
+    
+    <amq:transportConnectors>
+      <amq:transportConnector uri="ssl://localhost:62616" />
+    </amq:transportConnectors>
+    
+  </amq:broker>
+
+  <amq:broker useJmx="false" persistent="false" start="false" brokerName="activemq.org">
+    <amq:sslContext>
+      <amq:sslContext 
+      		keyStore="server.keystore" keyStorePassword="password"
+       		trustStore="client.keystore" trustStorePassword="password"/>
+    </amq:sslContext>
+    
+    <amq:transportConnectors>
+      <amq:transportConnector uri="ssl://localhost:63616" />
+    </amq:transportConnectors>
+    
+  </amq:broker>
+</beans>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index d8aa4ac..a3bae65 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -50,10 +50,10 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.ServerSessionFactory;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.QueueFactoryImpl;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
@@ -484,6 +484,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
       }
 
       @Override
+      public void browserFinished(ServerConsumer consumer) {
+
+      }
+
+      @Override
       public boolean isWritable(ReadyListener callback) {
          return true;
       }
@@ -502,7 +507,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
        * @see SessionCallback#sendMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, int)
        */
       @Override
-      public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) {
+      public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
          inCall.countDown();
          try {
             callbackSemaphore.acquire();
@@ -513,7 +518,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
          }
 
          try {
-            return targetCallback.sendMessage(message, consumer, deliveryCount);
+            return targetCallback.sendMessage(ref, message, consumer, deliveryCount);
          }
          finally {
             callbackSemaphore.release();
@@ -525,8 +530,8 @@ public class HangConsumerTest extends ActiveMQTestBase {
        * @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int)
        */
       @Override
-      public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
-         return targetCallback.sendLargeMessage(message, consumer, bodySize, deliveryCount);
+      public int sendLargeMessage(MessageReference reference, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
+         return targetCallback.sendLargeMessage(reference, message, consumer, bodySize, deliveryCount);
       }
 
       /* (non-Javadoc)
@@ -576,7 +581,6 @@ public class HangConsumerTest extends ActiveMQTestBase {
                                                         String defaultAddress,
                                                         SessionCallback callback,
                                                         OperationContext context,
-                                                        ServerSessionFactory sessionFactory,
                                                         boolean autoCreateQueue) throws Exception {
          return new ServerSessionImpl(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, null);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
index d2e3215..09fd9b7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
@@ -31,6 +31,7 @@ import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.ActiveMQXAConnectionFactory;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.junit.After;
 import org.junit.Before;
@@ -44,6 +45,9 @@ public class BasicOpenWireTest extends OpenWireTestBase {
 
    protected static final String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true";
    protected ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
+   protected ActiveMQXAConnectionFactory xaFactory = new ActiveMQXAConnectionFactory(urlString);
+
+
    protected ActiveMQConnection connection;
    protected String topicName = "amqTestTopic1";
    protected String queueName = "amqTestQueue1";

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
index a1a5e38..14cfee0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
@@ -26,6 +26,7 @@ import javax.jms.TemporaryQueue;
 import javax.jms.TextMessage;
 
 import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -118,7 +119,7 @@ public class BasicSecurityTest extends BasicOpenWireTest {
    }
 
    @Test
-   public void testSendnReceiveAuthorization() throws Exception {
+      public void testSendnReceiveAuthorization() throws Exception {
       Connection sendingConn = null;
       Connection receivingConn = null;
 
@@ -152,16 +153,18 @@ public class BasicSecurityTest extends BasicOpenWireTest {
          producer = sendingSession.createProducer(dest);
          producer.send(message);
 
-         MessageConsumer consumer = null;
+         MessageConsumer consumer;
          try {
             consumer = sendingSession.createConsumer(dest);
+            Assert.fail("exception expected");
          }
          catch (JMSSecurityException e) {
+            e.printStackTrace();
             //expected
          }
 
          consumer = receivingSession.createConsumer(dest);
-         TextMessage received = (TextMessage) consumer.receive();
+         TextMessage received = (TextMessage) consumer.receive(5000);
 
          assertNotNull(received);
          assertEquals("Hello World", received.getText());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
index 825b8b5..69d9784 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
@@ -18,7 +18,7 @@ package org.apache.activemq.artemis.tests.integration.openwire;
 
 import static org.junit.Assert.assertEquals;
 
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
+import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
 import org.junit.Test;
 
 public class OpenWireUtilTest {


Mime
View raw message