activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [09/13] activemq-artemis git commit: Refactor base test classes
Date Wed, 20 May 2015 09:47:14 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/99147d07/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ServiceTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ServiceTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ServiceTestBase.java
index 3299edb..a77298b 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ServiceTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ServiceTestBase.java
@@ -16,32 +16,29 @@
  */
 package org.apache.activemq.artemis.tests.util;
 
-import javax.management.MBeanServer;
-import java.io.File;
-import java.lang.management.ManagementFactory;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
-import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.asyncio.impl.AsynchronousFileImpl;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
+import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
 import org.apache.activemq.artemis.core.client.impl.Topology;
 import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
@@ -50,563 +47,1549 @@ import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
 import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
 import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
 import org.apache.activemq.artemis.core.remoting.impl.invm.InVMRegistry;
 import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
 import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
 import org.apache.activemq.artemis.core.server.impl.Activation;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
 import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl;
+import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import javax.naming.Context;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.Xid;
+import java.beans.BeanInfo;
+import java.beans.Introspector;
+import java.beans.PropertyDescriptor;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.management.ManagementFactory;
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Base class with basic utilities on starting up a basic server
  */
-public abstract class ServiceTestBase extends UnitTestCase
+public abstract class ServiceTestBase extends Assert
 {
+   public static final String TARGET_TMP = "./target/tmp";
+   public static final String INVM_ACCEPTOR_FACTORY = InVMAcceptorFactory.class.getCanonicalName();
+   public static final String INVM_CONNECTOR_FACTORY = InVMConnectorFactory.class.getCanonicalName();
+   public static final String NETTY_ACCEPTOR_FACTORY = NettyAcceptorFactory.class.getCanonicalName();
+   public static final String NETTY_CONNECTOR_FACTORY = NettyConnectorFactory.class.getCanonicalName();
+   public static final String CLUSTER_PASSWORD = "UnitTestsClusterPassword";
 
    /**
     * Add a "sendCallNumber" property to messages sent using helper classes. Meant to help in
     * debugging.
     */
    private static final String SEND_CALL_NUMBER = "sendCallNumber";
+   private static final String OS_TYPE = System.getProperty("os.name").toLowerCase();
+   private static final int DEFAULT_UDP_PORT;
+   private static final ActiveMQServerLogger log = ActiveMQServerLogger.LOGGER;
+
    protected static final long WAIT_TIMEOUT = 20000;
+
+   // There is a verification about thread leakages. We only fail a single thread when this happens
+   private static Set<Thread> alreadyFailedThread = new HashSet<Thread>();
+
+   private final Collection<ActiveMQServer> servers = new ArrayList<ActiveMQServer>();
+   private final Collection<ServerLocator> locators = new ArrayList<ServerLocator>();
+   private final Collection<ClientSessionFactory> sessionFactories = new ArrayList<ClientSessionFactory>();
+   private final Collection<ClientSession> clientSessions = new HashSet<ClientSession>();
+   private final Collection<ClientConsumer> clientConsumers = new HashSet<ClientConsumer>();
+   private final Collection<ClientProducer> clientProducers = new HashSet<ClientProducer>();
+   private final Collection<ActiveMQComponent> otherComponents = new HashSet<ActiveMQComponent>();
+   private final Set<ExecutorService> executorSet = new HashSet<ExecutorService>();
+
+   private boolean checkThread = true;
+   private String testDir;
    private int sendMsgCount = 0;
+   private Map<Thread, StackTraceElement[]> previousThreads;
 
-   @Override
-   @After
-   public void tearDown() throws Exception
-   {
-      super.tearDown();
-      if (InVMRegistry.instance.size() > 0)
-      {
-         fail("InVMREgistry size > 0");
-      }
-   }
+   @Rule
+   public TestName name = new TestName();
 
-   @Override
-   @Before
-   public void setUp() throws Exception
-   {
-      sendMsgCount = 0;
-      super.setUp();
-   }
+   @Rule
+   public TemporaryFolder temporaryFolder;
 
-   /**
-    * @param queue
-    * @throws InterruptedException
-    */
-   protected void waitForNotPaging(Queue queue) throws InterruptedException
-   {
-      waitForNotPaging(queue.getPageSubscription().getPagingStore());
-   }
+   @Rule
+   // This Custom rule will remove any files under ./target/tmp
+   // including anything created previously by TemporaryFolder
+   public RemoveFolder folder = new RemoveFolder(TARGET_TMP);
 
-   protected void waitForNotPaging(PagingStore store) throws InterruptedException
+   @Rule
+   public TestRule watcher = new TestWatcher()
    {
-      long timeout = System.currentTimeMillis() + 10000;
-      while (timeout > System.currentTimeMillis() && store.isPaging())
+      @Override
+      protected void starting(Description description)
       {
-         Thread.sleep(100);
+         log.info(String.format("#*#*# Starting test: %s()...", description.getMethodName()));
       }
-      assertFalse(store.isPaging());
-   }
 
-   protected Topology waitForTopology(final ActiveMQServer server, final int nodes) throws Exception
+      @Override
+      protected void finished(Description description)
+      {
+         log.info(String.format("#*#*# Finished test: %s()...", description.getMethodName()));
+      }
+   };
+
+   static
    {
-      return waitForTopology(server, nodes, -1, WAIT_TIMEOUT);
+      Random random = new Random();
+      DEFAULT_UDP_PORT = 6000 + random.nextInt(1000);
    }
 
-   protected Topology waitForTopology(final ActiveMQServer server, final int nodes, final int backups) throws Exception
+   public ServiceTestBase()
    {
-      return waitForTopology(server, nodes, backups, WAIT_TIMEOUT);
+      File parent = new File(TARGET_TMP);
+      parent.mkdirs();
+      temporaryFolder = new TemporaryFolder(parent);
    }
 
