activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [19/51] [partial] activemq-6 git commit: ACTIVEMQ6-2 Update to HQ master
Date Tue, 11 Nov 2014 11:00:49 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
index baca41d..ad2538e 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
@@ -13,13 +13,12 @@
 package org.hornetq.core.server.impl;
 
 import javax.management.MBeanServer;
+
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.lang.management.ManagementFactory;
-import java.lang.reflect.Array;
-import java.nio.channels.ClosedChannelException;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
@@ -31,8 +30,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -41,24 +38,11 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import org.hornetq.api.config.HornetQDefaultConfiguration;
-import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.HornetQAlreadyReplicatingException;
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.HornetQIllegalStateException;
-import org.hornetq.api.core.HornetQInternalErrorException;
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClusterTopologyListener;
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.TopologyMember;
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
-import org.hornetq.core.client.impl.ServerLocatorInternal;
-import org.hornetq.core.config.BackupStrategy;
 import org.hornetq.core.config.BridgeConfiguration;
-import org.hornetq.core.config.ClusterConnectionConfiguration;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.ConfigurationUtils;
 import org.hornetq.core.config.CoreQueueConfiguration;
@@ -94,29 +78,19 @@ import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
 import org.hornetq.core.postoffice.Binding;
-import org.hornetq.core.postoffice.DuplicateIDCache;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.postoffice.QueueBinding;
 import org.hornetq.core.postoffice.impl.DivertBinding;
 import org.hornetq.core.postoffice.impl.LocalQueueBinding;
 import org.hornetq.core.postoffice.impl.PostOfficeImpl;
-import org.hornetq.core.protocol.ServerPacketDecoder;
-import org.hornetq.core.protocol.core.Channel;
-import org.hornetq.core.protocol.core.CoreRemotingConnection;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage.LiveStopping;
-import org.hornetq.core.remoting.CloseListener;
-import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
-import org.hornetq.core.replication.ReplicationEndpoint;
 import org.hornetq.core.replication.ReplicationManager;
 import org.hornetq.core.security.CheckType;
 import org.hornetq.core.security.Role;
 import org.hornetq.core.security.SecurityStore;
 import org.hornetq.core.security.impl.SecurityStoreImpl;
 import org.hornetq.core.server.ActivateCallback;
-import org.hornetq.core.server.ActivationParams;
 import org.hornetq.core.server.Bindable;
 import org.hornetq.core.server.Divert;
 import org.hornetq.core.server.HornetQComponent;
@@ -125,25 +99,20 @@ import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.HornetQServerLogger;
 import org.hornetq.core.server.JournalType;
 import org.hornetq.core.server.LargeServerMessage;
-import org.hornetq.core.server.LiveNodeLocator;
 import org.hornetq.core.server.MemoryManager;
 import org.hornetq.core.server.NodeManager;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.QueueFactory;
 import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.ServerSessionFactory;
 import org.hornetq.core.server.cluster.BackupManager;
-import org.hornetq.core.server.cluster.ClusterConnection;
-import org.hornetq.core.server.cluster.ClusterControl;
-import org.hornetq.core.server.cluster.ClusterController;
 import org.hornetq.core.server.cluster.ClusterManager;
 import org.hornetq.core.server.cluster.Transformer;
 import org.hornetq.core.server.cluster.ha.HAPolicy;
-import org.hornetq.core.server.cluster.qourum.SharedNothingBackupQuorum;
 import org.hornetq.core.server.group.GroupingHandler;
 import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
 import org.hornetq.core.server.group.impl.LocalGroupingHandler;
 import org.hornetq.core.server.group.impl.RemoteGroupingHandler;
-import org.hornetq.core.server.cluster.qourum.SharedNothingBackupQuorum.BACKUP_ACTIVATION;
 import org.hornetq.core.server.management.ManagementService;
 import org.hornetq.core.server.management.impl.ManagementServiceImpl;
 import org.hornetq.core.settings.HierarchicalRepository;
@@ -165,15 +134,12 @@ import org.hornetq.utils.ReusableLatch;
 import org.hornetq.utils.SecurityFormatter;
 import org.hornetq.utils.VersionLoader;
 
-import static org.hornetq.core.server.cluster.qourum.SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAILURE_REPLICATING;
-import static org.hornetq.core.server.cluster.qourum.SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAIL_OVER;
-import static org.hornetq.core.server.cluster.qourum.SharedNothingBackupQuorum.BACKUP_ACTIVATION.STOP;
-
 /**
  * The HornetQ server implementation
  *
  * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
  * @author <a href="mailto:ataylor@redhat.com>Andy Taylor</a>
+ * @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a>
  */
 public class HornetQServerImpl implements HornetQServer
 {
@@ -187,6 +153,8 @@ public class HornetQServerImpl implements HornetQServer
     */
    public static final String GENERIC_IGNORED_FILTER = "__HQX=-1";
 
+   private HAPolicy haPolicy;
+
    enum SERVER_STATE
    {
       /**
@@ -276,20 +244,6 @@ public class HornetQServerImpl implements HornetQServer
     */
    private final ReusableLatch activationLatch = new ReusableLatch(0);
 
-   private final ReusableLatch backupSyncLatch = new ReusableLatch(0);
-
-   private final Object replicationLock = new Object();
-
-   /**
-    * Only applicable to 'remote backup servers'. If this flag is false the backup may not become
-    * 'live'.
-    */
-   private volatile boolean backupUpToDate = true;
-
-   private ReplicationManager replicationManager;
-
-   private ReplicationEndpoint replicationEndpoint;
-
    private final Set<ActivateCallback> activateCallbacks = new ConcurrentHashSet<ActivateCallback>();
 
    private volatile GroupingHandler groupingHandler;
@@ -307,16 +261,16 @@ public class HornetQServerImpl implements HornetQServer
 
    private final ShutdownOnCriticalErrorListener shutdownOnCriticalIO = new ShutdownOnCriticalErrorListener();
 
-   private final Object failbackCheckerGuard = new Object();
-   private boolean cancelFailBackChecker;
-
    private final HornetQServer parentServer;
 
-   private ClientSessionFactoryInternal scaleDownClientSessionFactory = null;
+   //todo think about moving this to the activation
+   private final List<SimpleString> scaledDownNodeIDs = new ArrayList<>();
+
+   private boolean threadPoolSupplied = false;
 
-   private ServerLocatorInternal scaleDownServerLocator = null;
+   private boolean scheduledPoolSupplied = false;
 
-   private final List<SimpleString> scaledDownNodeIDs = new ArrayList<>();
+   private ServiceRegistry serviceRegistry;
 
    // Constructors
    // ---------------------------------------------------------------------------------
@@ -358,11 +312,19 @@ public class HornetQServerImpl implements HornetQServer
                             final HornetQSecurityManager securityManager,
                             final HornetQServer parentServer)
    {
+      this(configuration, mbeanServer, securityManager, parentServer, null);
+   }
+
+   public HornetQServerImpl(Configuration configuration,
+                            MBeanServer mbeanServer,
+                            final HornetQSecurityManager securityManager,
+                            final HornetQServer parentServer,
+                            final ServiceRegistry serviceRegistry)
+   {
       if (configuration == null)
       {
          configuration = new ConfigurationImpl();
       }
-
       if (mbeanServer == null)
       {
          // Just use JVM mbean server
@@ -389,6 +351,7 @@ public class HornetQServerImpl implements HornetQServer
 
       this.parentServer = parentServer;
 
+      this.serviceRegistry = serviceRegistry == null ?  new ServiceRegistry() : serviceRegistry;
    }
 
    // life-cycle methods
@@ -397,7 +360,7 @@ public class HornetQServerImpl implements HornetQServer
    /*
     * Can be overridden for tests
     */
-   protected NodeManager createNodeManager(final String directory, String nodeGroupName, boolean replicatingBackup)
+   protected NodeManager createNodeManager(final String directory, boolean replicatingBackup)
    {
       NodeManager manager;
       if (!configuration.isPersistenceEnabled())
@@ -412,7 +375,6 @@ public class HornetQServerImpl implements HornetQServer
       {
          manager = new FileLockNodeManager(directory, replicatingBackup, configuration.getJournalLockAcquisitionTimeout());
       }
-      manager.setNodeGroupName(nodeGroupName);
       return manager;
    }
 
@@ -423,11 +385,13 @@ public class HornetQServerImpl implements HornetQServer
          HornetQServerLogger.LOGGER.debug("Server already started!");
          return;
       }
-      synchronized (failbackCheckerGuard)
+
+      state = SERVER_STATE.STARTING;
+
+      if (haPolicy == null)
       {
-         cancelFailBackChecker = false;
+         haPolicy = ConfigurationUtils.getHAPolicy(configuration.getHAPolicyConfiguration());
       }
-      state = SERVER_STATE.STARTING;
 
       activationLatch.setCount(1);
 
@@ -439,12 +403,11 @@ public class HornetQServerImpl implements HornetQServer
       {
          checkJournalDirectory();
 
-         nodeManager =
-            createNodeManager(configuration.getJournalDirectory(), configuration.getHAPolicy().getBackupGroupName(), false);
+         nodeManager = createNodeManager(configuration.getJournalDirectory(), false);
 
          nodeManager.start();
 
-         HornetQServerLogger.LOGGER.serverStarting((configuration.getHAPolicy().isBackup() ? "backup" : "live"), configuration);
+         HornetQServerLogger.LOGGER.serverStarting((haPolicy.isBackup() ? "backup" : "live"), configuration);
 
          if (configuration.isRunSyncSpeedTest())
          {
@@ -453,38 +416,24 @@ public class HornetQServerImpl implements HornetQServer
             test.run();
          }
 
-         final boolean wasLive = !configuration.getHAPolicy().isBackup();
-         if (!configuration.getHAPolicy().isBackup())
+         final boolean wasLive = !haPolicy.isBackup();
+         if (!haPolicy.isBackup())
          {
-            if (configuration.getHAPolicy().isSharedStore() && configuration.isPersistenceEnabled())
-            {
-               activation = new SharedStoreLiveActivation();
-            }
-            else
-            {
-               activation = new SharedNothingLiveActivation();
-            }
+            activation = haPolicy.createActivation(this, false, activationParams, shutdownOnCriticalIO);
 
             activation.run();
          }
          // The activation on fail-back may change the value of isBackup, for that reason we are
          // checking again here
-         if (configuration.getHAPolicy().isBackup())
+         if (haPolicy.isBackup())
          {
-            if (configuration.getHAPolicy().isSharedStore())
+            if (haPolicy.isSharedStore())
             {
-               activation = new SharedStoreBackupActivation();
+               activation = haPolicy.createActivation(this, false, activationParams, shutdownOnCriticalIO);
             }
             else
             {
-               assert replicationEndpoint == null;
-               nodeManager.stop();
-               nodeManager =
-                  createNodeManager(configuration.getJournalDirectory(), configuration.getHAPolicy().getBackupGroupName(), true);
-               backupUpToDate = false;
-               backupSyncLatch.setCount(1);
-               replicationEndpoint = new ReplicationEndpoint(this, shutdownOnCriticalIO, wasLive);
-               activation = new SharedNothingBackupActivation(wasLive);
+               activation = haPolicy.createActivation(this, wasLive, activationParams, shutdownOnCriticalIO);
             }
 
             backupActivationThread = new Thread(activation, HornetQMessageBundle.BUNDLE.activationForServer(this));
@@ -497,7 +446,7 @@ public class HornetQServerImpl implements HornetQServer
                                                      identity != null ? identity : "");
          }
          // start connector service
-         connectorsService = new ConnectorsService(configuration, storageManager, scheduledPool, postOffice);
+         connectorsService = new ConnectorsService(configuration, storageManager, scheduledPool, postOffice, serviceRegistry);
          connectorsService.start();
       }
       finally
@@ -520,6 +469,74 @@ public class HornetQServerImpl implements HornetQServer
       super.finalize();
    }
 
+   public void setState(SERVER_STATE state)
+   {
+      this.state = state;
+   }
+
+   public SERVER_STATE getState()
+   {
+      return state;
+   }
+
+   public void interrupBackupThread(NodeManager nodeManagerInUse) throws InterruptedException
+   {
+      long timeout = 30000;
+
+      long start = System.currentTimeMillis();
+
+      while (backupActivationThread.isAlive() && System.currentTimeMillis() - start < timeout)
+      {
+         if (nodeManagerInUse != null)
+         {
+            nodeManagerInUse.interrupt();
+         }
+
+         backupActivationThread.interrupt();
+
+         backupActivationThread.join(1000);
+
+      }
+
+      if (System.currentTimeMillis() - start >= timeout)
+      {
+         threadDump("Timed out waiting for backup activation to exit");
+      }
+   }
+
+   public void resetNodeManager() throws Exception
+   {
+      nodeManager.stop();
+      nodeManager =
+            createNodeManager(configuration.getJournalDirectory(), true);
+   }
+
+   public Activation getActivation()
+   {
+      return activation;
+   }
+
+   @Override
+   public HAPolicy getHAPolicy()
+   {
+      return haPolicy;
+   }
+
+   @Override
+   public void setHAPolicy(HAPolicy haPolicy)
+   {
+      this.haPolicy = haPolicy;
+   }
+
+   public ExecutorService getThreadPool()
+   {
+      return threadPool;
+   }
+
+   public void setActivation(SharedNothingLiveActivation activation)
+   {
+      this.activation = activation;
+   }
    /**
     * Stops the server in a different thread.
     */
@@ -533,7 +550,7 @@ public class HornetQServerImpl implements HornetQServer
          {
             try
             {
-               stop(configuration.getHAPolicy().isFailoverOnServerShutdown(), criticalIOError, false);
+               stop(false, criticalIOError, false);
             }
             catch (Exception e)
             {
@@ -545,7 +562,7 @@ public class HornetQServerImpl implements HornetQServer
 
    public final void stop() throws Exception
    {
-      stop(configuration.getHAPolicy().isFailoverOnServerShutdown());
+      stop(false);
    }
 
    public void addActivationParam(String key, Object val)
@@ -593,22 +610,10 @@ public class HornetQServerImpl implements HornetQServer
 
    /**
     * Stops the server
-    *
-    * @param failoverOnServerShutdown whether we will allow a backup server to become live when the
-    *                                 server is stopped normally
     * @param criticalIOError          whether we have encountered an IO error with the journal etc
-    * @param failingBack              if true don't set the flag to stop the failback checker
     */
-   private void stop(boolean failoverOnServerShutdown, final boolean criticalIOError, boolean failingBack) throws Exception
+   void stop(boolean failoverOnServerShutdown, final boolean criticalIOError, boolean restarting) throws Exception
    {
-      if (!failingBack)
-      {
-         synchronized (failbackCheckerGuard)
-         {
-            cancelFailBackChecker = true;
-         }
-      }
-
       synchronized (this)
       {
          if (state == SERVER_STATE.STOPPED || state == SERVER_STATE.STOPPING)
@@ -617,22 +622,8 @@ public class HornetQServerImpl implements HornetQServer
          }
          state = SERVER_STATE.STOPPING;
 
-         final ReplicationManager localReplicationManager = getReplicationManager();
+         activation.sendLiveIsStopping();
 
-         if (localReplicationManager != null)
-         {
-            replicationManager.sendLiveIsStopping(LiveStopping.STOP_CALLED);
-            // Schedule for 10 seconds
-            // this pool gets a 'hard' shutdown, no need to manage the Future of this Runnable.
-            scheduledPool.schedule(new Runnable()
-            {
-               @Override
-               public void run()
-               {
-                  localReplicationManager.clearReplicationTokens();
-               }
-            }, 30, TimeUnit.SECONDS);
-         }
          stopComponent(connectorsService);
 
          // we stop the groupingHandler before we stop the cluster manager so binding mappings
@@ -644,27 +635,10 @@ public class HornetQServerImpl implements HornetQServer
          }
          stopComponent(clusterManager);
 
-         // connect to the scale-down target first so that when we freeze/disconnect the clients we can tell them where
-         // we're sending the messages
-         if (configuration.getHAPolicy().isScaleDown())
-         {
-            connectToScaleDownTarget();
-         }
          freezeConnections();
       }
 
-      if (configuration.getHAPolicy().isScaleDown() && scaleDownClientSessionFactory != null)
-      {
-         try
-         {
-            scaleDown();
-         }
-         finally
-         {
-            scaleDownClientSessionFactory.close();
-            scaleDownServerLocator.close();
-         }
-      }
+      activation.postConnectionFreeze();
 
       closeAllServerSessions(criticalIOError);
 
@@ -693,12 +667,8 @@ public class HornetQServerImpl implements HornetQServer
          stopComponent(deploymentManager);
       }
 
-      if (managementService != null)
-         managementService.unregisterServer();
-
       stopComponent(backupManager);
-      stopComponent(managementService);
-      stopComponent(replicationEndpoint); // applies to a "backup" server
+      activation.preStorageClose();
       stopComponent(pagingManager);
 
       if (storageManager != null)
@@ -709,12 +679,17 @@ public class HornetQServerImpl implements HornetQServer
       if (remotingService != null)
          remotingService.stop(criticalIOError);
 
+      // Stop the management service after the remoting service to ensure all acceptors are deregistered with JMX
+      if (managementService != null)
+         managementService.unregisterServer();
+      stopComponent(managementService);
+
       stopComponent(securityManager);
       stopComponent(resourceManager);
 
       stopComponent(postOffice);
 
-      if (scheduledPool != null)
+      if (scheduledPool != null && !scheduledPoolSupplied)
       {
          // we just interrupt all running tasks, these are supposed to be pings and the like.
          scheduledPool.shutdownNow();
@@ -722,7 +697,7 @@ public class HornetQServerImpl implements HornetQServer
 
       stopComponent(memoryManager);
 
-      if (threadPool != null)
+      if (threadPool != null && !threadPoolSupplied)
       {
          threadPool.shutdown();
          try
@@ -742,26 +717,21 @@ public class HornetQServerImpl implements HornetQServer
          }
       }
 
-      scheduledPool = null;
-      threadPool = null;
+      if (!threadPoolSupplied) threadPool = null;
+      if (!scheduledPoolSupplied) scheduledPool = null;
 
       if (securityStore != null)
          securityStore.stop();
 