-   protected Topology waitForTopology(final ActiveMQServer server, final int liveNodes, final int backupNodes, final long timeout) throws Exception
+   @After
+   public void tearDown() throws Exception
    {
-      ActiveMQServerLogger.LOGGER.debug("waiting for " + liveNodes + " on the topology for server = " + server);
-
-      long start = System.currentTimeMillis();
-
-      Set<ClusterConnection> ccs = server.getClusterManager().getClusterConnections();
+      for (ExecutorService s : executorSet)
+      {
+         s.shutdown();
+      }
+      closeAllSessionFactories();
+      closeAllServerLocatorsFactories();
 
-      if (ccs.size() != 1)
+      try
       {
-         throw new IllegalStateException("You need a single cluster connection on this version of waitForTopology on ServiceTestBase");
+         assertAllExecutorsFinished();
+         assertAllClientConsumersAreClosed();
+         assertAllClientProducersAreClosed();
+         assertAllClientSessionsAreClosed();
       }
+      finally
+      {
+         synchronized (servers)
+         {
+            for (ActiveMQServer server : servers)
+            {
+               if (server == null)
+               {
+                  continue;
+               }
+               try
+               {
+                  final ClusterManager clusterManager = server.getClusterManager();
+                  if (clusterManager != null)
+                  {
+                     for (ClusterConnection cc : clusterManager.getClusterConnections())
+                     {
+                        stopComponent(cc);
+                     }
+                  }
+               }
+               catch (Exception e)
+               {
+                  // no-op
+               }
+               stopComponentOutputExceptions(server);
+            }
+            servers.clear();
+         }
 
-      Topology topology = server.getClusterManager().getDefaultConnection(null).getTopology();
+         closeAllOtherComponents();
 
-      int liveNodesCount = 0;
+         ArrayList<Exception> exceptions;
+         try
+         {
+            exceptions = checkCsfStopped();
+         }
+         finally
+         {
+            cleanupPools();
+         }
+         //clean up pools before failing
+         if (!exceptions.isEmpty())
+         {
+            for (Exception exception : exceptions)
+            {
+               exception.printStackTrace();
+            }
+            fail("Client Session Factories still trying to reconnect, see above to see where created");
+         }
+         Map<Thread, StackTraceElement[]> threadMap = Thread.getAllStackTraces();
+         for (Thread thread : threadMap.keySet())
+         {
+            StackTraceElement[] stack = threadMap.get(thread);
+            for (StackTraceElement stackTraceElement : stack)
+            {
+               if (stackTraceElement.getMethodName().contains("getConnectionWithRetry") && !alreadyFailedThread.contains(thread))
+               {
+                  alreadyFailedThread.add(thread);
+                  System.out.println(threadDump(this.getName() + " has left threads running. Look at thread " +
+                                                        thread.getName() +
+                                                        " id = " +
+                                                        thread.getId() +
+                                                        " has running locators on test " +
+                                                        this.getName() +
+                                                        " on this following dump"));
+                  fail("test '" + getName() + "' left serverlocator running, this could effect other tests");
+               }
+               else if (stackTraceElement.getMethodName().contains("BroadcastGroupImpl.run") && !alreadyFailedThread.contains(thread))
+               {
+                  alreadyFailedThread.add(thread);
+                  System.out.println(threadDump(this.getName() + " has left threads running. Look at thread " +
+                                                        thread.getName() +
+                                                        " id = " +
+                                                        thread.getId() +
+                                                        " is still broadcasting " +
+                                                        this.getName() +
+                                                        " on this following dump"));
+                  fail("test left broadcastgroupimpl running, this could effect other tests");
+               }
+            }
+         }
 
-      int backupNodesCount = 0;
+         if (checkThread)
+         {
+            StringBuffer buffer = null;
 
+            boolean failed = true;
 
-      do
-      {
-         liveNodesCount = 0;
-         backupNodesCount = 0;
 
-         for (TopologyMemberImpl member : topology.getMembers())
-         {
-            if (member.getLive() != null)
+            long timeout = System.currentTimeMillis() + 60000;
+            while (failed && timeout > System.currentTimeMillis())
             {
-               liveNodesCount++;
+               buffer = new StringBuffer();
+
+               failed = checkThread(buffer);
+
+               if (failed)
+               {
+                  forceGC();
+                  Thread.sleep(500);
+                  log.info("There are still threads running, trying again");
+                  System.out.println(buffer);
+               }
             }
-            if (member.getBackup() != null)
+
+            if (failed)
             {
-               backupNodesCount++;
+               logAndSystemOut("Thread leaked on test " + this.getClass().getName() + "::" + this.getName() + "\n" +
+                                       buffer);
+               logAndSystemOut("Thread leakage");
+
+               fail("Thread leaked");
             }
+
+         }
+         else
+         {
+            checkThread = true;
          }
 
-         if ((liveNodes == -1 || liveNodes == liveNodesCount) && (backupNodes == -1 || backupNodes == backupNodesCount))
+         if (Thread.currentThread().getContextClassLoader() == null)
          {
-            return topology;
+            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+            fail("Thread Context ClassLoader was set to null at some point before this test. We will set to this.getClass().getClassLoader(), but you are supposed to fix your tests");
          }
 
-         Thread.sleep(10);
+         checkFilesUsage();
       }
-      while (System.currentTimeMillis() - start < timeout);
-
-      String msg = "Timed out waiting for cluster topology of live=" + liveNodes + ",backup=" + backupNodes +
-         " (received live=" + liveNodesCount + ", backup=" + backupNodesCount +
-         ") topology = " +
-         topology.describe() +
-         ")";
-
-      ActiveMQServerLogger.LOGGER.error(msg);
 
-      throw new Exception(msg);
+      if (InVMRegistry.instance.size() > 0)
+      {
+         fail("InVMREgistry size > 0");
+      }
    }
 
-
-   protected void waitForTopology(final ActiveMQServer server, String clusterConnectionName, final int nodes, final long timeout) throws Exception
+   @Before
+   public void setUp() throws Exception
    {
-      ActiveMQServerLogger.LOGGER.debug("waiting for " + nodes + " on the topology for server = " + server);
+      sendMsgCount = 0;
+      testDir = temporaryFolder.getRoot().getAbsolutePath();
+      clearDataRecreateServerDirs();
+      OperationContextImpl.clearContext();
 
-      long start = System.currentTimeMillis();
+      InVMRegistry.instance.clear();
 
-      ClusterConnection clusterConnection = server.getClusterManager().getClusterConnection(clusterConnectionName);
+      // checkFreePort(TransportConstants.DEFAULT_PORT);
 
+      previousThreads = Thread.getAllStackTraces();
 
-      Topology topology = clusterConnection.getTopology();
+      logAndSystemOut("#test " + getName());
+   }
 
-      do
+   public static void assertEqualsByteArrays(final byte[] expected, final byte[] actual)
+   {
+      for (int i = 0; i < expected.length; i++)
       {
-         if (nodes == topology.getMembers().size())
-         {
-            return;
-         }
+         Assert.assertEquals("byte at index " + i, expected[i], actual[i]);
+      }
+   }
 
-         Thread.sleep(10);
+   public static int countOccurrencesOf(String str, String sub)
+   {
+      if (str == null || sub == null || str.length() == 0 || sub.length() == 0)
+      {
+         return 0;
       }
-      while (System.currentTimeMillis() - start < timeout);
+      int count = 0;
+      int pos = 0;
+      int idx;
+      while ((idx = str.indexOf(sub, pos)) != -1)
+      {
+         ++count;
+         pos = idx + sub.length();
+      }
+      return count;
+   }
 
-      String msg = "Timed out waiting for cluster topology of " + nodes +
-         " (received " +
-         topology.getMembers().size() +
-         ") topology = " +
-         topology +
-         ")";
+   protected void disableCheckThread()
+   {
+      checkThread = false;
+   }
 
-      ActiveMQServerLogger.LOGGER.error(msg);
+   protected String getName()
+   {
+      return name.getMethodName();
+   }
 
-      throw new Exception(msg);
+   protected boolean isWindows()
+   {
+      return (OS_TYPE.indexOf("win") >= 0);
    }
 
-   protected static final void waitForComponent(final ActiveMQComponent component, final long seconds) throws InterruptedException
+   protected Configuration createDefaultConfig() throws Exception
    {
-      long time = System.currentTimeMillis();
-      long toWait = seconds * 1000;
-      while (!component.isStarted())
-      {
-         Thread.sleep(50);
-         if (System.currentTimeMillis() > (time + toWait))
-         {
-            fail("component did not start within timeout of " + seconds);
-         }
-      }
+      return createDefaultConfig(false);
    }
 
-   protected static final Map<String, Object> generateParams(final int node, final boolean netty)
+   protected Configuration createDefaultConfig(final boolean netty) throws Exception
    {
-      Map<String, Object> params = new HashMap<String, Object>();
+      ConfigurationImpl configuration = createBasicConfig(-1)
+              .setJMXManagementEnabled(false)
+              .clearAcceptorConfigurations()
+              .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY));
 
       if (netty)
       {
-         params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
-                    org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + node);
-      }
-      else
-      {
-         params.put(org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME, node);
+         configuration.addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY));
       }
 
-      return params;
+      return configuration;
    }
 
-   protected static final TransportConfiguration getNettyAcceptorTransportConfiguration(final boolean live)
+   protected Configuration createDefaultConfig(final int index,
+                                               final Map<String, Object> params,
+                                               final String... acceptors)
    {
-      if (live)
+      Configuration configuration = createBasicConfig(index).clearAcceptorConfigurations();
+
+      for (String acceptor : acceptors)
       {
-         return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY);
+         TransportConfiguration transportConfig = new TransportConfiguration(acceptor, params);
+         configuration.getAcceptorConfigurations().add(transportConfig);
       }
 
-      Map<String, Object> server1Params = new HashMap<String, Object>();
+      return configuration;
+   }
 
-      server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
-                        org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
+   protected static final ClusterConnectionConfiguration basicClusterConnectionConfig(String connectorName, String... connectors)
+   {
+      ArrayList<String> connectors0 = new ArrayList<>();
+      for (String c : connectors)
+      {
+         connectors0.add(c);
+      }
+      ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration()
+              .setName("cluster1")
+              .setAddress("jms")
+              .setConnectorName(connectorName)
+              .setRetryInterval(1000)
+              .setDuplicateDetection(false)
+              .setForwardWhenNoConsumers(true)
+              .setMaxHops(1)
+              .setConfirmationWindowSize(1)
+              .setStaticConnectors(connectors0);
+
+      return clusterConnectionConfiguration;
+   }
 
-      return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params);
+   protected final OrderedExecutorFactory getOrderedExecutor()
+   {
+      final ExecutorService executor = Executors.newCachedThreadPool();
+      executorSet.add(executor);
+      return new OrderedExecutorFactory(executor);
    }
 
-   protected static final TransportConfiguration getNettyConnectorTransportConfiguration(final boolean live)
+   protected ConfigurationImpl createBasicConfig() throws Exception
    {
-      if (live)
+      return createBasicConfig(0);
+   }
+
+   /**
+    * @param serverID
+    * @return
+    * @throws Exception
+    */
+   protected ConfigurationImpl createBasicConfig(final int serverID)
+   {
+      ConfigurationImpl configuration = new ConfigurationImpl()
+              .setSecurityEnabled(false)
+              .setJournalMinFiles(2)
+              .setJournalFileSize(100 * 1024)
+              .setJournalType(getDefaultJournalType())
+              .setJournalDirectory(getJournalDir(serverID, false))
+              .setBindingsDirectory(getBindingsDir(serverID, false))
+              .setPagingDirectory(getPageDir(serverID, false))
+              .setLargeMessagesDirectory(getLargeMessagesDir(serverID, false))
+              .setJournalCompactMinFiles(0)
+              .setJournalCompactPercentage(0)
+              .setClusterPassword(CLUSTER_PASSWORD);
+
+      return configuration;
+   }
+
+   public static final ConfigurationImpl createBasicConfigNoDataFolder()
+   {
+      ConfigurationImpl configuration = new ConfigurationImpl()
+              .setSecurityEnabled(false)
+              .setJournalType(getDefaultJournalType())
+              .setJournalCompactMinFiles(0)
+              .setJournalCompactPercentage(0)
+              .setClusterPassword(CLUSTER_PASSWORD);
+
+      return configuration;
+   }
+
+   protected static String getUDPDiscoveryAddress()
+   {
+      return System.getProperty("TEST-UDP-ADDRESS", "230.1.2.3");
+   }
+
+   protected static String getUDPDiscoveryAddress(final int variant)
+   {
+      String value = getUDPDiscoveryAddress();
+
+      int posPoint = value.lastIndexOf('.');
+
+      int last = Integer.valueOf(value.substring(posPoint + 1));
+
+      return value.substring(0, posPoint + 1) + (last + variant);
+   }
+
+   public static int getUDPDiscoveryPort()
+   {
+      String port = System.getProperty("TEST-UDP-PORT");
+      if (port != null)
       {
-         return new TransportConfiguration(NETTY_CONNECTOR_FACTORY);
+         return Integer.parseInt(port);
       }
+      return DEFAULT_UDP_PORT;
+   }
 