-      threadPool = null;
-
-      scheduledPool = null;
-
       pagingManager = null;
       securityStore = null;
       resourceManager = null;
-      replicationManager = null;
-      replicationEndpoint = null;
       postOffice = null;
       queueFactory = null;
       resourceManager = null;
       messagingServerControl = null;
       memoryManager = null;
+      backupManager = null;
 
       sessions.clear();
 
@@ -773,7 +743,7 @@ public class HornetQServerImpl implements HornetQServer
       SimpleString tempNodeID = getNodeID();
       if (activation != null)
       {
-         activation.close(failoverOnServerShutdown);
+         activation.close(failoverOnServerShutdown, restarting);
       }
       if (backupActivationThread != null)
       {
@@ -807,72 +777,7 @@ public class HornetQServerImpl implements HornetQServer
       }
    }
 
-   public long scaleDown() throws Exception
-   {
-      ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice, nodeManager, clusterManager.getClusterController());
-      ConcurrentMap<SimpleString, DuplicateIDCache> duplicateIDCaches = ((PostOfficeImpl) postOffice).getDuplicateIDCaches();
-      Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<>();
-      for (SimpleString address : duplicateIDCaches.keySet())
-      {
-         DuplicateIDCache duplicateIDCache = postOffice.getDuplicateIDCache(address);
-         duplicateIDMap.put(address, duplicateIDCache.getMap());
-      }
-      return scaleDownHandler.scaleDown(scaleDownClientSessionFactory, resourceManager, duplicateIDMap, configuration.getManagementAddress(), null);
-   }
 
-   public void connectToScaleDownTarget()
-   {
-      try
-      {
-         scaleDownServerLocator = clusterManager.getHAManager().getScaleDownConnector();
-         //use a Node Locator to connect to the cluster
-         scaleDownServerLocator.setPacketDecoder(ServerPacketDecoder.INSTANCE);
-         LiveNodeLocator nodeLocator = clusterManager.getHAManager().getHAPolicy().getScaleDownGroupName() == null ?
-            new AnyLiveNodeLocatorForScaleDown(HornetQServerImpl.this) :
-            new NamedLiveNodeLocatorForScaleDown(clusterManager.getHAManager().getHAPolicy().getScaleDownGroupName(), HornetQServerImpl.this);
-         scaleDownServerLocator.addClusterTopologyListener(nodeLocator);
-
-         nodeLocator.connectToCluster(scaleDownServerLocator);
-         // a timeout is necessary here in case we use a NamedLiveNodeLocatorForScaleDown and there's no matching node in the cluster
-         // should the timeout be configurable?
-         nodeLocator.locateNode(HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT);
-         ClientSessionFactoryInternal clientSessionFactory = null;
-         while (clientSessionFactory == null)
-         {
-            Pair<TransportConfiguration, TransportConfiguration> possibleLive = null;
-            try
-            {
-               possibleLive = nodeLocator.getLiveConfiguration();
-               if (possibleLive == null)  // we've tried every connector
-                  break;
-               clientSessionFactory = (ClientSessionFactoryInternal) scaleDownServerLocator.createSessionFactory(possibleLive.getA(), 0, false);
-            }
-            catch (Exception e)
-            {
-               HornetQServerLogger.LOGGER.trace("Failed to connect to " + possibleLive.getA());
-               nodeLocator.notifyRegistrationFailed(false);
-               if (clientSessionFactory != null)
-               {
-                  clientSessionFactory.close();
-               }
-               clientSessionFactory = null;
-               // should I try the backup (i.e. getB()) from possibleLive?
-            }
-         }
-         if (clientSessionFactory != null)
-         {
-            scaleDownClientSessionFactory = clientSessionFactory;
-         }
-         else
-         {
-            throw new HornetQException("Unable to connect to server for scale-down");
-         }
-      }
-      catch (Exception e)
-      {
-         HornetQServerLogger.LOGGER.failedToScaleDown(e);
-      }
-   }
 
    public boolean checkLiveIsNotColocated(String nodeId)
    {
@@ -894,17 +799,7 @@ public class HornetQServerImpl implements HornetQServer
     */
    private void freezeConnections()
    {
-      ReplicationManager localReplicationManager = getReplicationManager();
-      TransportConfiguration tc = scaleDownClientSessionFactory == null ? null : scaleDownClientSessionFactory.getConnectorConfiguration();
-      String nodeID = tc == null ? null : scaleDownClientSessionFactory.getServerLocator().getTopology().getMember(tc).getNodeId();
-      if (remotingService != null && localReplicationManager != null)
-      {
-         remotingService.freeze(nodeID, localReplicationManager.getBackupTransportConnection());
-      }
-      else if (remotingService != null)
-      {
-         remotingService.freeze(nodeID, null);
-      }
+      activation.freezeConnections(remotingService);
 
       // after disconnecting all the clients close all the server sessions so any messages in delivery will be cancelled back to the queue
       for (ServerSession serverSession : sessions.values())
@@ -962,7 +857,7 @@ public class HornetQServerImpl implements HornetQServer
 
    }
 
-   private static void stopComponent(HornetQComponent component) throws Exception
+   static void stopComponent(HornetQComponent component) throws Exception
    {
       if (component != null)
          component.stop();
@@ -1134,7 +1029,8 @@ public class HornetQServerImpl implements HornetQServer
                                       final boolean preAcknowledge,
                                       final boolean xa,
                                       final String defaultAddress,
-                                      final SessionCallback callback) throws Exception
+                                      final SessionCallback callback,
+                                      final ServerSessionFactory sessionFactory) throws Exception
    {
 
       if (securityStore != null)
@@ -1142,16 +1038,42 @@ public class HornetQServerImpl implements HornetQServer
          securityStore.authenticate(username, password);
       }
       final OperationContext context = storageManager.newContext(getExecutorFactory().getExecutor());
-      final ServerSessionImpl session = internalCreateSession(name, username, password, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context);
+      final ServerSessionImpl session = internalCreateSession(name, username, password, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, sessionFactory);
 
       sessions.put(name, session);
 
       return session;
    }
 
-   protected ServerSessionImpl internalCreateSession(String name, String username, String password, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback, OperationContext context) throws Exception
+   protected ServerSessionImpl internalCreateSession(String name, String username, String password, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback, OperationContext context, ServerSessionFactory sessionFactory) throws Exception
    {
-      return new ServerSessionImpl(name,
+      if (sessionFactory == null)
+      {
+         return new ServerSessionImpl(name,
+                                   username,
+                                   password,
+                                   minLargeMessageSize,
+                                   autoCommitSends,
+                                   autoCommitAcks,
+                                   preAcknowledge,
+                                   configuration.isPersistDeliveryCountBeforeDelivery(),
+                                   xa,
+                                   connection,
+                                   storageManager,
+                                   postOffice,
+                                   resourceManager,
+                                   securityStore,
+                                   managementService,
+                                   this,
+                                   configuration.getManagementAddress(),
+                                   defaultAddress == null ? null
+                                      : new SimpleString(defaultAddress),
+                                   callback,
+                                   context);
+      }
+      else
+      {
+         return sessionFactory.createCoreSession(name,
                                    username,
                                    password,
                                    minLargeMessageSize,
@@ -1172,9 +1094,11 @@ public class HornetQServerImpl implements HornetQServer
                                       : new SimpleString(defaultAddress),
                                    callback,
                                    context);
+      }
    }
 
-   protected SecurityStore getSecurityStore()
+   @Override
+   public SecurityStore getSecurityStore()
    {
       return securityStore;
    }
@@ -1233,18 +1157,6 @@ public class HornetQServerImpl implements HornetQServer
       return activationLatch.await(timeout, unit);
    }
 
-   @Override
-   public boolean waitForBackupSync(long timeout, TimeUnit unit) throws InterruptedException
-   {
-      if (configuration.getHAPolicy().getPolicyType() == HAPolicy.POLICY_TYPE.BACKUP_REPLICATED)
-      {
-         return backupSyncLatch.await(timeout, unit);
-      }
-      else
-      {
-         return true;
-      }
-   }
 
    public HornetQServerControlImpl getHornetQServerControl()
    {
@@ -1440,17 +1352,9 @@ public class HornetQServerImpl implements HornetQServer
       return groupingHandler;
    }
 
-   public ReplicationEndpoint getReplicationEndpoint()
-   {
-      return replicationEndpoint;
-   }
-
    public ReplicationManager getReplicationManager()
    {
-      synchronized (replicationLock)
-      {
-         return replicationManager;
-      }
+      return activation.getReplicationManager();
    }
 
    public ConnectorsService getConnectorsService()
@@ -1505,7 +1409,7 @@ public class HornetQServerImpl implements HornetQServer
                                      postOffice,
                                      storageManager);
 
-      Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress, divert);
+      Binding binding = new DivertBinding(storageManager.generateID(), sAddress, divert);
 
       postOffice.addBinding(binding);
 
@@ -1575,7 +1479,7 @@ public class HornetQServerImpl implements HornetQServer
    {
 
       return new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, configuration.getPagingDirectory(),
-                                                             configuration.getJournalBufferSize_NIO(),
+                                                             configuration.getJournalBufferTimeout_NIO(),
                                                              scheduledPool,
                                                              executorFactory,
                                                              configuration.isJournalSyncNonTransactional(),
@@ -1628,6 +1532,60 @@ public class HornetQServerImpl implements HornetQServer
       }
    }
 
+   private void callActivationCompleteCallbacks()
+   {
+      for (ActivateCallback callback : activateCallbacks)
+      {
+         callback.activationComplete();
+      }
+   }
+
+   /**
+    * Sets up HornetQ Executor Services.
+    */
+   private void initializeExecutorServices()
+   {
+      /* We check to see if a Thread Pool is supplied in the InjectedObjectRegistry.  If so we created a new Ordered
+       * Executor based on the provided Thread pool.  Otherwise we create a new ThreadPool.
+       */
+      if (serviceRegistry.getExecutorService() == null)
+      {
+         ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-server-" + this.toString(), false, getThisClassLoader());
+         if (configuration.getThreadPoolMaxSize() == -1)
+         {
+            threadPool = Executors.newCachedThreadPool(tFactory);
+         }
+         else
+         {
+            threadPool = Executors.newFixedThreadPool(configuration.getThreadPoolMaxSize(), tFactory);
+         }
+      }
+      else
+      {
+         threadPool = serviceRegistry.getExecutorService();
+         this.threadPoolSupplied = true;
+      }
+      this.executorFactory = new OrderedExecutorFactory(threadPool);
+
+       /* We check to see if a Scheduled Executor Service is provided in the InjectedObjectRegistry.  If so we use this
+       * Scheduled ExecutorService otherwise we create a new one.
+       */
+      if (serviceRegistry.getScheduledExecutorService() == null)
+      {
+         ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-scheduled-threads", false, getThisClassLoader());
+         scheduledPool = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), tFactory);
+      }
+      else
+      {
+         this.scheduledPoolSupplied = true;
+         this.scheduledPool = serviceRegistry.getScheduledExecutorService();
+      }
+   }
+
+   public ServiceRegistry getServiceRegistry()
+   {
+      return serviceRegistry;
+   }
 
    /**
     * Starts everything apart from RemotingService and loading the data.
@@ -1636,16 +1594,13 @@ public class HornetQServerImpl implements HornetQServer
     * {@link #initialisePart2(boolean)}.
     * @param scalingDown
     */
-   private synchronized boolean initialisePart1(boolean scalingDown) throws Exception
+   synchronized boolean initialisePart1(boolean scalingDown) throws Exception
    {
       if (state == SERVER_STATE.STOPPED)
          return false;
-      // Create the pools - we have two pools - one for non scheduled - and another for scheduled
-
-      ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-server-" + this.toString(),
-                                                        false,
-                                                        getThisClassLoader());
 
+      // Create the pools - we have two pools - one for non scheduled - and another for scheduled
+      initializeExecutorServices();
 
       if (configuration.getJournalType() == JournalType.ASYNCIO && !AIOSequentialFileFactory.isSupported())
       {
@@ -1653,21 +1608,6 @@ public class HornetQServerImpl implements HornetQServer
          configuration.setJournalType(JournalType.NIO);
       }
 
-      if (configuration.getThreadPoolMaxSize() == -1)
-      {
-         threadPool = Executors.newCachedThreadPool(tFactory);
-      }
-      else
-      {
-         threadPool = Executors.newFixedThreadPool(configuration.getThreadPoolMaxSize(), tFactory);
-      }
-
-      executorFactory = new OrderedExecutorFactory(threadPool);
-      scheduledPool = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(),
-                                                      new HornetQThreadFactory("HornetQ-scheduled-threads",
-                                                                               false,
-                                                                               getThisClassLoader()));
-
       managementService = new ManagementServiceImpl(mbeanServer, configuration);
 
       if (configuration.getMemoryMeasureInterval() != -1)
@@ -1726,13 +1666,20 @@ public class HornetQServerImpl implements HornetQServer
       // This can't be created until node id is set
       clusterManager =
          new ClusterManager(executorFactory, this, postOffice, scheduledPool, managementService, configuration,
-                            nodeManager, configuration.getHAPolicy().isBackup());
+                            nodeManager, haPolicy.isBackup());
 
       backupManager = new BackupManager(this, executorFactory, scheduledPool, nodeManager, configuration, clusterManager);
 
       clusterManager.deploy();
 
-      remotingService = new RemotingServiceImpl(clusterManager, configuration, this, managementService, scheduledPool, protocolManagerFactories);
+      remotingService = new RemotingServiceImpl(clusterManager,
+                                                configuration,
+                                                this,
+                                                managementService,
+                                                scheduledPool,
+                                                protocolManagerFactories,
+                                                executorFactory.getExecutor(),
+                                                serviceRegistry);
 
       messagingServerControl = managementService.registerServer(postOffice,
                                                                 storageManager,
@@ -1745,7 +1692,7 @@ public class HornetQServerImpl implements HornetQServer
                                                                 queueFactory,
                                                                 scheduledPool,
                                                                 pagingManager,
-                                                                configuration.getHAPolicy().isBackup());
+                                                                haPolicy.isBackup());
 
       // Address settings need to deployed initially, since they're require on paging manager.start()
 
@@ -1802,7 +1749,7 @@ public class HornetQServerImpl implements HornetQServer
    /*
     * Load the data, and start remoting service so clients can connect
     */
-   private synchronized void initialisePart2(boolean scalingDown) throws Exception
+   synchronized void initialisePart2(boolean scalingDown) throws Exception
    {
       // Load the journal and populate queues, transactions and caches in memory
 
@@ -1813,7 +1760,7 @@ public class HornetQServerImpl implements HornetQServer
 
       pagingManager.reloadStores();
 
-      JournalLoadInformation[] journalInfo = loadJournals(scalingDown);
+      JournalLoadInformation[] journalInfo = loadJournals();
 
 
       final ServerInfo dumper = new ServerInfo(this, pagingManager);
@@ -1891,6 +1838,8 @@ public class HornetQServerImpl implements HornetQServer
       {
          activationLatch.countDown();
       }
+
+      callActivationCompleteCallbacks();
    }
 
    private void deploySecurityFromConfiguration()
@@ -1921,35 +1870,18 @@ public class HornetQServerImpl implements HornetQServer
       }
    }
 
-   private JournalLoadInformation[] loadJournals(boolean scalingDown) throws Exception
+   private JournalLoadInformation[] loadJournals() throws Exception
    {
-      JournalLoader journalLoader;
+      JournalLoader journalLoader = activation.createJournalLoader(postOffice,
+            pagingManager,
+            storageManager,
+            queueFactory,
+            nodeManager,
+            managementService,
+            groupingHandler,
+            configuration,
+            parentServer);
 
-      if (scalingDown)
-      {
-         journalLoader = new BackupRecoveryJournalLoader(postOffice,
-                                                         pagingManager,
-                                                         storageManager,
-                                                         queueFactory,
-                                                         nodeManager,
-                                                         managementService,
-                                                         groupingHandler,
-                                                         configuration,
-                                                         parentServer,
-                                                         clusterManager.getHAManager().getScaleDownConnector(),
-                                                         clusterManager.getClusterController());
-      }
-      else
-      {
-         journalLoader = new PostOfficeJournalLoader(postOffice,
-               pagingManager,
-               storageManager,
-               queueFactory,
-               nodeManager,
-               managementService,
-               groupingHandler,
-               configuration);
-      }
       JournalLoadInformation[] journalInfo = new JournalLoadInformation[2];
 
       List<QueueBindingInfo> queueBindingInfos = new ArrayList();
@@ -2063,8 +1995,8 @@ public class HornetQServerImpl implements HornetQServer
 
       Filter filter = FilterImpl.createFilter(filterString);
 
-      long txID = storageManager.generateUniqueID();
-      long queueID = storageManager.generateUniqueID();
+      long txID = storageManager.generateID();
+      long queueID = storageManager.generateID();
 
       PageSubscription pageSubscription;
 
@@ -2212,7 +2144,7 @@ public class HornetQServerImpl implements HornetQServer
    /**
     * Check if journal directory exists or create it (if configured to do so)
     */
-   private void checkJournalDirectory()
+   void checkJournalDirectory()
    {
       File journalDir = new File(configuration.getJournalDirectory());
 
@@ -2229,927 +2161,46 @@ public class HornetQServerImpl implements HornetQServer
       }
    }
 
-   /**
-    * To be called by backup trying to fail back the server
-    */
-   private void startFailbackChecker()
-   {
-      scheduledPool.scheduleAtFixedRate(new FailbackChecker(), 1000L, 1000L, TimeUnit.MILLISECONDS);
-   }
 
    // Inner classes
    // --------------------------------------------------------------------------------
 