-      Map<String, Object> server1Params = new HashMap<String, Object>();
+   public static int getUDPDiscoveryPort(final int variant)
+   {
+      return getUDPDiscoveryPort() + variant;
+   }
 
-      server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
-                        org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
-      return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params);
+   public static JournalType getDefaultJournalType()
+   {
+      if (AsynchronousFileImpl.isLoaded())
+      {
+         return JournalType.ASYNCIO;
+      }
+      else
+      {
+         return JournalType.NIO;
+      }
    }
 
-   protected static final TransportConfiguration createTransportConfiguration(boolean netty, boolean acceptor,
-                                                                              Map<String, Object> params)
+   public static void forceGC()
    {
-      String className;
-      if (netty)
+      log.info("#test forceGC");
+      WeakReference<Object> dumbReference = new WeakReference<Object>(new Object());
+      // A loop that will wait GC, using the minimal time as possible
+      while (dumbReference.get() != null)
       {
-         if (acceptor)
+         System.gc();
+         try
          {
-            className = NETTY_ACCEPTOR_FACTORY;
+            Thread.sleep(100);
          }
-         else
+         catch (InterruptedException e)
          {
-            className = NETTY_CONNECTOR_FACTORY;
          }
       }
-      else
+      log.info("#test forceGC Done");
+   }
+
+   public static void forceGC(final Reference<?> ref, final long timeout)
+   {
+      long waitUntil = System.currentTimeMillis() + timeout;
+      // A loop that will wait GC, using the minimal time as possible
+      while (ref.get() != null && System.currentTimeMillis() < waitUntil)
       {
-         if (acceptor)
+         ArrayList<String> list = new ArrayList<String>();
+         for (int i = 0; i < 1000; i++)
          {
-            className = INVM_ACCEPTOR_FACTORY;
+            list.add("Some string with garbage with concatenation " + i);
          }
-         else
+         list.clear();
+         list = null;
+         System.gc();
+         try
+         {
+            Thread.sleep(500);
+         }
+         catch (InterruptedException e)
          {
-            className = INVM_CONNECTOR_FACTORY;
          }
       }
-      if (params == null)
-         params = new HashMap<String, Object>();
-      return new TransportConfiguration(className, params);
    }
 
-   private final ActiveMQServerLogger log = ActiveMQServerLogger.LOGGER;
-
-   protected void waitForServer(ActiveMQServer server) throws InterruptedException
+   /**
+    * Verifies whether weak references are released after a few GCs.
+    *
+    * @param references
+    * @throws InterruptedException
+    */
+   public static void checkWeakReferences(final WeakReference<?>... references)
    {
-      if (server == null)
-         return;
-      final long wait = 5000;
-      long timetowait = System.currentTimeMillis() + wait;
-      while (!server.isStarted() && System.currentTimeMillis() < timetowait)
-      {
-         Thread.sleep(50);
-      }
+      int i = 0;
+      boolean hasValue = false;
 
-      if (!server.isStarted())
+      do
       {
-         log.info(threadDump("Server didn't start"));
-         fail("server didn't start: " + server);
+         hasValue = false;
+
+         if (i > 0)
+         {
+            forceGC();
+         }
+
+         for (WeakReference<?> ref : references)
+         {
+            if (ref.get() != null)
+            {
+               hasValue = true;
+               break;
+            }
+         }
       }
+      while (i++ <= 30 && hasValue);
 
-      if (!server.getHAPolicy().isBackup())
+      for (WeakReference<?> ref : references)
       {
-         if (!server.waitForActivation(wait, TimeUnit.MILLISECONDS))
-            fail("Server didn't initialize: " + server);
+         Assert.assertNull(ref.get());
       }
    }
 