-   private class FailbackChecker implements Runnable
+
+
+   public final class ShutdownOnCriticalErrorListener implements IOCriticalErrorListener
    {
-      private boolean restarting = false;
+      boolean failedAlready = false;
 
-      public void run()
+      public synchronized void onIOException(Exception cause, String message, SequentialFile file)
       {
-         try
-         {
-            if (!restarting && nodeManager.isAwaitingFailback())
-            {
-               HornetQServerLogger.LOGGER.awaitFailBack();
-               restarting = true;
-               Thread t = new Thread(new Runnable()
-               {
-                  public void run()
-                  {
-                     try
-                     {
-                        HornetQServerLogger.LOGGER.debug(HornetQServerImpl.this + "::Stopping live node in favor of failback");
-                        stop(true, false, true);
-                        // We need to wait some time before we start the backup again
-                        // otherwise we may eventually start before the live had a chance to get it
-                        Thread.sleep(configuration.getHAPolicy().getFailbackDelay());
-                        synchronized (failbackCheckerGuard)
-                        {
-                           if (cancelFailBackChecker)
-                              return;
-                           if (configuration.getHAPolicy().isSharedStore())
-                           {
-                              configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE);
-                           }
-                           else
-                           {
-                              configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_REPLICATED);
-                           }
-                           HornetQServerLogger.LOGGER.debug(HornetQServerImpl.this +
-                                                               "::Starting backup node now after failback");
-                           start();
-                        }
-                     }
-                     catch (Exception e)
-                     {
-                        HornetQServerLogger.LOGGER.serverRestartWarning();
-                     }
-                  }
-               });
-               t.start();
-            }
-         }
-         catch (Exception e)
+         if (!failedAlready)
          {
-            HornetQServerLogger.LOGGER.serverRestartWarning(e);
+            failedAlready = true;
+
+            HornetQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause);
+
+            stopTheServer(true);
          }
       }
    }
 
-   private final class SharedStoreLiveActivation implements Activation
+
+   /**
+    * This seems duplicate code all over the place, but for security reasons we can't let something like this to be open in a
+    * utility class, as it would be a door to load anything you like in a safe VM.
+    * For that reason any class trying to do a privileged block should do with the AccessController directly.
+    */
+   private static Object safeInitNewInstance(final String className)
    {
-      public void run()
+      return AccessController.doPrivileged(new PrivilegedAction<Object>()
       {
-         try
-         {
-            HornetQServerLogger.LOGGER.awaitingLiveLock();
-
-            checkJournalDirectory();
-
-            if (HornetQServerLogger.LOGGER.isDebugEnabled())
-            {
-               HornetQServerLogger.LOGGER.debug("First part initialization on " + this);
-            }
-
-            if (!initialisePart1(false))
-               return;
-
-            if (nodeManager.isBackupLive())
-            {
-               /*
-                * looks like we've failed over at some point need to inform that we are the backup
-                * so when the current live goes down they failover to us
-                */
-               if (HornetQServerLogger.LOGGER.isDebugEnabled())
-               {
-                  HornetQServerLogger.LOGGER.debug("announcing backup to the former live" + this);
-               }
-               backupManager.start();
-               backupManager.announceBackup();
-               Thread.sleep(configuration.getHAPolicy().getFailbackDelay());
-            }
-
-            nodeManager.startLiveNode();
-
-            if (state == SERVER_STATE.STOPPED || state == SERVER_STATE.STOPPING)
-            {
-               return;
-            }
-
-            initialisePart2(false);
-
-            HornetQServerLogger.LOGGER.serverIsLive();
-         }
-         catch (Exception e)
-         {
-            HornetQServerLogger.LOGGER.initializationError(e);
-         }
-      }
-
-      public void close(boolean permanently) throws Exception
-      {
-         // TO avoid a NPE from stop
-         NodeManager nodeManagerInUse = nodeManager;
-
-         if (nodeManagerInUse != null)
-         {
-            if (permanently)
-            {
-               nodeManagerInUse.crashLiveServer();
-            }
-            else
-            {
-               nodeManagerInUse.pauseLiveServer();
-            }
-         }
-      }
-   }
-
-   private final class SharedStoreBackupActivation implements Activation
-   {
-      public void run()
-      {
-         try
-         {
-            nodeManager.startBackup();
-
-            boolean scalingDown = configuration.getHAPolicy().getBackupStrategy() == BackupStrategy.SCALE_DOWN;
-
-            if (!initialisePart1(scalingDown))
-               return;
-
-            backupManager.start();
-
-            state = SERVER_STATE.STARTED;
-
-            HornetQServerLogger.LOGGER.backupServerStarted(version.getFullVersion(), nodeManager.getNodeId());
-
-            nodeManager.awaitLiveNode();
-
-            configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE);
-
-            backupManager.activated();
-            if (state != SERVER_STATE.STARTED)
-            {
-               return;
-            }
-
-            initialisePart2(scalingDown);
-
-            if (scalingDown)
-            {
-               HornetQServerLogger.LOGGER.backupServerScaledDown();
-               Thread t = new Thread(new Runnable()
-               {
-                  @Override
-                  public void run()
-                  {
-                     try
-                     {
-                        stop();
-                        //we are shared store but if we were started by a parent server then we shouldn't restart
-                        if (configuration.getHAPolicy().isRestartBackup())
-                        {
-                           start();
-                        }
-                     }
-                     catch (Exception e)
-                     {
-                        HornetQServerLogger.LOGGER.serverRestartWarning();
-                     }
-                  }
-               });
-               t.start();
-               return;
-            }
-            else
-            {
-               HornetQServerLogger.LOGGER.backupServerIsLive();
-
-               nodeManager.releaseBackup();
-            }
-            if (configuration.getHAPolicy().isAllowAutoFailBack())
-            {
-               startFailbackChecker();
-            }
-         }
-         catch (InterruptedException e)
-         {
-            // this is ok, we are being stopped
-         }
-         catch (ClosedChannelException e)
-         {
-            // this is ok too, we are being stopped
-         }
-         catch (Exception e)
-         {
-            if (!(e.getCause() instanceof InterruptedException))
-            {
-               HornetQServerLogger.LOGGER.initializationError(e);
-            }
-         }
-         catch (Throwable e)
-         {
-            HornetQServerLogger.LOGGER.initializationError(e);
-         }
-      }
-
-      public void close(boolean permanently) throws Exception
-      {
-
-         // To avoid a NPE cause by the stop
-         NodeManager nodeManagerInUse = nodeManager;
-
-         if (configuration.getHAPolicy().isBackup())
-         {
-            long timeout = 30000;
-
-            long start = System.currentTimeMillis();
-
-            while (backupActivationThread.isAlive() && System.currentTimeMillis() - start < timeout)
-            {
-               if (nodeManagerInUse != null)
-               {
-                  nodeManagerInUse.interrupt();
-               }
-
-               backupActivationThread.interrupt();
-
-               backupActivationThread.join(1000);
-
-            }
-
-            if (System.currentTimeMillis() - start >= timeout)
-            {
-               threadDump("Timed out waiting for backup activation to exit");
-            }
-
-            if (nodeManagerInUse != null)
-            {
-               nodeManagerInUse.stopBackup();
-            }
-         }
-         else
-         {
-
-            if (nodeManagerInUse != null)
-            {
-               // if we are now live, behave as live
-               // We need to delete the file too, otherwise the backup will failover when we shutdown or if the backup is
-               // started before the live
-               if (permanently)
-               {
-                  nodeManagerInUse.crashLiveServer();
-               }
-               else
-               {
-                  nodeManagerInUse.pauseLiveServer();
-               }
-            }
-         }
-      }
-   }
-
-   private final class ShutdownOnCriticalErrorListener implements IOCriticalErrorListener
-   {
-      boolean failedAlready = false;
-
-      public synchronized void onIOException(Exception cause, String message, SequentialFile file)
-      {
-         if (!failedAlready)
-         {
-            failedAlready = true;
-
-            HornetQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause);
-
-            stopTheServer(true);
-         }
-      }
-   }
-
-   private interface Activation extends Runnable
-   {
-      void close(boolean permanently) throws Exception;
-   }
-
-   private final class SharedNothingBackupActivation implements Activation
-   {
-      SharedNothingBackupQuorum backupQuorum;
-      private final boolean attemptFailBack;
-      private String nodeID;
-      ClusterControl clusterControl;
-      private boolean closed;
-
-      public SharedNothingBackupActivation(boolean attemptFailBack)
-      {
-         this.attemptFailBack = attemptFailBack;
-      }
-
-      public void run()
-      {
-         try
-         {
-            synchronized (HornetQServerImpl.this)
-            {
-               state = SERVER_STATE.STARTED;
-            }
-            // move all data away:
-            nodeManager.stop();
-            moveServerData();
-            nodeManager.start();
-            synchronized (this)
-            {
-               if (closed)
-                  return;
-            }
-
-            boolean scalingDown = configuration.getHAPolicy().getBackupStrategy() == BackupStrategy.SCALE_DOWN;
-
-            if (!initialisePart1(scalingDown))
-               return;
-
-            synchronized (this)
-            {
-               if (closed)
-                  return;
-               backupQuorum = new SharedNothingBackupQuorum(storageManager, nodeManager, scheduledPool);
-               clusterManager.getQuorumManager().registerQuorum(backupQuorum);
-            }
-
-            //use a Node Locator to connect to the cluster
-            LiveNodeLocator nodeLocator;
-            if (activationParams.get(ActivationParams.REPLICATION_ENDPOINT) != null)
-            {
-               TopologyMember member = (TopologyMember) activationParams.get(ActivationParams.REPLICATION_ENDPOINT);
-               nodeLocator = new NamedNodeIdNodeLocator(member.getNodeId(), new Pair<>(member.getLive(), member.getBackup()));
-            }
-            else
-            {
-               nodeLocator = configuration.getHAPolicy().getBackupGroupName() == null ?
-                     new AnyLiveNodeLocatorForReplication(backupQuorum, HornetQServerImpl.this) :
-                     new NamedLiveNodeLocatorForReplication(configuration.getHAPolicy().getBackupGroupName(), backupQuorum);
-            }
-            ClusterController clusterController = clusterManager.getClusterController();
-            clusterController.addClusterTopologyListenerForReplication(nodeLocator);
-            //todo do we actually need to wait?
-            clusterController.awaitConnectionToReplicationCluster();
-
-            clusterController.addIncomingInterceptorForReplication(new ReplicationError(HornetQServerImpl.this, nodeLocator));
-
-            // nodeManager.startBackup();
-
-            backupManager.start();
-
-            replicationEndpoint.setBackupQuorum(backupQuorum);
-            replicationEndpoint.setExecutor(executorFactory.getExecutor());
-            EndpointConnector endpointConnector = new EndpointConnector();
-
-            HornetQServerLogger.LOGGER.backupServerStarted(version.getFullVersion(), nodeManager.getNodeId());
-            state = SERVER_STATE.STARTED;
-
-            BACKUP_ACTIVATION signal;
-            do
-            {
-               //locate the first live server to try to replicate
-               nodeLocator.locateNode();
-               if (closed)
-               {
-                  return;
-               }
-               Pair<TransportConfiguration, TransportConfiguration> possibleLive = nodeLocator.getLiveConfiguration();
-               nodeID = nodeLocator.getNodeID();
-               //in a normal (non failback) scenario if we couldn't find our live server we should fail
-               if (!attemptFailBack)
-               {
-                  //this shouldn't happen
-                  if (nodeID == null)
-                     throw new RuntimeException("Could not establish the connection");
-                  nodeManager.setNodeID(nodeID);
-               }
-
-               try
-               {
-                  clusterControl =  clusterController.connectToNodeInReplicatedCluster(possibleLive.getA());
-               }
-               catch (Exception e)
-               {
-                  if (possibleLive.getB() != null)
-                  {
-                     try
-                     {
-                        clusterControl = clusterController.connectToNodeInReplicatedCluster(possibleLive.getB());
-                     }
-                     catch (Exception e1)
-                     {
-                        clusterControl = null;
-                     }
-                  }
-               }
-               if (clusterControl == null)
-               {
-                  //its ok to retry here since we haven't started replication yet
-                  //it may just be the server has gone since discovery
-                  Thread.sleep(clusterController.getRetryIntervalForReplicatedCluster());
-                  signal = BACKUP_ACTIVATION.ALREADY_REPLICATING;
-                  continue;
-               }
-
-               threadPool.execute(endpointConnector);
-               /**
-                * Wait for a signal from the the quorum manager, at this point if replication has been successful we can
-                * fail over or if there is an error trying to replicate (such as already replicating) we try the
-                * process again on the next live server.  All the action happens inside {@link BackupQuorum}
-                */
-               signal = backupQuorum.waitForStatusChange();
-               /**
-                * replicationEndpoint will be holding lots of open files. Make sure they get
-                * closed/sync'ed.
-                */
-               stopComponent(replicationEndpoint);
-               // time to give up
-               if (!isStarted() || signal == STOP)
-                  return;
-                  // time to fail over
-               else if (signal == FAIL_OVER)
-                  break;
-                  // something has gone badly run restart from scratch
-               else if (signal == BACKUP_ACTIVATION.FAILURE_REPLICATING)
-               {
-                  Thread startThread = new Thread(new Runnable()
-                  {
-                     @Override
-                     public void run()
-                     {
-                        try
-                        {
-                           stop();
-                        }
-                        catch (Exception e)
-                        {
-                           HornetQServerLogger.LOGGER.errorRestartingBackupServer(e, HornetQServerImpl.this);
-                        }
-                     }
-                  });
-                  startThread.start();
-                  return;
-               }
-               //ok, this live is no good, let's reset and try again
-               //close this session factory, we're done with it
-               clusterControl.close();
-               backupQuorum.reset();
-               if (replicationEndpoint.getChannel() != null)
-               {
-                  replicationEndpoint.getChannel().close();
-                  replicationEndpoint.setChannel(null);
-               }
-            }
-            while (signal == BACKUP_ACTIVATION.ALREADY_REPLICATING);
-
-            clusterManager.getQuorumManager().unRegisterQuorum(backupQuorum);
-
-            if (!isRemoteBackupUpToDate())
-            {
-               throw HornetQMessageBundle.BUNDLE.backupServerNotInSync();
-            }
-
-            configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.REPLICATED);
-            synchronized (HornetQServerImpl.this)
-            {
-               if (!isStarted())
-                  return;
-               HornetQServerLogger.LOGGER.becomingLive(HornetQServerImpl.this);
-               nodeManager.stopBackup();
-               storageManager.start();
-               backupManager.activated();
-               initialisePart2(scalingDown);
-            }
-         }
-         catch (Exception e)
-         {
-            if ((e instanceof InterruptedException || e instanceof IllegalStateException) && !isStarted())
-               // do not log these errors if the server is being stopped.
-               return;
-            HornetQServerLogger.LOGGER.initializationError(e);
-            e.printStackTrace();
-         }
-      }
-
-      public void close(final boolean permanently) throws Exception
-      {
-         synchronized (this)
-         {
-            if (backupQuorum != null)
-               backupQuorum.causeExit(STOP);
-            closed = true;
-         }
-
-         if (configuration.getHAPolicy().isBackup())
-         {
-            long timeout = 30000;
-
-            long start = System.currentTimeMillis();
-
-            // To avoid a NPE cause by the stop
-            NodeManager nodeManagerInUse = nodeManager;
-
-            while (backupActivationThread.isAlive() && System.currentTimeMillis() - start < timeout)
-            {
-
-               if (nodeManagerInUse != null)
-               {
-                  nodeManagerInUse.interrupt();
-               }
-
-               backupActivationThread.interrupt();
-
-               Thread.sleep(1000);
-            }
-
-            if (System.currentTimeMillis() - start >= timeout)
-            {
-               HornetQServerLogger.LOGGER.backupActivationProblem();
-            }
-
-            if (nodeManagerInUse != null)
-            {
-               nodeManagerInUse.stopBackup();
-            }
-         }
-      }
-
-      /**
-       * Live has notified this server that it is going to stop.
-       */
-      public void failOver(final LiveStopping finalMessage)
-      {
-         if (finalMessage == null)
-         {
-            backupQuorum.causeExit(FAILURE_REPLICATING);
-         }
-         else
-         {
-            backupQuorum.failOver(finalMessage);
-         }
-      }
-
-      private class EndpointConnector implements Runnable
-      {
-         @Override
-         public void run()
-         {
-            try
-            {
-               //we should only try once, if its not there we should move on.
-               clusterControl.getSessionFactory().setReconnectAttempts(1);
-               backupQuorum.setSessionFactory(clusterControl.getSessionFactory());
-               //get the connection and request replication to live
-               clusterControl.authorize();
-               connectToReplicationEndpoint(clusterControl);
-               replicationEndpoint.start();
-               clusterControl.announceReplicatingBackupToLive(attemptFailBack);
-            }
-            catch (Exception e)
-            {
-               //we shouldn't stop the server, just mark the connector as tried and unavailable
-               HornetQServerLogger.LOGGER.replicationStartProblem(e);
-               backupQuorum.causeExit(FAILURE_REPLICATING);
-            }
-         }
-
-         private synchronized ReplicationEndpoint connectToReplicationEndpoint(final ClusterControl control) throws Exception
-         {
-            if (!isStarted())
-               return null;
-            if (!configuration.getHAPolicy().isBackup())
-            {
-               throw HornetQMessageBundle.BUNDLE.serverNotBackupServer();
-            }
-
-            Channel replicationChannel = control.createReplicationChannel();
-
-            replicationChannel.setHandler(replicationEndpoint);
-
-            if (replicationEndpoint.getChannel() != null)
-            {
-               throw HornetQMessageBundle.BUNDLE.alreadyHaveReplicationServer();
-            }
-
-            replicationEndpoint.setChannel(replicationChannel);
-
-            return replicationEndpoint;
-         }
-      }
-   }
-
-
-   private final class SharedNothingLiveActivation implements Activation
-   {
-      public void run()
-      {
-         try
-         {
-            if (configuration.isClustered() && configuration.isCheckForLiveServer() && isNodeIdUsed())
-            {
-               configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_REPLICATED);
-               return;
-            }
-
-            initialisePart1(false);
-
-            initialisePart2(false);
-
-            if (identity != null)
-            {
-               HornetQServerLogger.LOGGER.serverIsLive(identity);
-            }
-            else
-            {
-               HornetQServerLogger.LOGGER.serverIsLive();
-            }
-         }
-         catch (Exception e)
-         {
-            HornetQServerLogger.LOGGER.initializationError(e);
-         }
-      }
-
-      /**
-       * Determines whether there is another server already running with this server's nodeID.
-       * <p/>
-       * This can happen in case of a successful fail-over followed by the live's restart
-       * (attempting a fail-back).
-       *
-       * @throws Exception
-       */
-      private boolean isNodeIdUsed() throws Exception
-      {
-         if (configuration.getClusterConfigurations().isEmpty())
-            return false;
-         SimpleString nodeId0;
-         try
-         {
-            nodeId0 = nodeManager.readNodeId();
-         }
-         catch (HornetQIllegalStateException e)
-         {
-            nodeId0 = null;
-         }
-
-         ServerLocatorInternal locator;
-
-         ClusterConnectionConfiguration config = ConfigurationUtils.getReplicationClusterConfiguration(configuration);
-
-         locator = getLocator(config);
-
-         ClientSessionFactoryInternal factory = null;
-
-         NodeIdListener listener = new NodeIdListener(nodeId0);
-
-         locator.addClusterTopologyListener(listener);
-         try
-         {
-            locator.setReconnectAttempts(0);
-            try
-            {
-               locator.addClusterTopologyListener(listener);
-               factory = locator.connectNoWarnings();
-            }
-            catch (Exception notConnected)
-            {
-               return false;
-            }
-
-            listener.latch.await(5, TimeUnit.SECONDS);
-
-            return listener.isNodePresent;
-         }
-         finally
-         {
-            if (factory != null)
-               factory.close();
-            if (locator != null)
-               locator.close();
-         }
-      }
-
-      public void close(boolean permanently) throws Exception
-      {
-         // To avoid an NPE caused by the stop
-         NodeManager nodeManagerInUse = nodeManager;
-
-         if (nodeManagerInUse != null)
-         {
-            if (permanently)
-            {
-               nodeManagerInUse.crashLiveServer();
-            }
-            else
-            {
-               nodeManagerInUse.pauseLiveServer();
-            }
-         }
-      }
-   }
-
-   static final class NodeIdListener implements ClusterTopologyListener
-   {
-      volatile boolean isNodePresent = false;
-
-      private final SimpleString nodeId;
-      private final CountDownLatch latch = new CountDownLatch(1);
-
-      public NodeIdListener(SimpleString nodeId)
-      {
-         this.nodeId = nodeId;
-      }
-
-      @Override
-      public void nodeUP(TopologyMember topologyMember, boolean last)
-      {
-         boolean isOurNodeId = nodeId != null && nodeId.toString().equals(topologyMember.getNodeId());
-         if (isOurNodeId)
-         {
-            isNodePresent = true;
-         }
-         if (isOurNodeId || last)
-         {
-            latch.countDown();
-         }
-      }
-
-      @Override
-      public void nodeDown(long eventUID, String nodeID)
-      {
-         // no-op
-      }
-   }
-
-   private TransportConfiguration[] connectorNameListToArray(final List<String> connectorNames)
-   {
-      TransportConfiguration[] tcConfigs = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class,
-                                                                                        connectorNames.size());
-      int count = 0;
-      for (String connectorName : connectorNames)
-      {
-         TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorName);
-
-         if (connector == null)
-         {
-            HornetQServerLogger.LOGGER.bridgeNoConnector(connectorName);
-
-            return null;
-         }
-
-         tcConfigs[count++] = connector;
-      }
-
-      return tcConfigs;
-   }
-
-   /**
-    * This seems like duplicated code all over the place, but for security reasons we can't let something like this be open in a
-    * utility class, as it would be a door to load anything you like in a safe VM.
-    * For that reason, any class trying to do a privileged block should do so with the AccessController directly.
-    */
-   private static Object safeInitNewInstance(final String className)
-   {
-      return AccessController.doPrivileged(new PrivilegedAction<Object>()
-      {
-         public Object run()
+         public Object run()
          {
             return ClassloadingUtil.newInstanceFromClassLoader(className);
          }
       });
    }
 