-   protected void waitForServerToStop(ActiveMQServer server) throws InterruptedException
+   public static String threadDump(final String msg)
    {
-      if (server == null)
-         return;
-      final long wait = 5000;
-      long timetowait = System.currentTimeMillis() + wait;
-      while (server.isStarted() && System.currentTimeMillis() < timetowait)
-      {
-         Thread.sleep(50);
+      StringWriter str = new StringWriter();
+      PrintWriter out = new PrintWriter(str);
+
+      Map<Thread, StackTraceElement[]> stackTrace = Thread.getAllStackTraces();
+
+      out.println("*******************************************************************************");
+      out.println("Complete Thread dump " + msg);
+
+      for (Map.Entry<Thread, StackTraceElement[]> el : stackTrace.entrySet())
+      {
+         out.println("===============================================================================");
+         out.println("Thread " + el.getKey() +
+                             " name = " +
+                             el.getKey().getName() +
+                             " id = " +
+                             el.getKey().getId() +
+                             " group = " +
+                             el.getKey().getThreadGroup());
+         out.println();
+         for (StackTraceElement traceEl : el.getValue())
+         {
+            out.println(traceEl);
+         }
       }
 
-      if (server.isStarted())
-      {
-         log.info(threadDump("Server didn't start"));
-         fail("server didnt start: " + server);
-      }
+      out.println("===============================================================================");
+      out.println("End Thread dump " + msg);
+      out.println("*******************************************************************************");
+
+      return str.toString();
    }
 
    /**
-    * @param backup
+    * Sends the message to both logger and System.out (for unit report)
     */
-   public static final void waitForRemoteBackupSynchronization(final ActiveMQServer backup)
+   public void logAndSystemOut(String message, Exception e)
    {
-      waitForRemoteBackup(null, 10, true, backup);
+      ActiveMQServerLogger log0 = ActiveMQServerLogger.LOGGER;
+      log0.info(message, e);
+      System.out.println(message);
+      e.printStackTrace(System.out);
    }
 
    /**
-    * @param sessionFactoryP
-    * @param seconds
-    * @param waitForSync
-    * @param backup
+    * Sends the message to both logger and System.out (for unit report)
     */
-   public static final void waitForRemoteBackup(ClientSessionFactory sessionFactoryP, int seconds,
-                                                boolean waitForSync, final ActiveMQServer backup)
+   public void logAndSystemOut(String message)
    {
-      ClientSessionFactoryInternal sessionFactory = (ClientSessionFactoryInternal)sessionFactoryP;
-      final ActiveMQServerImpl actualServer = (ActiveMQServerImpl) backup;
-      final long toWait = seconds * 1000;
-      final long time = System.currentTimeMillis();
-      int loop = 0;
-      while (true)
+      ActiveMQServerLogger log0 = ActiveMQServerLogger.LOGGER;
+      log0.info(message);
+      System.out.println(this.getClass().getName() + "::" + message);
+   }
+
+   public static String dumpBytes(final byte[] bytes)
+   {
+      StringBuffer buff = new StringBuffer();
+
+      buff.append(System.identityHashCode(bytes) + ", size: " + bytes.length + " [");
+
+      for (int i = 0; i < bytes.length; i++)
       {
-         Activation activation = actualServer.getActivation();
-         boolean isReplicated = !backup.getHAPolicy().isSharedStore();
-         boolean isRemoteUpToDate = true;
-         if (isReplicated)
-         {
-            if (activation instanceof SharedNothingBackupActivation)
-            {
-               isRemoteUpToDate = ((SharedNothingBackupActivation) activation).isRemoteBackupUpToDate();
-            }
-            else
-            {
-               //we may have already failed over and changed the Activation
-               if (actualServer.isStarted())
-               {
-                  //let it fail a few time to have time to start stopping in the case of waiting to failback
-                  isRemoteUpToDate = loop++ > 10;
-               }
-               //we could be waiting to failback or restart if the server is stopping
-               else
-               {
-                  isRemoteUpToDate = false;
-               }
-            }
-         }
-         if ((sessionFactory == null || sessionFactory.getBackupConnector() != null) &&
-               (isRemoteUpToDate || !waitForSync) &&
-            (!waitForSync || actualServer.getBackupManager() != null && actualServer.getBackupManager().isBackupAnnounced()))
-         {
-            break;
-         }
-         if (System.currentTimeMillis() > (time + toWait))
-         {
-            fail("backup started? (" + actualServer.isStarted() + "). Finished synchronizing (" +
-                  (activation) + "). SessionFactory!=null ? " + (sessionFactory != null) +
-                    " || sessionFactory.getBackupConnector()==" +
-                    (sessionFactory != null ? sessionFactory.getBackupConnector() : "not-applicable"));
-         }
-         try
-         {
-            Thread.sleep(100);
-         }
-         catch (InterruptedException e)
+         buff.append(bytes[i]);
+
+         if (i != bytes.length - 1)
          {
-            fail(e.getMessage());
+            buff.append(", ");
          }
       }
+
+      buff.append("]");
+
+      return buff.toString();
    }
 
-   public static final void waitForRemoteBackup(ClientSessionFactory sessionFactory, int seconds)
+   public static String dumbBytesHex(final byte[] buffer, final int bytesPerLine)
    {
-      ClientSessionFactoryInternal factoryInternal = (ClientSessionFactoryInternal) sessionFactory;
-      final long toWait = seconds * 1000;
-      final long time = System.currentTimeMillis();
-      while (true)
+
+      StringBuffer buff = new StringBuffer();
+
+      buff.append("[");
+
+      for (int i = 0; i < buffer.length; i++)
       {
-         if (factoryInternal.getBackupConnector() != null)
-         {
-            break;
-         }
-         if (System.currentTimeMillis() > (time + toWait))
-         {
-            fail("Backup wasn't located");
-         }
-         try
+         buff.append(String.format("%1$2X", buffer[i]));
+         if (i + 1 < buffer.length)
          {
-            Thread.sleep(100);
+            buff.append(", ");
          }
-         catch (InterruptedException e)
+         if ((i + 1) % bytesPerLine == 0)
          {
-            fail(e.getMessage());
+            buff.append("\n ");
          }
       }
+      buff.append("]");
+
+      return buff.toString();
    }
 
-   protected final ActiveMQServer
-   createServer(final boolean realFiles,
-                final Configuration configuration,
-                final int pageSize,
-                final int maxAddressSize,
-                final Map<String, AddressSettings> settings,
-                final MBeanServer mbeanServer)
+   public static void assertEqualsTransportConfigurations(final TransportConfiguration[] expected,
+                                                          final TransportConfiguration[] actual)
    {
-      ActiveMQServer server;
-
-      if (realFiles)
-      {
-         server = ActiveMQServers.newActiveMQServer(configuration, mbeanServer, true);
-      }
-      else
+      assertEquals(expected.length, actual.length);
+      for (int i = 0; i < expected.length; i++)
       {
-         server = ActiveMQServers.newActiveMQServer(configuration, mbeanServer, false);
+         Assert.assertEquals("TransportConfiguration at index " + i, expected[i], actual[i]);
       }
-      try
-      {
-         for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
-         {
-            server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
-         }
-
-         AddressSettings defaultSetting = new AddressSettings();
-         defaultSetting.setPageSizeBytes(pageSize);
-         defaultSetting.setMaxSizeBytes(maxAddressSize);
+   }
 
-         server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+   public static void assertEqualsBuffers(final int size, final ActiveMQBuffer expected, final ActiveMQBuffer actual)
+   {
+      // assertEquals(expected.length, actual.length);
+      expected.readerIndex(0);
+      actual.readerIndex(0);
 
-         return server;
-      }
-      finally
+      for (int i = 0; i < size; i++)
       {
-         addServer(server);
+         byte b1 = expected.readByte();
+         byte b2 = actual.readByte();
+         Assert.assertEquals("byte at index " + i, b1, b2);
       }
+      expected.resetReaderIndex();
+      actual.resetReaderIndex();
    }
 
-   protected final ActiveMQServer createServer(final boolean realFiles,
-                                              final Configuration configuration,
-                                              final int pageSize,
-                                              final int maxAddressSize,
-                                              final Map<String, AddressSettings> settings)
-   {
-      return createServer(realFiles, configuration, pageSize, maxAddressSize, AddressFullMessagePolicy.PAGE, settings);
-   }
-
-   protected final ActiveMQServer createServer(final boolean realFiles,
-                                              final Configuration configuration,
-                                              final int pageSize,
-                                              final int maxAddressSize,
-                                              final AddressFullMessagePolicy fullPolicy,
-                                              final Map<String, AddressSettings> settings)
+   public static void assertEqualsByteArrays(final int length, final byte[] expected, final byte[] actual)
    {
-      ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(configuration, realFiles));
-      if (settings != null)
+      // we check only for the given length (the arrays might be
+      // larger)
+      Assert.assertTrue(expected.length >= length);
+      Assert.assertTrue(actual.length >= length);
+      for (int i = 0; i < length; i++)
       {
-         for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
-         {
-            server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
-         }
+         Assert.assertEquals("byte at index " + i, expected[i], actual[i]);
       }
-
-      AddressSettings defaultSetting = new AddressSettings();
-      defaultSetting.setPageSizeBytes(pageSize);
-      defaultSetting.setMaxSizeBytes(maxAddressSize);
-      defaultSetting.setAddressFullMessagePolicy(fullPolicy);
-
-      server.getAddressSettingsRepository().addMatch("#", defaultSetting);
-
-      return server;
    }
 
-
-   protected final ActiveMQServer createServer(final boolean realFiles,
-                                              Configuration conf,
-                                              MBeanServer mbeanServer)
+   public static void assertSameXids(final List<Xid> expected, final List<Xid> actual)
    {
-      return createServer(realFiles, conf, mbeanServer, new HashMap<String, AddressSettings>());
+      Assert.assertNotNull(expected);
+      Assert.assertNotNull(actual);
+      Assert.assertEquals(expected.size(), actual.size());
+
+      for (int i = 0; i < expected.size(); i++)
+      {
+         Xid expectedXid = expected.get(i);
+         Xid actualXid = actual.get(i);
+         assertEqualsByteArrays(expectedXid.getBranchQualifier(), actualXid.getBranchQualifier());
+         Assert.assertEquals(expectedXid.getFormatId(), actualXid.getFormatId());
+         assertEqualsByteArrays(expectedXid.getGlobalTransactionId(), actualXid.getGlobalTransactionId());
+      }
    }
 
-   protected final ActiveMQServer
-   createServer(final boolean realFiles,
-                final Configuration configuration,
-                final MBeanServer mbeanServer,
-                final Map<String, AddressSettings> settings)
+   protected static void checkNoBinding(final Context context, final String binding)
    {
-      ActiveMQServer server;
-
-      if (realFiles)
+      try
       {
-         server = ActiveMQServers.newActiveMQServer(configuration, mbeanServer);
+         context.lookup(binding);
+         Assert.fail("there must be no resource to look up for " + binding);
       }
-      else
+      catch (Exception e)
       {
-         server = ActiveMQServers.newActiveMQServer(configuration, mbeanServer, false);
       }
-      try
-      {
-         for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
-         {
-            server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
-         }
+   }
 
-         AddressSettings defaultSetting = new AddressSettings();
-         server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+   protected static Object checkBinding(final Context context, final String binding) throws Exception
+   {
+      Object o = context.lookup(binding);
+      Assert.assertNotNull(o);
+      return o;
+   }
 
+   /**
+    * @param connectorConfigs
+    * @return
+    */
+   protected ArrayList<String> registerConnectors(final ActiveMQServer server,
+                                                  final List<TransportConfiguration> connectorConfigs)
+   {
+      // The connectors need to be pre-configured at main config object but this method is taking
+      // TransportConfigurations directly
+      // So this will first register them at the config and then generate a list of objects
+      ArrayList<String> connectors = new ArrayList<String>();
+      for (TransportConfiguration tnsp : connectorConfigs)
+      {
+         String name1 = RandomUtil.randomString();
 
-         return server;
+         server.getConfiguration().getConnectorConfigurations().put(name1, tnsp);
+
+         connectors.add(name1);
       }
-      finally
+      return connectors;
+   }
+
+   protected static final void checkFreePort(final int... ports)
+   {
+      for (int port : ports)
       {
-         addServer(server);
+         ServerSocket ssocket = null;
+         try
+         {
+            ssocket = new ServerSocket(port);
+         }
+         catch (Exception e)
+         {
+            throw new IllegalStateException("port " + port + " is bound", e);
+         }
+         finally
+         {
+            if (ssocket != null)
+            {
+               try
+               {
+                  ssocket.close();
+               }
+               catch (IOException e)
+               {
+               }
+            }
+         }
       }
    }
 
-   protected final ActiveMQServer createServer(final boolean realFiles) throws Exception
+   /**
+    * @return the testDir
+    */
+   protected final String getTestDir()
    {
-      return createServer(realFiles, false);
+      return testDir;
    }
 
-   protected final ActiveMQServer createServer(final boolean realFiles, final boolean netty) throws Exception
+   protected final void setTestDir(String testDir)
    {
-      return createServer(realFiles, createDefaultConfig(netty), -1, -1, new HashMap<String, AddressSettings>());
+      this.testDir = testDir;
    }
 
-   protected ActiveMQServer createServer(final boolean realFiles, final Configuration configuration)
+   protected final void clearDataRecreateServerDirs()
    {
-      return createServer(realFiles, configuration, -1, -1, new HashMap<String, AddressSettings>());
+      clearDataRecreateServerDirs(getTestDir());
    }
 
-   protected final ActiveMQServer createServer(final Configuration configuration)
+   protected void clearDataRecreateServerDirs(final String testDir1)
    {
-      return createServer(configuration.isPersistenceEnabled(), configuration, -1, -1,
-                          new HashMap<String, AddressSettings>());
+      // Need to delete the root
+
+      File file = new File(testDir1);
+      deleteDirectory(file);
+      file.mkdirs();
+
+      recreateDirectory(getJournalDir(testDir1));
+      recreateDirectory(getBindingsDir(testDir1));
+      recreateDirectory(getPageDir(testDir1));
+      recreateDirectory(getLargeMessagesDir(testDir1));
+      recreateDirectory(getClientLargeMessagesDir(testDir1));
+      recreateDirectory(getTemporaryDir(testDir1));
    }
 
-   protected ActiveMQServer createInVMFailoverServer(final boolean realFiles,
-                                                    final Configuration configuration,
-                                                    final NodeManager nodeManager,
-                                                    final int id)
-   {
+   /**
+    * @return the journalDir
+    */
+   public String getJournalDir()
+   {
+      return getJournalDir(getTestDir());
+   }
+
+   protected static String getJournalDir(final String testDir1)
+   {
+      return testDir1 + "/journal";
+   }
+
+   protected String getJournalDir(final int index, final boolean backup)
+   {
+      return getJournalDir(getTestDir(), index, backup);
+   }
+
+   public static String getJournalDir(final String testDir, final int index, final boolean backup)
+   {
+      return getJournalDir(testDir) + directoryNameSuffix(index, backup);
+   }
+
+   /**
+    * @return the bindingsDir
+    */
+   protected String getBindingsDir()
+   {
+      return getBindingsDir(getTestDir());
+   }
+
+   /**
+    * @return the bindingsDir
+    */
+   protected static String getBindingsDir(final String testDir1)
+   {
+      return testDir1 + "/bindings";
+   }
+
+   /**
+    * @return the bindingsDir
+    */
+   protected String getBindingsDir(final int index, final boolean backup)
+   {
+      return getBindingsDir(getTestDir(), index, backup);
+   }
+
+   public static String getBindingsDir(final String testDir, final int index, final boolean backup)
+   {
+      return getBindingsDir(testDir) + directoryNameSuffix(index, backup);
+   }
+
+   /**
+    * @return the pageDir
+    */
+   protected String getPageDir()
+   {
+      return getPageDir(getTestDir());
+   }
+
+   /**
+    * @return the pageDir
+    */
+   protected static String getPageDir(final String testDir1)
+   {
+      return testDir1 + "/page";
+   }
+
+   protected String getPageDir(final int index, final boolean backup)
+   {
+      return getPageDir(getTestDir(), index, backup);
+   }
+
+   public static String getPageDir(final String testDir, final int index, final boolean backup)
+   {
+      return getPageDir(testDir) + directoryNameSuffix(index, backup);
+   }
+
+   /**
+    * @return the largeMessagesDir
+    */
+   protected String getLargeMessagesDir()
+   {
+      return getLargeMessagesDir(getTestDir());
+   }
+
+   /**
+    * @return the largeMessagesDir
+    */
+   protected static String getLargeMessagesDir(final String testDir1)
+   {
+      return testDir1 + "/large-msg";
+   }
+
+   protected String getLargeMessagesDir(final int index, final boolean backup)
+   {
+      return getLargeMessagesDir(getTestDir(), index, backup);
+   }
+
+   public static String getLargeMessagesDir(final String testDir, final int index, final boolean backup)
+   {
+      return getLargeMessagesDir(testDir) + directoryNameSuffix(index, backup);
+   }
+
+   private static String directoryNameSuffix(int index, boolean backup)
+   {
+      if (index == -1)
+         return "";
+      return index + "-" + (backup ? "B" : "L");
+   }
+
+   /**
+    * @return the clientLargeMessagesDir
+    */
+   protected String getClientLargeMessagesDir()
+   {
+      return getClientLargeMessagesDir(getTestDir());
+   }
+
+   /**
+    * @return the clientLargeMessagesDir
+    */
+   protected String getClientLargeMessagesDir(final String testDir1)
+   {
+      return testDir1 + "/client-large-msg";
+   }
+
+   /**
+    * @return the temporaryDir
+    */
+   protected final String getTemporaryDir()
+   {
+      return getTemporaryDir(getTestDir());
+   }
+
+   /**
+    * @return the temporaryDir
+    */
+   protected String getTemporaryDir(final String testDir1)
+   {
+      return testDir1 + "/temp";
+   }
+
+   protected static void expectActiveMQException(final String message, final ActiveMQExceptionType errorCode, final ActiveMQAction action)
+   {
+      try
+      {
+         action.run();
+         Assert.fail(message);
+      }
+      catch (Exception e)
+      {
+         Assert.assertTrue(e instanceof ActiveMQException);
+         Assert.assertEquals(errorCode, ((ActiveMQException) e).getType());
+      }
+   }
+
+   protected static void expectActiveMQException(final ActiveMQExceptionType errorCode, final ActiveMQAction action)
+   {
+      expectActiveMQException("must throw a ActiveMQException with the expected errorCode: " + errorCode,
+                              errorCode,
+                              action);
+   }
+
+   protected static void expectXAException(final int errorCode, final ActiveMQAction action)
+   {
+      try
+      {
+         action.run();
+         Assert.fail("must throw a XAException with the expected errorCode: " + errorCode);
+      }
+      catch (Exception e)
+      {
+         Assert.assertTrue(e instanceof XAException);
+         Assert.assertEquals(errorCode, ((XAException) e).errorCode);
+      }
+   }
+
+   public static byte getSamplebyte(final long position)
+   {
+      return (byte) ('a' + position % ('z' - 'a' + 1));
+   }
+
+   // Creates a Fake LargeStream without using a real file
+   public static InputStream createFakeLargeStream(final long size) throws Exception
+   {
+      return new InputStream()
+      {
+         private long count;
+
+         private boolean closed = false;
+
+         @Override
+         public void close() throws IOException
+         {
+            super.close();
+            closed = true;
+         }
+
+         @Override
+         public int read() throws IOException
+         {
+            if (closed)
+            {
+               throw new IOException("Stream was closed");
+            }
+            if (count++ < size)
+            {
+               return getSamplebyte(count - 1);
+            }
+            else
+            {
+               return -1;
+            }
+         }
+      };
+
+   }
+
+   /**
+    * It validates a Bean (POJO) using simple setters and getters with random values.
+    * You can pass a list of properties to be ignored, as some properties will have a pre-defined domain (not being possible to use random-values on them)
+    */
+   protected void validateGettersAndSetters(final Object pojo, final String... ignoredProperties) throws Exception
+   {
+      HashSet<String> ignoreSet = new HashSet<String>();
+
+      for (String ignore : ignoredProperties)
+      {
+         ignoreSet.add(ignore);
+      }
+
+      BeanInfo info = Introspector.getBeanInfo(pojo.getClass());
+
+      PropertyDescriptor[] properties = info.getPropertyDescriptors();
+
+      for (PropertyDescriptor prop : properties)
+      {
+         Object value;
+
+         if (prop.getPropertyType() == String.class)
+         {
+            value = RandomUtil.randomString();
+         }
+         else if (prop.getPropertyType() == Integer.class || prop.getPropertyType() == Integer.TYPE)
+         {
+            value = RandomUtil.randomInt();
+         }
+         else if (prop.getPropertyType() == Long.class || prop.getPropertyType() == Long.TYPE)
+         {
+            value = RandomUtil.randomLong();
+         }
+         else if (prop.getPropertyType() == Boolean.class || prop.getPropertyType() == Boolean.TYPE)
+         {
+            value = RandomUtil.randomBoolean();
+         }
+         else if (prop.getPropertyType() == Double.class || prop.getPropertyType() == Double.TYPE)
+         {
+            value = RandomUtil.randomDouble();
+         }
+         else
+         {
+            System.out.println("Can't validate property of type " + prop.getPropertyType() + " on " + prop.getName());
+            value = null;
+         }
+
+         if (value != null && prop.getWriteMethod() != null && prop.getReadMethod() == null)
+         {
+            System.out.println("WriteOnly property " + prop.getName() + " on " + pojo.getClass());
+         }
+         else if (value != null & prop.getWriteMethod() != null &&
+                 prop.getReadMethod() != null &&
+                 !ignoreSet.contains(prop.getName()))
+         {
+            System.out.println("Validating " + prop.getName() + " type = " + prop.getPropertyType());
+            prop.getWriteMethod().invoke(pojo, value);
+
+            Assert.assertEquals("Property " + prop.getName(), value, prop.getReadMethod().invoke(pojo));
+         }
+      }
+   }
+
+   /**
+    * @param queue
+    * @throws InterruptedException
+    */
+   protected void waitForNotPaging(Queue queue) throws InterruptedException
+   {
+      waitForNotPaging(queue.getPageSubscription().getPagingStore());
+   }
+
+   protected void waitForNotPaging(PagingStore store) throws InterruptedException
+   {
+      long timeout = System.currentTimeMillis() + 10000;
+      while (timeout > System.currentTimeMillis() && store.isPaging())
+      {
+         Thread.sleep(100);
+      }
+      assertFalse(store.isPaging());
+   }
+
+   protected Topology waitForTopology(final ActiveMQServer server, final int nodes) throws Exception
+   {
+      return waitForTopology(server, nodes, -1, WAIT_TIMEOUT);
+   }
+
+   protected Topology waitForTopology(final ActiveMQServer server, final int nodes, final int backups) throws Exception
+   {
+      return waitForTopology(server, nodes, backups, WAIT_TIMEOUT);
+   }
+
+   protected Topology waitForTopology(final ActiveMQServer server, final int liveNodes, final int backupNodes, final long timeout) throws Exception
+   {
+      ActiveMQServerLogger.LOGGER.debug("waiting for " + liveNodes + " on the topology for server = " + server);
+
+      long start = System.currentTimeMillis();
+
+      Set<ClusterConnection> ccs = server.getClusterManager().getClusterConnections();
+
+      if (ccs.size() != 1)
+      {
+         throw new IllegalStateException("You need a single cluster connection on this version of waitForTopology on ServiceTestBase");
+      }
+
+      Topology topology = server.getClusterManager().getDefaultConnection(null).getTopology();
+
+      int liveNodesCount = 0;
+
+      int backupNodesCount = 0;
+
+
+      do
+      {
+         liveNodesCount = 0;
+         backupNodesCount = 0;
+
+         for (TopologyMemberImpl member : topology.getMembers())
+         {
+            if (member.getLive() != null)
+            {
+               liveNodesCount++;
+            }
+            if (member.getBackup() != null)
+            {
+               backupNodesCount++;
+            }
+         }
+
+         if ((liveNodes == -1 || liveNodes == liveNodesCount) && (backupNodes == -1 || backupNodes == backupNodesCount))
+         {
+            return topology;
+         }
+
+         Thread.sleep(10);
+      }
+      while (System.currentTimeMillis() - start < timeout);
+
+      String msg = "Timed out waiting for cluster topology of live=" + liveNodes + ",backup=" + backupNodes +
+         " (received live=" + liveNodesCount + ", backup=" + backupNodesCount +
+         ") topology = " +
+         topology.describe() +
+         ")";
+
+      ActiveMQServerLogger.LOGGER.error(msg);
+
+      throw new Exception(msg);
+   }
+
+
+   protected void waitForTopology(final ActiveMQServer server, String clusterConnectionName, final int nodes, final long timeout) throws Exception
+   {
+      ActiveMQServerLogger.LOGGER.debug("waiting for " + nodes + " on the topology for server = " + server);
+
+      long start = System.currentTimeMillis();
+
+      ClusterConnection clusterConnection = server.getClusterManager().getClusterConnection(clusterConnectionName);
+
+
+      Topology topology = clusterConnection.getTopology();
+
+      do
+      {
+         if (nodes == topology.getMembers().size())
+         {
+            return;
+         }
+
+         Thread.sleep(10);
+      }
+      while (System.currentTimeMillis() - start < timeout);
+
+      String msg = "Timed out waiting for cluster topology of " + nodes +
+         " (received " +
+         topology.getMembers().size() +
+         ") topology = " +
+         topology +
+         ")";
+
+      ActiveMQServerLogger.LOGGER.error(msg);
+
+      throw new Exception(msg);
+   }
+
+   protected static final void waitForComponent(final ActiveMQComponent component, final long seconds) throws InterruptedException
+   {
+      long time = System.currentTimeMillis();
+      long toWait = seconds * 1000;
+      while (!component.isStarted())
+      {
+         Thread.sleep(50);
+         if (System.currentTimeMillis() > (time + toWait))
+         {
+            fail("component did not start within timeout of " + seconds);
+         }
+      }
+   }
+
+   protected static final Map<String, Object> generateParams(final int node, final boolean netty)
+   {
+      Map<String, Object> params = new HashMap<String, Object>();
+
+      if (netty)
+      {
+         params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
+                    org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + node);
+      }
+      else
+      {
+         params.put(org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME, node);
+      }
+
+      return params;
+   }
+
+   protected static final TransportConfiguration getNettyAcceptorTransportConfiguration(final boolean live)
+   {
+      if (live)
+      {
+         return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY);
+      }
+
+      Map<String, Object> server1Params = new HashMap<String, Object>();
+
+      server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
+                        org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
+
+      return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params);
+   }
+
+   protected static final TransportConfiguration getNettyConnectorTransportConfiguration(final boolean live)
+   {
+      if (live)
+      {
+         return new TransportConfiguration(NETTY_CONNECTOR_FACTORY);
+      }
+
+      Map<String, Object> server1Params = new HashMap<String, Object>();
+
+      server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
+                        org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
+      return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params);
+   }
+
+   protected static final TransportConfiguration createTransportConfiguration(boolean netty, boolean acceptor,
+                                                                              Map<String, Object> params)
+   {
+      String className;
+      if (netty)
+      {
+         if (acceptor)
+         {
+            className = NETTY_ACCEPTOR_FACTORY;
+         }
+         else
+         {
+            className = NETTY_CONNECTOR_FACTORY;
+         }
+      }
+      else
+      {
+         if (acceptor)
+         {
+            className = INVM_ACCEPTOR_FACTORY;
+         }
+         else
+         {
+            className = INVM_CONNECTOR_FACTORY;
+         }
+      }
+      if (params == null)
+         params = new HashMap<String, Object>();
+      return new TransportConfiguration(className, params);
+   }
+
+
+   protected void waitForServer(ActiveMQServer server) throws InterruptedException
+   {
+      if (server == null)
+         return;
+      final long wait = 5000;
+      long timetowait = System.currentTimeMillis() + wait;
+      while (!server.isStarted() && System.currentTimeMillis() < timetowait)
+      {
+         Thread.sleep(50);
+      }
+
+      if (!server.isStarted())
+      {
+         log.info(threadDump("Server didn't start"));
+         fail("server didn't start: " + server);
+      }
+
+      if (!server.getHAPolicy().isBackup())
+      {
+         if (!server.waitForActivation(wait, TimeUnit.MILLISECONDS))
+            fail("Server didn't initialize: " + server);
+      }
+   }
+
+   protected void waitForServerToStop(ActiveMQServer server) throws InterruptedException
+   {
+      if (server == null)
+         return;
+      final long wait = 5000;
+      long timetowait = System.currentTimeMillis() + wait;
+      while (server.isStarted() && System.currentTimeMillis() < timetowait)
+      {
+         Thread.sleep(50);
+      }
+
+      if (server.isStarted())
+      {
+         log.info(threadDump("Server didn't start"));
+         fail("server didnt start: " + server);
+      }
+   }
+
+   /**
+    * @param backup
+    */
+   public static final void waitForRemoteBackupSynchronization(final ActiveMQServer backup)
+   {
+      waitForRemoteBackup(null, 10, true, backup);
+   }
+
+   /**
+    * @param sessionFactoryP
+    * @param seconds
+    * @param waitForSync
+    * @param backup
+    */
+   public static final void waitForRemoteBackup(ClientSessionFactory sessionFactoryP, int seconds,
+                                                boolean waitForSync, final ActiveMQServer backup)
+   {
+      ClientSessionFactoryInternal sessionFactory = (ClientSessionFactoryInternal)sessionFactoryP;
+      final ActiveMQServerImpl actualServer = (ActiveMQServerImpl) backup;
+      final long toWait = seconds * 1000;
+      final long time = System.currentTimeMillis();
+      int loop = 0;
+      while (true)
+      {
+         Activation activation = actualServer.getActivation();
+         boolean isReplicated = !backup.getHAPolicy().isSharedStore();
+         boolean isRemoteUpToDate = true;
+         if (isReplicated)
+         {
+            if (activation instanceof SharedNothingBackupActivation)
+            {
+               isRemoteUpToDate = ((SharedNothingBackupActivation) activation).isRemoteBackupUpToDate();
+            }
+            else
+            {
+               //we may have already failed over and changed the Activation
+               if (actualServer.isStarted())
+               {
+                  //let it fail a few time to have time to start stopping in the case of waiting to failback
+                  isRemoteUpToDate = loop++ > 10;
+               }
+               //we could be waiting to failback or restart if the server is stopping
+               else
+               {
+                  isRemoteUpToDate = false;
+               }
+            }
+         }
+         if ((sessionFactory == null || sessionFactory.getBackupConnector() != null) &&
+               (isRemoteUpToDate || !waitForSync) &&
+            (!waitForSync || actualServer.getBackupManager() != null && actualServer.getBackupManager().isBackupAnnounced()))
+         {
+            break;
+         }
+         if (System.currentTimeMillis() > (time + toWait))
+         {
+            fail("backup started? (" + actualServer.isStarted() + "). Finished synchronizing (" +
+                  (activation) + "). SessionFactory!=null ? " + (sessionFactory != null) +
+                    " || sessionFactory.getBackupConnector()==" +
+                    (sessionFactory != null ? sessionFactory.getBackupConnector() : "not-applicable"));
+         }
+         try
+         {
+            Thread.sleep(100);
+         }
+         catch (InterruptedException e)
+         {
+            fail(e.getMessage());
+         }
+      }
+   }
+
+   public static final void waitForRemoteBackup(ClientSessionFactory sessionFactory, int seconds)
+   {
+      ClientSessionFactoryInternal factoryInternal = (ClientSessionFactoryInternal) sessionFactory;
+      final long toWait = seconds * 1000;
+      final long time = System.currentTimeMillis();
+      while (true)
+      {
+         if (factoryInternal.getBackupConnector() != null)
+         {
+            break;
+         }
+         if (System.currentTimeMillis() > (time + toWait))
+         {
+            fail("Backup wasn't located");
+         }
+         try
+         {
+            Thread.sleep(100);
+         }
+         catch (InterruptedException e)
+         {
+            fail(e.getMessage());
+         }
+      }
+   }
+
+   protected final ActiveMQServer createServer(final boolean realFiles,
+                                              final Configuration configuration,
+                                              final long pageSize,
+                                              final long maxAddressSize,
+                                              final Map<String, AddressSettings> settings)
+   {
+
+      ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(configuration, realFiles));
+      if (settings != null)
+      {
+         for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
+         {
+            server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
+         }
+      }
+
+      AddressSettings defaultSetting = new AddressSettings();
+      defaultSetting.setPageSizeBytes(pageSize);
+      defaultSetting.setMaxSizeBytes(maxAddressSize);
+      defaultSetting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+
+      server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+      return server;
+   }
+
+   protected final ActiveMQServer createServer(final boolean realFiles) throws Exception
+   {
+      return createServer(realFiles, false);
+   }
+
+   protected final ActiveMQServer createServer(final boolean realFiles, final boolean netty) throws Exception
+   {
+      return createServer(realFiles, createDefaultConfig(netty), AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES, new HashMap<String, AddressSettings>());
+   }
+
+   protected ActiveMQServer createServer(final boolean realFiles, final Configuration configuration)
+   {
+      return createServer(realFiles, configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES, new HashMap<String, AddressSettings>());
+   }
+
+   protected final ActiveMQServer createServer(final Configuration configuration)
+   {
+      return createServer(configuration.isPersistenceEnabled(), configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES, new HashMap<String, AddressSettings>());
+   }
+
+   protected ActiveMQServer createInVMFailoverServer(final boolean realFiles,
+                                                    final Configuration configuration,
+                                                    final NodeManager nodeManager,
+                                                    final int id)
+   {
       return createInVMFailoverServer(realFiles,
                                       configuration,
                                       -1,
@@ -616,648 +1599,1401 @@ public abstract class ServiceTestBase extends UnitTestCase
                                       id);
    }
 