-   @Override
-   public void startReplication(CoreRemotingConnection rc, final ClusterConnection clusterConnection,
-                                final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean isFailBackRequest) throws HornetQException
-   {
-      if (replicationManager != null)
-      {
-         throw new HornetQAlreadyReplicatingException();
-      }
-
-      if (!isStarted())
-      {
-         throw new HornetQIllegalStateException();
-      }
-
-      synchronized (replicationLock)
-      {
-
-         if (replicationManager != null)
-         {
-            throw new HornetQAlreadyReplicatingException();
-         }
-         ReplicationFailureListener listener = new ReplicationFailureListener();
-         rc.addCloseListener(listener);
-         rc.addFailureListener(listener);
-         replicationManager = new ReplicationManager(rc, executorFactory);
-         replicationManager.start();
-         Thread t = new Thread(new Runnable()
-         {
-            public void run()
-            {
-               try
-               {
-                  storageManager.startReplication(replicationManager, pagingManager, getNodeID().toString(),
-                                                  isFailBackRequest && configuration.getHAPolicy().isAllowAutoFailBack());
-                  clusterConnection.nodeAnnounced(System.currentTimeMillis(), getNodeID().toString(), configuration.getHAPolicy().getBackupGroupName(), clusterManager.getHAManager().getHAPolicy().getScaleDownGroupName(), pair, true);
-
-                  backupUpToDate = false;
-
-                  if (isFailBackRequest && configuration.getHAPolicy().isAllowAutoFailBack())
-                  {
-                     BackupTopologyListener listener1 = new BackupTopologyListener(getNodeID().toString());
-                     clusterConnection.addClusterTopologyListener(listener1);
-                     if (listener1.waitForBackup())
-                     {
-                        try
-                        {
-                           Thread.sleep(configuration.getHAPolicy().getFailbackDelay());
-                        }
-                        catch (InterruptedException e)
-                        {
-                           //
-                        }
-                        //if we have too many backups kept or aren't configured to restart, just stop; otherwise restart as a backup
-                        if (!configuration.getHAPolicy().isRestartBackup() && countNumberOfCopiedJournals() >= configuration.getMaxSavedReplicatedJournalsSize() && configuration.getMaxSavedReplicatedJournalsSize() >= 0)
-                        {
-                           stop(true);
-                           HornetQServerLogger.LOGGER.stopReplicatedBackupAfterFailback();
-                        }
-                        else
-                        {
-                           stop(true);
-                           HornetQServerLogger.LOGGER.restartingReplicatedBackupAfterFailback();
-                           configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_REPLICATED);
-                           start();
-                        }
-                     }
-                     else
-                     {
-                        HornetQServerLogger.LOGGER.failbackMissedBackupAnnouncement();
-                     }
-                  }
-               }
-               catch (Exception e)
-               {
-                  if (state == HornetQServerImpl.SERVER_STATE.STARTED)
-                  {
-                  /*
-                   * The reasoning here is that the exception was either caused by (1) the
-                   * (interaction with) the backup, or (2) by an IO Error at the storage. If (1), we
-                   * can swallow the exception and ignore the replication request. If (2) the live
-                   * will crash shortly.
-                   */
-                     HornetQServerLogger.LOGGER.errorStartingReplication(e);
-                  }
-                  try
-                  {
-                     stopComponent(replicationManager);
-                  }
-                  catch (Exception hqe)
-                  {
-                     HornetQServerLogger.LOGGER.errorStoppingReplication(hqe);
-                  }
-                  finally
-                  {
-                     synchronized (replicationLock)
-                     {
-                        replicationManager = null;
-                     }
-                  }
-               }
-            }
-         });
-
-         t.start();
-      }
-   }
-
-   /**
-    * Whether a remote backup server was in sync with its live server. If it was not in sync, it may
-    * not take over the live's functions.
-    * <p/>
-    * A local backup server or a live server should always return {@code true}
-    *
-    * @return whether the backup is up-to-date, if the server is not a backup it always returns
-    * {@code true}.
-    */
-   public boolean isRemoteBackupUpToDate()
-   {
-      return backupUpToDate;
-   }
-
-   public void setRemoteBackupUpToDate()
-   {
-      backupManager.announceBackup();
-      backupUpToDate = true;
-      backupSyncLatch.countDown();
-   }
-
    public void addProtocolManagerFactory(ProtocolManagerFactory factory)
    {
       protocolManagerFactories.add(factory);
@@ -3186,7 +2237,7 @@ public class HornetQServerImpl implements HornetQServer
    }
 
 
-   private int countNumberOfCopiedJournals()
+   int countNumberOfCopiedJournals()
    {
       //will use the main journal to check for how many backups have been kept
       File journalDir = new File(configuration.getJournalDirectory());
@@ -3209,98 +2260,13 @@ public class HornetQServerImpl implements HornetQServer
       return numberOfbackupsSaved;
    }
 
-   private final class ReplicationFailureListener implements FailureListener, CloseListener
-   {
-
-      @Override
-      public void connectionFailed(HornetQException exception, boolean failedOver)
-      {
-         connectionClosed();
-      }
-
-      @Override
-      public void connectionFailed(final HornetQException me, boolean failedOver, String scaleDownTargetNodeID)
-      {
-         connectionFailed(me, failedOver);
-      }
-
-      @Override
-      public void connectionClosed()
-      {
-         threadPool.execute(new Runnable()
-         {
-            public void run()
-            {
-               synchronized (replicationLock)
-               {
-                  if (replicationManager != null)
-                  {
-                     storageManager.stopReplication();
-                     replicationManager = null;
-                  }
-               }
-            }
-         });
-      }
-   }
-
-   /**
-    * @throws HornetQException
-    */
-   public void remoteFailOver(ReplicationLiveIsStoppingMessage.LiveStopping finalMessage) throws HornetQException
-   {
-      HornetQServerLogger.LOGGER.trace("Remote fail-over, got message=" + finalMessage + ", backupUpToDate=" +
-                                          backupUpToDate);
-      if (!configuration.getHAPolicy().isBackup() || configuration.getHAPolicy().isSharedStore())
-      {
-         throw new HornetQInternalErrorException();
-      }
-      if (activation instanceof SharedNothingBackupActivation)
-      {
-         final SharedNothingBackupActivation replicationActivation = ((SharedNothingBackupActivation) activation);
-
-         if (!backupUpToDate)
-         {
-            replicationActivation.failOver(null);
-         }
-         else
-         {
-            replicationActivation.failOver(finalMessage);
-         }
-      }
-   }
-
-
-   private ServerLocatorInternal getLocator(ClusterConnectionConfiguration config) throws HornetQException
-   {
-      ServerLocatorInternal locator;
-      if (config.getDiscoveryGroupName() != null)
-      {
-         DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations().get(config.getDiscoveryGroupName());
-
-         if (dg == null)
-         {
-            throw HornetQMessageBundle.BUNDLE.noDiscoveryGroupFound(dg);
-         }
-         locator = (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(dg);
-      }
-      else
-      {
-         TransportConfiguration[] tcConfigs = config.getStaticConnectors() != null ? connectorNameListToArray(config.getStaticConnectors())
-            : null;
-
-         locator = (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(tcConfigs);
-      }
-      return locator;
-   }
-
    /**
     * Move data away before starting data synchronization for fail-back.
     * <p/>
     * Use case is a server, upon restarting, finding a former backup running in its place. It will
     * move any older data away and log a warning about it.
     */
-   private void moveServerData()
+   void moveServerData()
    {
       String[] dataDirs =
          new String[]{configuration.getBindingsDirectory(),

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/LiveActivation.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/LiveActivation.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/LiveActivation.java
new file mode 100644
index 0000000..c863c66
--- /dev/null
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/LiveActivation.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.impl;
+
+public abstract class LiveActivation extends Activation
+{
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/LiveOnlyActivation.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/LiveOnlyActivation.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/LiveOnlyActivation.java
new file mode 100644
index 0000000..ceb60d1
--- /dev/null
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/LiveOnlyActivation.java
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.impl;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.postoffice.DuplicateIDCache;
+import org.hornetq.core.postoffice.impl.PostOfficeImpl;
+import org.hornetq.core.remoting.server.RemotingService;
+import org.hornetq.core.server.HornetQServerLogger;
+import org.hornetq.core.server.LiveNodeLocator;
+import org.hornetq.core.server.cluster.HornetQServerSideProtocolManagerFactory;
+import org.hornetq.core.server.cluster.ha.LiveOnlyPolicy;
+import org.hornetq.core.server.cluster.ha.ScaleDownPolicy;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+public class LiveOnlyActivation extends Activation
+{
+   //this is how we act when we initially start as live
+   private LiveOnlyPolicy liveOnlyPolicy;
+
+   private final HornetQServerImpl hornetQServer;
+
+   private ServerLocatorInternal scaleDownServerLocator;
+
+   private ClientSessionFactoryInternal scaleDownClientSessionFactory;
+
+   public LiveOnlyActivation(HornetQServerImpl server, LiveOnlyPolicy liveOnlyPolicy)
+   {
+      this.hornetQServer = server;
+      this.liveOnlyPolicy = liveOnlyPolicy;
+   }
+
+   public void run()
+   {
+      try
+      {
+         hornetQServer.initialisePart1(false);
+
+         hornetQServer.initialisePart2(false);
+
+         if (hornetQServer.getIdentity() != null)
+         {
+            HornetQServerLogger.LOGGER.serverIsLive(hornetQServer.getIdentity());
+         }
+         else
+         {
+            HornetQServerLogger.LOGGER.serverIsLive();
+         }
+      }
+      catch (Exception e)
+      {
+         HornetQServerLogger.LOGGER.initializationError(e);
+      }
+   }
+
+   @Override
+   public void close(boolean permanently, boolean restarting) throws Exception
+   {
+      if (scaleDownServerLocator != null)
+      {
+         scaleDownServerLocator.close();
+         scaleDownServerLocator = null;
+      }
+   }
+
+   public void freezeConnections(RemotingService remotingService)
+   {
+      // connect to the scale-down target first so that when we freeze/disconnect the clients we can tell them where
+      // we're sending the messages
+      if (liveOnlyPolicy.getScaleDownPolicy() != null && liveOnlyPolicy.getScaleDownPolicy().isEnabled())
+      {
+         connectToScaleDownTarget(liveOnlyPolicy.getScaleDownPolicy());
+      }
+
+      TransportConfiguration tc = scaleDownClientSessionFactory == null ? null : scaleDownClientSessionFactory.getConnectorConfiguration();
+      String nodeID = tc == null ? null : scaleDownClientSessionFactory.getServerLocator().getTopology().getMember(tc).getNodeId();
+      if (remotingService != null)
+      {
+         remotingService.freeze(nodeID, null);
+      }
+   }
+
+   @Override
+   public void postConnectionFreeze()
+   {
+      if (liveOnlyPolicy.getScaleDownPolicy() != null
+            && liveOnlyPolicy.getScaleDownPolicy().isEnabled()
+            && scaleDownClientSessionFactory != null)
+      {
+         try
+         {
+            scaleDown();
+         }
+         catch (Exception e)
+         {
+            HornetQServerLogger.LOGGER.failedToScaleDown(e);
+         }
+         finally
+         {
+            scaleDownClientSessionFactory.close();
+            scaleDownServerLocator.close();
+         }
+      }
+   }
+
+   public void connectToScaleDownTarget(ScaleDownPolicy scaleDownPolicy)
+   {
+      try
+      {
+         scaleDownServerLocator = ScaleDownPolicy.getScaleDownConnector(scaleDownPolicy, hornetQServer);
+         //use a Node Locator to connect to the cluster
+         scaleDownServerLocator.setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance());
+         LiveNodeLocator nodeLocator = scaleDownPolicy.getGroupName() == null ?
+               new AnyLiveNodeLocatorForScaleDown(hornetQServer) :
+               new NamedLiveNodeLocatorForScaleDown(scaleDownPolicy.getGroupName(), hornetQServer);
+         scaleDownServerLocator.addClusterTopologyListener(nodeLocator);
+
+         nodeLocator.connectToCluster(scaleDownServerLocator);
+         // a timeout is necessary here in case we use a NamedLiveNodeLocatorForScaleDown and there's no matching node in the cluster
+         // should the timeout be configurable?
+         nodeLocator.locateNode(HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT);
+         ClientSessionFactoryInternal clientSessionFactory = null;
+         while (clientSessionFactory == null)
+         {
+            Pair<TransportConfiguration, TransportConfiguration> possibleLive = null;
+            try
+            {
+               possibleLive = nodeLocator.getLiveConfiguration();
+               if (possibleLive == null)  // we've tried every connector
+                  break;
+               clientSessionFactory = (ClientSessionFactoryInternal) scaleDownServerLocator.createSessionFactory(possibleLive.getA(), 0, false);
+            }
+            catch (Exception e)
+            {
+               HornetQServerLogger.LOGGER.trace("Failed to connect to " + possibleLive.getA());
+               nodeLocator.notifyRegistrationFailed(false);
+               if (clientSessionFactory != null)
+               {
+                  clientSessionFactory.close();
+               }
+               clientSessionFactory = null;
+               // should I try the backup (i.e. getB()) from possibleLive?
+            }
+         }
+         if (clientSessionFactory != null)
+         {
+            scaleDownClientSessionFactory = clientSessionFactory;
+         }
+         else
+         {
+            throw new HornetQException("Unable to connect to server for scale-down");
+         }
+      }
+      catch (Exception e)
+      {
+         HornetQServerLogger.LOGGER.failedToScaleDown(e);
+      }
+   }
+
+
+   public long scaleDown() throws Exception
+   {
+      ScaleDownHandler scaleDownHandler = new ScaleDownHandler(hornetQServer.getPagingManager(),
+            hornetQServer.getPostOffice(),
+            hornetQServer.getNodeManager(),
+            hornetQServer.getClusterManager().getClusterController());
+      ConcurrentMap<SimpleString, DuplicateIDCache> duplicateIDCaches = ((PostOfficeImpl) hornetQServer.getPostOffice()).getDuplicateIDCaches();
+      Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<>();
+      for (SimpleString address : duplicateIDCaches.keySet())
+      {
+         DuplicateIDCache duplicateIDCache = hornetQServer.getPostOffice().getDuplicateIDCache(address);
+         duplicateIDMap.put(address, duplicateIDCache.getMap());
+      }
+      return scaleDownHandler.scaleDown(scaleDownClientSessionFactory, hornetQServer.getResourceManager(), duplicateIDMap,
+            hornetQServer.getManagementService().getManagementAddress(), null);
+   }
+}


Mime
View raw message