-   protected ActiveMQServer createInVMFailoverServer(final boolean realFiles,
-                                                    final Configuration configuration,
-                                                    final int pageSize,
-                                                    final int maxAddressSize,
-                                                    final Map<String, AddressSettings> settings,
-                                                    NodeManager nodeManager,
-                                                    final int id)
+   protected ActiveMQServer createInVMFailoverServer(final boolean realFiles,
+                                                    final Configuration configuration,
+                                                    final int pageSize,
+                                                    final int maxAddressSize,
+                                                    final Map<String, AddressSettings> settings,
+                                                    NodeManager nodeManager,
+                                                    final int id)
+   {
+      ActiveMQServer server;
+      ActiveMQSecurityManager securityManager = new ActiveMQSecurityManagerImpl();
+      configuration.setPersistenceEnabled(realFiles);
+      server = new InVMNodeManagerServer(configuration,
+                                         ManagementFactory.getPlatformMBeanServer(),
+                                         securityManager,
+                                         nodeManager);
+
+      try
+      {
+         server.setIdentity("Server " + id);
+
+         for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
+         {
+            server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
+         }
+
+         AddressSettings defaultSetting = new AddressSettings();
+         defaultSetting.setPageSizeBytes(pageSize);
+         defaultSetting.setMaxSizeBytes(maxAddressSize);
+
+         server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+         return server;
+      }
+      finally
+      {
+         addServer(server);
+      }
+   }
+
+   protected ActiveMQServer createColocatedInVMFailoverServer(final boolean realFiles,
+                                                    final Configuration configuration,
+                                                    NodeManager liveNodeManager,
+                                                    NodeManager backupNodeManager,
+                                                    final int id)
+   {
+      return createColocatedInVMFailoverServer(realFiles,
+            configuration,
+            -1,
+            -1,
+            new HashMap<String, AddressSettings>(),
+            liveNodeManager,
+            backupNodeManager,
+            id);
+   }
+
+   protected ActiveMQServer createColocatedInVMFailoverServer(final boolean realFiles,
+                                                    final Configuration configuration,
+                                                    final int pageSize,
+                                                    final int maxAddressSize,
+                                                    final Map<String, AddressSettings> settings,
+                                                    NodeManager liveNodeManager,
+                                                    NodeManager backupNodeManager,
+                                                    final int id)
+   {
+      ActiveMQServer server;
+      ActiveMQSecurityManager securityManager = new ActiveMQSecurityManagerImpl();
+      configuration.setPersistenceEnabled(realFiles);
+      server = new ColocatedActiveMQServer(configuration,
+            ManagementFactory.getPlatformMBeanServer(),
+            securityManager,
+            liveNodeManager,
+            backupNodeManager);
+
+      try
+      {
+         server.setIdentity("Server " + id);
+
+         for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
+         {
+            server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
+         }
+
+         AddressSettings defaultSetting = new AddressSettings();
+         defaultSetting.setPageSizeBytes(pageSize);
+         defaultSetting.setMaxSizeBytes(maxAddressSize);
+
+         server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+         return server;
+      }
+      finally
+      {
+         addServer(server);
+      }
+   }
+
+   protected ActiveMQServer createClusteredServerWithParams(final boolean isNetty,
+                                                           final int index,
+                                                           final boolean realFiles,
+                                                           final Map<String, Object> params) throws Exception
+   {
+      String acceptor = isNetty ? NETTY_ACCEPTOR_FACTORY : INVM_ACCEPTOR_FACTORY;
+      return createServer(realFiles, createDefaultConfig(index, params, acceptor), -1, -1,
+                          new HashMap<String, AddressSettings>());
+   }
+
+   protected ActiveMQServer createClusteredServerWithParams(final boolean isNetty,
+                                                           final int index,
+                                                           final boolean realFiles,
+                                                           final int pageSize,
+                                                           final int maxAddressSize,
+                                                           final Map<String, Object> params) throws Exception
+   {
+      return createServer(realFiles, createDefaultConfig(index, params, (isNetty ? NETTY_ACCEPTOR_FACTORY : INVM_ACCEPTOR_FACTORY)),
+                          pageSize,
+                          maxAddressSize,
+                          new HashMap<String, AddressSettings>());
+   }
+
+   protected ServerLocator createFactory(final boolean isNetty) throws Exception
+   {
+      if (isNetty)
+      {
+         return createNettyNonHALocator();
+      }
+      else
+      {
+         return createInVMNonHALocator();
+      }
+   }
+
+   protected void createQueue(final String address, final String queue) throws Exception
+   {
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory sf = locator.createSessionFactory();
+      ClientSession session = sf.createSession();
+      try
+      {
+         session.createQueue(address, queue);
+      }
+      finally
+      {
+         session.close();
+         closeSessionFactory(sf);
+         closeServerLocator(locator);
+      }
+   }
+
+   protected final ServerLocator createInVMLocator(final int serverID)
+   {
+      TransportConfiguration tnspConfig = createInVMTransportConnectorConfig(serverID, UUIDGenerator.getInstance().generateStringUUID());
+
+      ServerLocator locator = ActiveMQClient.createServerLocatorWithHA(tnspConfig);
+      return addServerLocator(locator);
+   }
+
+   /**
+    * @param serverID
+    * @return
+    */
+   protected final TransportConfiguration createInVMTransportConnectorConfig(final int serverID, String name1)
+   {
+      Map<String, Object> server1Params = new HashMap<String, Object>();
+
+      if (serverID != 0)
+      {
+         server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, serverID);
+      }
+
+      TransportConfiguration tnspConfig = new TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params, name1);
+      return tnspConfig;
+   }
+
+   public String getTextMessage(final ClientMessage m)
+   {
+      m.getBodyBuffer().resetReaderIndex();
+      return m.getBodyBuffer().readString();
+   }
+
+   protected ClientMessage createBytesMessage(final ClientSession session,
+                                              final byte type,
+                                              final byte[] b,
+                                              final boolean durable)
+   {
+      ClientMessage message = session.createMessage(type, durable, 0, System.currentTimeMillis(), (byte) 1);
+      message.getBodyBuffer().writeBytes(b);
+      return message;
+   }
+
+   /**
+    * @param i
+    * @param message
+    * @throws Exception
+    */
+   protected void setBody(final int i, final ClientMessage message)
+   {
+      message.getBodyBuffer().writeString("message" + i);
+   }
+
+   /**
+    * @param i
+    * @param message
+    */
+   protected void assertMessageBody(final int i, final ClientMessage message)
+   {
+      Assert.assertEquals(message.toString(), "message" + i, message.getBodyBuffer().readString());
+   }
+
+   /**
+    * Send durable messages with pre-specified body.
+    *
+    * @param session
+    * @param producer
+    * @param numMessages
+    * @throws Exception
+    */
+   public final void sendMessages(ClientSession session, ClientProducer producer, int numMessages) throws ActiveMQException
+   {
+      for (int i = 0; i < numMessages; i++)
+      {
+         producer.send(createMessage(session, i, true));
+      }
+   }
+
+   protected final ClientMessage createMessage(ClientSession session, int counter, boolean durable) throws ActiveMQException
+   {
+      ClientMessage message = session.createMessage(durable);
+      setBody(counter, message);
+      message.putIntProperty("counter", counter);
+      message.putIntProperty(SEND_CALL_NUMBER, sendMsgCount++);
+      return message;
+   }
+
+   protected final void receiveMessages(ClientConsumer consumer, final int start, final int msgCount, final boolean ack) throws ActiveMQException
+   {
+      for (int i = start; i < msgCount; i++)
+      {
+         ClientMessage message = consumer.receive(1000);
+         Assert.assertNotNull("Expecting a message " + i, message);
+         // sendCallNumber is just a debugging measure.
+         Object prop = message.getObjectProperty(SEND_CALL_NUMBER);
+         if (prop == null)
+            prop = Integer.valueOf(-1);
+         final int actual = message.getIntProperty("counter").intValue();
+         Assert.assertEquals("expected=" + i + ". Got: property['counter']=" + actual + " sendNumber=" + prop, i,
+                             actual);
+         assertMessageBody(i, message);
+         if (ack)
+            message.acknowledge();
+      }
+   }
+
+   /**
+    * Reads a journal system and returns a Map<Integer,AtomicInteger> of recordTypes and the number of records per type,
+    * independent of being deleted or not
+    *
+    * @param config
+    * @return
+    * @throws Exception
+    */
+   protected Pair<List<RecordInfo>, List<PreparedTransactionInfo>> loadMessageJournal(Configuration config) throws Exception
+   {
+      JournalImpl messagesJournal = null;
+      try
+      {
+         SequentialFileFactory messagesFF = new NIOSequentialFileFactory(getJournalDir(), null);
+
+         messagesJournal = new JournalImpl(config.getJournalFileSize(),
+                                           config.getJournalMinFiles(),
+                                           0,
+                                           0,
+                                           messagesFF,
+                                           "activemq-data",
+                                           "amq",
+                                           1);
+         final List<RecordInfo> committedRecords = new LinkedList<RecordInfo>();
+         final List<PreparedTransactionInfo> preparedTransactions = new LinkedList<PreparedTransactionInfo>();
+
+         messagesJournal.start();
+
+         messagesJournal.load(committedRecords, preparedTransactions, null, false);
+
+         return new Pair<List<RecordInfo>, List<PreparedTransactionInfo>>(committedRecords, preparedTransactions);
+      }
+      finally
+      {
+         try
+         {
+            if (messagesJournal != null)
+            {
+               messagesJournal.stop();
+            }
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
+   /**
+    * Reads a journal system and returns a Map<Integer,AtomicInteger> of recordTypes and the number of records per type,
+    * independent of being deleted or not
+    *
+    * @param config
+    * @return
+    * @throws Exception
+    */
+   protected HashMap<Integer, AtomicInteger> countJournal(Configuration config) throws Exception
+   {
+      final HashMap<Integer, AtomicInteger> recordsType = new HashMap<Integer, AtomicInteger>();
+      SequentialFileFactory messagesFF = new NIOSequentialFileFactory(config.getJournalDirectory(), null);
+
+      JournalImpl messagesJournal = new JournalImpl(config.getJournalFileSize(),
+                                                    config.getJournalMinFiles(),
+                                                    0,
+                                                    0,
+                                                    messagesFF,
+                                                    "activemq-data",
+                                                    "amq",
+                                                    1);
+      List<JournalFile> filesToRead = messagesJournal.orderFiles();
+
+      for (JournalFile file : filesToRead)
+      {
+         JournalImpl.readJournalFile(messagesFF, file, new RecordTypeCounter(recordsType));
+      }
+      return recordsType;
+   }
+
+   /**
+    * This method will load a journal and count the living records
+    *
+    * @param config
+    * @return
+    * @throws Exception
+    */
+   protected HashMap<Integer, AtomicInteger> countJournalLivingRecords(Configuration config) throws Exception
+   {
+      return internalCountJournalLivingRecords(config, true);
+   }
+
+   /**
+    * This method will load a journal and count the living records
+    *
+    * @param config
+    * @param messageJournal if true -> MessageJournal, false -> BindingsJournal
+    * @return
+    * @throws Exception
+    */
+   protected HashMap<Integer, AtomicInteger> internalCountJournalLivingRecords(Configuration config, boolean messageJournal) throws Exception
+   {
+      final HashMap<Integer, AtomicInteger> recordsType = new HashMap<Integer, AtomicInteger>();
+      SequentialFileFactory ff;
+
+      JournalImpl journal;
+
+      if (messageJournal)
+      {
+         ff = new NIOSequentialFileFactory(getJournalDir(), null);
+         journal = new JournalImpl(config.getJournalFileSize(),
+                                   config.getJournalMinFiles(),
+                                   0,
+                                   0,
+                                   ff,
+                                   "activemq-data",
+                                   "amq",
+                                   1);
+      }
+      else
+      {
+         ff = new NIOSequentialFileFactory(getBindingsDir(), null);
+         journal = new JournalImpl(1024 * 1024,
+                                   2,
+                                   config.getJournalCompactMinFiles(),
+                                   config.getJournalCompactPercentage(),
+                                   ff,
+                                   "activemq-bindings",
+                                   "bindings",
+                                   1);
+      }
+      journal.start();
+
+
+      final List<RecordInfo> committedRecords = new LinkedList<RecordInfo>();
+      final List<PreparedTransactionInfo> preparedTransactions = new LinkedList<PreparedTransactionInfo>();
+
+
+      journal.load(committedRecords, preparedTransactions, null, false);
+
+      for (RecordInfo info : committedRecords)
+      {
+         Integer ikey = new Integer(info.getUserRecordType());
+         AtomicInteger value = recordsType.get(ikey);
+         if (value == null)
+         {
+            value = new AtomicInteger();
+            recordsType.put(ikey, value);
+         }
+         value.incrementAndGet();
+
+      }
+
+      journal.stop();
+      return recordsType;
+   }
+
+   private static final class RecordTypeCounter implements JournalReaderCallback
+   {
+      private final HashMap<Integer, AtomicInteger> recordsType;
+
+      /**
+       * @param recordsType
+       */
+      public RecordTypeCounter(HashMap<Integer, AtomicInteger> recordsType)
+      {
+         this.recordsType = recordsType;
+      }
+
+      AtomicInteger getType(byte key)
+      {
+         if (key == 0)
+         {
+            System.out.println("huh?");
+         }
+         Integer ikey = new Integer(key);
+         AtomicInteger value = recordsType.get(ikey);
+         if (value == null)
+         {
+            value = new AtomicInteger();
+            recordsType.put(ikey, value);
+         }
+         return value;
+      }
+
+      public void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+      {
+         getType(recordInfo.getUserRecordType()).incrementAndGet();
+      }
+
+      public void onReadUpdateRecord(RecordInfo recordInfo) throws Exception
+      {
+         getType(recordInfo.getUserRecordType()).incrementAndGet();
+      }
+
+      public void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+      {
+         getType(recordInfo.getUserRecordType()).incrementAndGet();
+      }
+
+      public void onReadAddRecord(RecordInfo recordInfo) throws Exception
+      {
+         getType(recordInfo.getUserRecordType()).incrementAndGet();
+      }
+
+      public void onReadRollbackRecord(long transactionID) throws Exception
+      {
+      }
+
+      public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+      {
+      }
+
+      public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+      {
+      }
+
+      public void onReadDeleteRecord(long recordID) throws Exception
+      {
+      }
+
+      public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
+      {
+      }
+
+      public void markAsDataFile(JournalFile file0)
+      {
+      }
+   }
+
+   /**
+    * @param server                the server where's being checked
+    * @param address               the name of the address being checked
+    * @param local                 if true we are looking for local bindings, false we are looking for remoting servers
+    * @param expect

<TRUNCATED>

Mime
View raw message