activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [21/51] [partial] activemq-6 git commit: ACTIVEMQ6-2 Update to HQ master
Date Tue, 11 Nov 2014 11:00:51 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAManager.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAManager.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAManager.java
index ec1892c..ff61df9 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAManager.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAManager.java
@@ -12,574 +12,20 @@
  */
 package org.hornetq.core.server.cluster.ha;
 
-import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.TopologyMember;
-import org.hornetq.core.client.impl.ServerLocatorInternal;
-import org.hornetq.core.client.impl.Topology;
-import org.hornetq.core.client.impl.TopologyMemberImpl;
-import org.hornetq.core.config.BackupStrategy;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
-import org.hornetq.core.server.ActivationParams;
 import org.hornetq.core.server.HornetQComponent;
-import org.hornetq.core.server.HornetQMessageBundle;
 import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQServerLogger;
-import org.hornetq.core.server.cluster.ClusterControl;
-import org.hornetq.core.server.cluster.ClusterController;
-import org.hornetq.core.server.cluster.qourum.QuorumVote;
-import org.hornetq.core.server.cluster.qourum.QuorumVoteHandler;
-import org.hornetq.core.server.cluster.qourum.Vote;
-import org.hornetq.spi.core.security.HornetQSecurityManager;
 
-import java.lang.reflect.Array;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
 /*
-* An HAManager takes care of any colocated backups in a VM. These are either pre configured backups or backups requested
-* by other lives. It also takes care of the quorum voting to request backups.
+* An HAManager takes care of any colocated backups in a VM.
 * */
-public class HAManager implements HornetQComponent
+public interface HAManager extends HornetQComponent
 {
-   private static final SimpleString REQUEST_BACKUP_QUORUM_VOTE = new SimpleString("RequestBackupQuorumVote");
-
-   private final HAPolicy haPolicy;
-
-   private final HornetQSecurityManager securityManager;
-
-   private final  HornetQServer server;
-
-   private Set<Configuration> backupServerConfigurations;
-
-   private Map<String, HornetQServer> backupServers = new HashMap<>();
-
-   private boolean started;
-
-   public HAManager(HAPolicy haPolicy, HornetQSecurityManager securityManager, HornetQServer hornetQServer, Set<Configuration> backupServerConfigurations)
-   {
-      this.haPolicy = haPolicy;
-      this.securityManager = securityManager;
-      server = hornetQServer;
-      this.backupServerConfigurations = backupServerConfigurations;
-   }
-
-   /**
-    * starts the HA manager, any pre configured backups are started and if a backup is needed a quorum vote in initiated
-    */
-   public void start()
-   {
-      if (started)
-         return;
-      server.getClusterManager().getQuorumManager().registerQuorumHandler(new RequestBackupQuorumVoteHandler());
-      if (backupServerConfigurations != null)
-      {
-         for (Configuration configuration : backupServerConfigurations)
-         {
-            HornetQServer backup = server.createBackupServer(configuration);
-            backupServers.put(configuration.getName(), backup);
-         }
-      }
-      //start the backups
-      for (HornetQServer hornetQServer : backupServers.values())
-      {
-         try
-         {
-            hornetQServer.start();
-         }
-         catch (Exception e)
-         {
-            e.printStackTrace();
-         }
-      }
-
-      //vote for a backup if required
-      if (haPolicy.isRequestBackup())
-      {
-         server.getClusterManager().getQuorumManager().vote(new RequestBackupQuorumVote());
-      }
-      started = true;
-   }
-
-   /**
-    * stop any backups
-    */
-   public void stop()
-   {
-      for (HornetQServer hornetQServer : backupServers.values())
-      {
-         try
-         {
-            hornetQServer.stop();
-         }
-         catch (Exception e)
-         {
-            e.printStackTrace();
-            //todo
-         }
-      }
-      backupServers.clear();
-      started = false;
-   }
-
-   @Override
-   public boolean isStarted()
-   {
-      return started;
-   }
-
-   public synchronized boolean activateSharedStoreBackup(int backupSize, String journalDirectory, String bindingsDirectory, String largeMessagesDirectory, String pagingDirectory) throws Exception
-   {
-      if (backupServers.size() >= haPolicy.getMaxBackups() || backupSize != backupServers.size())
-      {
-         return false;
-      }
-      Configuration configuration = server.getConfiguration().copy();
-      HornetQServer backup = server.createBackupServer(configuration);
-      try
-      {
-         int portOffset = haPolicy.getBackupPortOffset() * (backupServers.size() + 1);
-         String name = "colocated_backup_" + backupServers.size() + 1;
-         updateSharedStoreConfiguration(configuration, haPolicy.getBackupStrategy(), name, portOffset, haPolicy.getRemoteConnectors(), journalDirectory, bindingsDirectory, largeMessagesDirectory, pagingDirectory);
-         //make sure we don't restart as we are colocated
-         configuration.getHAPolicy().setRestartBackup(false);
-         backupServers.put(configuration.getName(), backup);
-         backup.start();
-      }
-      catch (Exception e)
-      {
-         backup.stop();
-         //todo log a warning
-         return false;
-      }
-      return true;
-   }
-
-   /**
-    * activate a backup server replicating from a specified node.
-    *
-    * @param backupSize the number of backups the requesting server thinks there are. if this is changed then we should
-    * decline and the requesting server can cast a re vote
-    * @param nodeID the id of the node to replicate from
-    * @return true if the server was created and started
-    * @throws Exception
-    */
-   public synchronized boolean activateReplicatedBackup(int backupSize, SimpleString nodeID) throws Exception
-   {
-      if (backupServers.size() >= haPolicy.getMaxBackups() || backupSize != backupServers.size())
-      {
-         return false;
-      }
-      Configuration configuration = server.getConfiguration().copy();
-      HornetQServer backup = server.createBackupServer(configuration);
-      try
-      {
-         TopologyMember member = server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeID.toString());
-         int portOffset = haPolicy.getBackupPortOffset() * (backupServers.size() + 1);
-         String name = "colocated_backup_" + backupServers.size() + 1;
-         updateReplicatedConfiguration(configuration, haPolicy.getBackupStrategy(), name, portOffset, haPolicy.getRemoteConnectors());
-         backup.addActivationParam(ActivationParams.REPLICATION_ENDPOINT, member);
-         backupServers.put(configuration.getName(), backup);
-         backup.start();
-      }
-      catch (Exception e)
-      {
-         backup.stop();
-         HornetQServerLogger.LOGGER.activateReplicatedBackupFailed(e);
-         return false;
-      }
-      return true;
-   }
-
    /**
     * return the current backup servers
     *
     * @return the backups
     */
-   public Map<String, HornetQServer> getBackupServers()
-   {
-      return backupServers;
-   }
-
-   /**
-    * send a request to a live server to start a backup for us
-    *
-    * @param connectorPair the connector for the node to request a backup from
-    * @param backupSize the current size of the requested nodes backups
-    * @return true if the request wa successful.
-    * @throws Exception
-    */
-   private boolean requestBackup(Pair<TransportConfiguration, TransportConfiguration> connectorPair, int backupSize) throws Exception
-   {
-      ClusterController clusterController = server.getClusterManager().getClusterController();
-      try
-      (
-            ClusterControl clusterControl = clusterController.connectToNode(connectorPair.getA());
-      )
-      {
-         clusterControl.authorize();
-         if (haPolicy.getPolicyType() == HAPolicy.POLICY_TYPE.COLOCATED_SHARED_STORE)
-         {
-
-            return clusterControl.requestSharedStoreBackup(backupSize,
-                                                    server.getConfiguration().getJournalDirectory(),
-                                                    server.getConfiguration().getBindingsDirectory(),
-                                                    server.getConfiguration().getLargeMessagesDirectory(),
-                                                    server.getConfiguration().getPagingDirectory());
-         }
-         else
-         {
-            return clusterControl.requestReplicatedBackup(backupSize, server.getNodeID());
-
-         }
-      }
-   }
-
-   /**
-    * update the backups configuration
-    *  @param backupConfiguration the configuration to update
-    * @param backupStrategy the strategy for the backup
-    * @param name the new name of the backup
-    * @param portOffset the offset for the acceptors and any connectors that need changing
-    * @param remoteConnectors the connectors that don't need off setting, typically remote
-    * @param journalDirectory
-    * @param bindingsDirectory
-    * @param largeMessagesDirectory
-    * @param pagingDirectory
-    */
-   private static void updateSharedStoreConfiguration(Configuration backupConfiguration,
-                                                      BackupStrategy backupStrategy,
-                                                      String name,
-                                                      int portOffset,
-                                                      List<String> remoteConnectors,
-                                                      String journalDirectory,
-                                                      String bindingsDirectory,
-                                                      String largeMessagesDirectory,
-                                                      String pagingDirectory)
-   {
-      backupConfiguration.getHAPolicy().setBackupStrategy(backupStrategy);
-      backupConfiguration.setName(name);
-      backupConfiguration.setJournalDirectory(journalDirectory);
-      backupConfiguration.setBindingsDirectory(bindingsDirectory);
-      backupConfiguration.setLargeMessagesDirectory(largeMessagesDirectory);
-      backupConfiguration.setPagingDirectory(pagingDirectory);
-      backupConfiguration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE);
-      updateAcceptorsAndConnectors(backupConfiguration, portOffset, remoteConnectors);
-   }
-
-   /**
-    * update the backups configuration
-    *
-    * @param backupConfiguration the configuration to update
-    * @param backupStrategy the strategy for the backup
-    * @param name the new name of the backup
-    * @param portOffset the offset for the acceptors and any connectors that need changing
-    * @param remoteConnectors the connectors that don't need off setting, typically remote
-    */
-   private static void updateReplicatedConfiguration(Configuration backupConfiguration,
-                                                     BackupStrategy backupStrategy,
-                                                     String name,
-                                                     int portOffset,
-                                                     List<String> remoteConnectors)
-   {
-      backupConfiguration.getHAPolicy().setBackupStrategy(backupStrategy);
-      backupConfiguration.setName(name);
-      backupConfiguration.setJournalDirectory(backupConfiguration.getJournalDirectory() + name);
-      backupConfiguration.setPagingDirectory(backupConfiguration.getPagingDirectory() + name);
-      backupConfiguration.setLargeMessagesDirectory(backupConfiguration.getLargeMessagesDirectory() + name);
-      backupConfiguration.setBindingsDirectory(backupConfiguration.getBindingsDirectory() + name);
-      backupConfiguration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_REPLICATED);
-      updateAcceptorsAndConnectors(backupConfiguration, portOffset, remoteConnectors);
-   }
-
-   private static void updateAcceptorsAndConnectors(Configuration backupConfiguration, int portOffset, List<String> remoteConnectors)
-   {
-      //we only do this if we are a full server, if scale down then our acceptors wont be needed and our connectors will
-      // be the same as the parent server
-      if (backupConfiguration.getHAPolicy().getBackupStrategy() == BackupStrategy.FULL)
-      {
-         Set<TransportConfiguration> acceptors = backupConfiguration.getAcceptorConfigurations();
-         for (TransportConfiguration acceptor : acceptors)
-         {
-            updatebackupParams(backupConfiguration.getName(), portOffset, acceptor.getParams());
-         }
-         Map<String, TransportConfiguration> connectorConfigurations = backupConfiguration.getConnectorConfigurations();
-         for (Map.Entry<String, TransportConfiguration> entry : connectorConfigurations.entrySet())
-         {
-            //check to make sure we aren't a remote connector as this shouldn't be changed
-            if (!remoteConnectors.contains(entry.getValue().getName()))
-            {
-               updatebackupParams(backupConfiguration.getName(), portOffset, entry.getValue().getParams());
-            }
-         }
-      }
-      else if (backupConfiguration.getHAPolicy().getBackupStrategy() == BackupStrategy.SCALE_DOWN)
-      {
-         //if we are scaling down then we wont need any acceptors but clear anyway for belts and braces
-         backupConfiguration.getAcceptorConfigurations().clear();
-      }
-   }
-
-   private static void updatebackupParams(String name, int portOffset, Map<String, Object> params)
-   {
-      if (params != null)
-      {
-         Object port = params.get("port");
-         if (port != null)
-         {
-            Integer integer = Integer.valueOf(port.toString());
-            integer += portOffset;
-            params.put("port", integer.toString());
-         }
-         Object serverId = params.get("server-id");
-         if (serverId != null)
-         {
-            params.put("server-id", serverId.toString() + "(" + name + ")");
-         }
-      }
-   }
-
-   public HAPolicy getHAPolicy()
-   {
-      return haPolicy;
-   }
-
-   public ServerLocatorInternal getScaleDownConnector() throws HornetQException
-   {
-      if (!haPolicy.getScaleDownConnectors().isEmpty())
-      {
-         return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(connectorNameListToArray(haPolicy.getScaleDownConnectors()));
-      }
-      else if (haPolicy.getScaleDownDiscoveryGroup() != null)
-      {
-         DiscoveryGroupConfiguration dg = server.getConfiguration().getDiscoveryGroupConfigurations().get(haPolicy.getScaleDownDiscoveryGroup());
-
-         if (dg == null)
-         {
-            throw HornetQMessageBundle.BUNDLE.noDiscoveryGroupFound(dg);
-         }
-         return  (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(dg);
-      }
-      else
-      {
-         Map<String, TransportConfiguration> connectorConfigurations = server.getConfiguration().getConnectorConfigurations();
-         for (TransportConfiguration transportConfiguration : connectorConfigurations.values())
-         {
-            if (transportConfiguration.getFactoryClassName().equals(InVMConnectorFactory.class.getName()))
-            {
-               return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(transportConfiguration);
-            }
-         }
-      }
-      throw HornetQMessageBundle.BUNDLE.noConfigurationFoundForScaleDown();
-   }
-
-
-
-   private TransportConfiguration[] connectorNameListToArray(final List<String> connectorNames)
-   {
-      TransportConfiguration[] tcConfigs = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class,
-            connectorNames.size());
-      int count = 0;
-      for (String connectorName : connectorNames)
-      {
-         TransportConfiguration connector = server.getConfiguration().getConnectorConfigurations().get(connectorName);
-
-         if (connector == null)
-         {
-            HornetQServerLogger.LOGGER.bridgeNoConnector(connectorName);
-
-            return null;
-         }
-
-         tcConfigs[count++] = connector;
-      }
-
-      return tcConfigs;
-   }
-   /**
-    * A vote handler for incoming backup request votes
-    */
-   private final class RequestBackupQuorumVoteHandler implements QuorumVoteHandler
-   {
-      @Override
-      public Vote vote(Vote vote)
-      {
-         return new RequestBackupVote(backupServers.size(), server.getNodeID().toString(), backupServers.size() < haPolicy.getMaxBackups());
-      }
-
-      @Override
-      public SimpleString getQuorumName()
-      {
-         return REQUEST_BACKUP_QUORUM_VOTE;
-      }
-
-      @Override
-      public Vote decode(HornetQBuffer voteBuffer)
-      {
-         RequestBackupVote requestBackupVote = new RequestBackupVote();
-         requestBackupVote.decode(voteBuffer);
-         return requestBackupVote;
-      }
-   }
-
-   /**
-    * a quorum vote for backup requests
-    */
-   private final class RequestBackupQuorumVote extends QuorumVote<RequestBackupVote, Pair<String, Integer>>
-   {
-      //the available nodes that we can request
-      private final List<Pair<String, Integer>> nodes = new ArrayList<>();
-
-      public RequestBackupQuorumVote()
-      {
-         super(REQUEST_BACKUP_QUORUM_VOTE);
-      }
-
-      @Override
-      public Vote connected()
-      {
-         return new RequestBackupVote();
-      }
-
-      @Override
-      public Vote notConnected()
-      {
-         return new RequestBackupVote();
-      }
-
-      @Override
-      public void vote(RequestBackupVote vote)
-      {
-         //if the returned vote is available add it to the nodes we can request
-         if (vote.backupAvailable)
-         {
-            nodes.add(vote.getVote());
-         }
-      }
-
-      @Override
-      public Pair<String, Integer> getDecision()
-      {
-         //sort the nodes by how many backups they have and choose the first
-         Collections.sort(nodes, new Comparator<Pair<String, Integer>>()
-         {
-            @Override
-            public int compare(Pair<String, Integer> o1, Pair<String, Integer> o2)
-            {
-               return o1.getB().compareTo(o2.getB());
-            }
-         });
-         return nodes.get(0);
-      }
-
-      @Override
-      public void allVotesCast(Topology voteTopology)
-      {
-         //if we have any nodes that we can request then send a request
-         if (nodes.size() > 0)
-         {
-            Pair<String, Integer> decision = getDecision();
-            TopologyMemberImpl member = voteTopology.getMember(decision.getA());
-            try
-            {
-               boolean backupStarted = requestBackup(member.getConnector(), decision.getB().intValue());
-               if (!backupStarted)
-               {
-                  nodes.clear();
-                  server.getScheduledPool().schedule(new Runnable()
-                  {
-                     @Override
-                     public void run()
-                     {
-                        server.getClusterManager().getQuorumManager().vote(new RequestBackupQuorumVote());
-                     }
-                  }, haPolicy.getBackupRequestRetryInterval(), TimeUnit.MILLISECONDS);
-               }
-            }
-            catch (Exception e)
-            {
-               e.printStackTrace();
-               //todo
-            }
-         }
-         else
-         {
-            nodes.clear();
-            server.getScheduledPool().schedule(new Runnable()
-            {
-               @Override
-               public void run()
-               {
-                  server.getClusterManager().getQuorumManager().vote(RequestBackupQuorumVote.this);
-               }
-            }, haPolicy.getBackupRequestRetryInterval(), TimeUnit.MILLISECONDS);
-         }
-      }
-
-      @Override
-      public SimpleString getName()
-      {
-         return REQUEST_BACKUP_QUORUM_VOTE;
-      }
-   }
-
-   class RequestBackupVote extends Vote<Pair<String, Integer>>
-   {
-      private int backupsSize;
-      private String nodeID;
-      private boolean backupAvailable;
-
-      public RequestBackupVote()
-      {
-         backupsSize = -1;
-      }
-
-      public RequestBackupVote(int backupsSize, String nodeID, boolean backupAvailable)
-      {
-         this.backupsSize = backupsSize;
-         this.nodeID = nodeID;
-         this.backupAvailable = backupAvailable;
-      }
-
-      @Override
-      public void encode(HornetQBuffer buff)
-      {
-         buff.writeInt(backupsSize);
-         buff.writeNullableString(nodeID);
-         buff.writeBoolean(backupAvailable);
-      }
-
-      @Override
-      public void decode(HornetQBuffer buff)
-      {
-         backupsSize = buff.readInt();
-         nodeID = buff.readNullableString();
-         backupAvailable = buff.readBoolean();
-      }
-
-      @Override
-      public boolean isRequestServerVote()
-      {
-         return true;
-      }
-
-      @Override
-      public Pair<String, Integer> getVote()
-      {
-         return new Pair<>(nodeID, backupsSize);
-      }
-   }
+   Map<String, HornetQServer> getBackupServers();
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAPolicy.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAPolicy.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAPolicy.java
index ed7c6de..a4003a9 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAPolicy.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAPolicy.java
@@ -12,344 +12,36 @@
  */
 package org.hornetq.core.server.cluster.ha;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
+import java.util.Map;
 
-import org.hornetq.api.config.HornetQDefaultConfiguration;
-import org.hornetq.core.config.BackupStrategy;
+import org.hornetq.core.server.impl.Activation;
+import org.hornetq.core.server.impl.HornetQServerImpl;
 
 /**
  * Every live server will have an HAPolicy that configures the type of server that it should be either live, backup or
  * colocated (both). It also configures how, if colocated, it should react to sending and receiving requests for backups.
  */
-public class HAPolicy implements Serializable
+public interface HAPolicy<T extends Activation>
 {
-   /**
-    * the policy type for a server
-    */
-   public enum POLICY_TYPE
-   {
-      NONE((byte) 0),
-      REPLICATED((byte) 1),
-      SHARED_STORE((byte) 2),
-      BACKUP_REPLICATED((byte) 3),
-      BACKUP_SHARED_STORE((byte) 4),
-      COLOCATED_REPLICATED((byte) 5),
-      COLOCATED_SHARED_STORE((byte) 6);
+   /*
+   * created the Activation associated with this policy.
+   * */
+   T createActivation(HornetQServerImpl server, boolean wasLive, Map<String, Object> activationParams, HornetQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) throws Exception;
 
-      private static final Set<POLICY_TYPE> all = EnumSet.allOf(POLICY_TYPE.class);
-      private final byte type;
+   boolean isSharedStore();
 
-      POLICY_TYPE(byte type)
-      {
-         this.type = type;
-      }
+   boolean isBackup();
 
-      public byte getType()
-      {
-         return type;
-      }
+   boolean canScaleDown();
 
-      public static POLICY_TYPE toBackupType(byte b)
-      {
-         for (POLICY_TYPE backupType : all)
-         {
-            if (b == backupType.getType())
-            {
-               return backupType;
-            }
-         }
-         return null;
-      }
-   }
+   /*
+   * todo These 3 methods could probably be moved as they are specific to the activation however they are needed for certain packets.
+   * */
 
-   private POLICY_TYPE policyType = POLICY_TYPE.valueOf(HornetQDefaultConfiguration.getDefaultHapolicyType());
+   String getBackupGroupName();
 
-   private boolean requestBackup = HornetQDefaultConfiguration.isDefaultHapolicyRequestBackup();
+   String getScaleDownGroupName();
 
-   private int backupRequestRetries = HornetQDefaultConfiguration.getDefaultHapolicyBackupRequestRetries();
+   String getScaleDownClustername();
 
-   private long backupRequestRetryInterval = HornetQDefaultConfiguration.getDefaultHapolicyBackupRequestRetryInterval();
-
-   private int maxBackups = HornetQDefaultConfiguration.getDefaultHapolicyMaxBackups();
-
-   private int backupPortOffset = HornetQDefaultConfiguration.getDefaultHapolicyBackupPortOffset();
-
-   private BackupStrategy backupStrategy = BackupStrategy.valueOf(HornetQDefaultConfiguration.getDefaultHapolicyBackupStrategy());
-
-   private List<String> scaleDownConnectors = new ArrayList<>();
-
-   private String scaleDownDiscoveryGroup = null;
-
-   private String scaleDownGroupName = null;
-
-   private String backupGroupName = null;
-
-   private List<String> remoteConnectors = new ArrayList<>();
-
-   private boolean checkForLiverServer = HornetQDefaultConfiguration.isDefaultCheckForLiveServer();
-
-   private boolean allowAutoFailBack = HornetQDefaultConfiguration.isDefaultAllowAutoFailback();
-
-   private long failbackDelay = HornetQDefaultConfiguration.getDefaultFailbackDelay();
-
-   private boolean failoverOnServerShutdown = HornetQDefaultConfiguration.isDefaultFailoverOnServerShutdown();
-
-   private String replicationClusterName;
-
-   private String scaleDownClusterName;
-
-   private int maxSavedReplicatedJournalsSize = HornetQDefaultConfiguration.getDefaultMaxSavedReplicatedJournalsSize();
-
-   private boolean scaleDown = HornetQDefaultConfiguration.isDefaultScaleDown();
-
-   private boolean restartBackup = HornetQDefaultConfiguration.isDefaultRestartBackup();
-
-   public POLICY_TYPE getPolicyType()
-   {
-      return policyType;
-   }
-
-   public void setPolicyType(POLICY_TYPE policyType)
-   {
-      this.policyType = policyType;
-   }
-
-   public BackupStrategy getBackupStrategy()
-   {
-      return backupStrategy;
-   }
-
-   public void setBackupStrategy(BackupStrategy backupStrategy)
-   {
-      this.backupStrategy = backupStrategy;
-   }
-
-   /**
-    * Should we scaleDown our messages when the server is shutdown cleanly.
-    *
-    * @return true if server should scaleDown its messages on clean shutdown
-    * @see #setScaleDown(boolean)
-    */
-   public boolean isScaleDown()
-   {
-      return scaleDown;
-   }
-
-   /**
-    * Sets whether to allow the server to scaleDown its messages on server shutdown.
-    */
-   public void setScaleDown(boolean scaleDown)
-   {
-      this.scaleDown = scaleDown;
-   }
-
-   /**
-    * returns the name used to group
-    *
-    * @return the name of the group
-    */
-   public String getScaleDownGroupName()
-   {
-      return scaleDownGroupName;
-   }
-
-   /**
-    * Used to configure groups of live/backup servers.
-    *
-    * @param nodeGroupName the node group name
-    */
-   public void setScaleDownGroupName(String nodeGroupName)
-   {
-      this.scaleDownGroupName = nodeGroupName;
-   }
-
-   public String getBackupGroupName()
-   {
-      return backupGroupName;
-   }
-
-   public void setBackupGroupName(String backupGroupName)
-   {
-      this.backupGroupName = backupGroupName;
-   }
-
-   public List<String> getScaleDownConnectors()
-   {
-      return scaleDownConnectors;
-   }
-
-   public void setScaleDownConnectors(List<String> scaleDownConnectors)
-   {
-      this.scaleDownConnectors = scaleDownConnectors;
-   }
-
-   public void setScaleDownDiscoveryGroup(String scaleDownDiscoveryGroup)
-   {
-      this.scaleDownDiscoveryGroup = scaleDownDiscoveryGroup;
-   }
-
-   public String getScaleDownDiscoveryGroup()
-   {
-      return scaleDownDiscoveryGroup;
-   }
-
-   public boolean isRequestBackup()
-   {
-      return requestBackup;
-   }
-
-   public void setRequestBackup(boolean requestBackup)
-   {
-      this.requestBackup = requestBackup;
-   }
-
-   public int getBackupRequestRetries()
-   {
-      return backupRequestRetries;
-   }
-
-   public void setBackupRequestRetries(int backupRequestRetries)
-   {
-      this.backupRequestRetries = backupRequestRetries;
-   }
-
-   public long getBackupRequestRetryInterval()
-   {
-      return backupRequestRetryInterval;
-   }
-
-   public void setBackupRequestRetryInterval(long backupRequestRetryInterval)
-   {
-      this.backupRequestRetryInterval = backupRequestRetryInterval;
-   }
-
-   public int getMaxBackups()
-   {
-      return maxBackups;
-   }
-
-   public void setMaxBackups(int maxBackups)
-   {
-      this.maxBackups = maxBackups;
-   }
-
-   public int getBackupPortOffset()
-   {
-      return backupPortOffset;
-   }
-
-   public void setBackupPortOffset(int backupPortOffset)
-   {
-      this.backupPortOffset = backupPortOffset;
-   }
-
-   public List<String> getRemoteConnectors()
-   {
-      return remoteConnectors;
-   }
-
-   public void setRemoteConnectors(List<String> remoteConnectors)
-   {
-      this.remoteConnectors = remoteConnectors;
-   }
-
-   public boolean isCheckForLiveServer()
-   {
-      return checkForLiverServer;
-   }
-
-   public void setCheckForLiveServer(boolean checkForLiverServer)
-   {
-      this.checkForLiverServer = checkForLiverServer;
-   }
-
-   public boolean isAllowAutoFailBack()
-   {
-      return allowAutoFailBack;
-   }
-
-   public void setAllowAutoFailBack(boolean allowAutoFailBack)
-   {
-      this.allowAutoFailBack = allowAutoFailBack;
-   }
-
-   public long getFailbackDelay()
-   {
-      return failbackDelay;
-   }
-
-   public void setFailbackDelay(long failbackDelay)
-   {
-      this.failbackDelay = failbackDelay;
-   }
-
-   public boolean isFailoverOnServerShutdown()
-   {
-      return failoverOnServerShutdown;
-   }
-
-   public void setFailoverOnServerShutdown(boolean failoverOnServerShutdown)
-   {
-      this.failoverOnServerShutdown = failoverOnServerShutdown;
-   }
-
-   public void setReplicationClustername(String clusterName)
-   {
-      this.replicationClusterName = clusterName;
-   }
-
-   public String getReplicationClustername()
-   {
-      return replicationClusterName;
-   }
-
-   public void setScaleDownClustername(String clusterName)
-   {
-      this.scaleDownClusterName = clusterName;
-   }
-
-   public String getScaleDownClustername()
-   {
-      return scaleDownClusterName;
-   }
-
-   public void setMaxSavedReplicatedJournalSize(int maxSavedReplicatedJournalsSize)
-   {
-      this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize;
-   }
-
-   public int getMaxSavedReplicatedJournalsSize()
-   {
-      return maxSavedReplicatedJournalsSize;
-   }
-
-   public boolean isRestartBackup()
-   {
-      return restartBackup;
-   }
-
-   public void setRestartBackup(boolean restartBackup)
-   {
-      this.restartBackup = restartBackup;
-   }
-
-   public boolean isSharedStore()
-   {
-      if (policyType == POLICY_TYPE.BACKUP_SHARED_STORE || policyType == POLICY_TYPE.SHARED_STORE || policyType == POLICY_TYPE.COLOCATED_SHARED_STORE)
-         return true;
-      else
-         return false;
-   }
-
-   public boolean isBackup()
-   {
-      if (policyType == POLICY_TYPE.BACKUP_SHARED_STORE || policyType == POLICY_TYPE.BACKUP_REPLICATED)
-         return true;
-      else
-         return false;
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAPolicyTemplate.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAPolicyTemplate.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAPolicyTemplate.java
deleted file mode 100644
index cd9a09d..0000000
--- a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAPolicyTemplate.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright 2005-2014 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.cluster.ha;
-
-
-import org.hornetq.core.config.BackupStrategy;
-
-/**
- * a template for creating policy. this makes it easier in configuration and in embedded code to create a certain type
- * of policy. for instance:
- *
- * <ha-policy template="COLOCATED_REPLICATED"/>
- *
- * or in code
- *
- * HAPolicy policy = HAPolicyTemplate.COLOCATED_REPLICATED.getHaPolicy()
- */
-public enum HAPolicyTemplate
-{
-   NONE(createNonePolicy()),
-   REPLICATED(createReplicatedPolicy()),
-   SHARED_STORE(createSharedStorePolicy()),
-   BACKUP_REPLICATED(createBackupReplicatedPolicy()),
-   BACKUP_SHARED_STORE(createBackupSharedStorePolicy()),
-   COLOCATED_REPLICATED(createColocatedReplicatedPolicy()),
-   COLOCATED_SHARED_STORE(createColocatedSharedStorePolicy());
-
-   private final HAPolicy haPolicy;
-
-   public HAPolicy getHaPolicy()
-   {
-      return haPolicy;
-   }
-
-   HAPolicyTemplate(HAPolicy haPolicy)
-   {
-      this.haPolicy = haPolicy;
-   }
-
-   private static HAPolicy createNonePolicy()
-   {
-      HAPolicy policy = new HAPolicy();
-      policy.setPolicyType(HAPolicy.POLICY_TYPE.NONE);
-      return policy;
-   }
-
-   private static HAPolicy createReplicatedPolicy()
-   {
-      HAPolicy policy = new HAPolicy();
-      policy.setPolicyType(HAPolicy.POLICY_TYPE.REPLICATED);
-      return policy;
-   }
-
-   private static HAPolicy createSharedStorePolicy()
-   {
-      HAPolicy policy = new HAPolicy();
-      policy.setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE);
-      return policy;
-   }
-
-   private static HAPolicy createBackupReplicatedPolicy()
-   {
-      HAPolicy policy = new HAPolicy();
-      policy.setMaxBackups(0);
-      policy.setRequestBackup(false);
-      policy.setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_REPLICATED);
-      policy.setBackupStrategy(BackupStrategy.FULL);
-      policy.setRestartBackup(true);
-      return policy;
-   }
-
-   private static HAPolicy createBackupSharedStorePolicy()
-   {
-      HAPolicy policy = new HAPolicy();
-      policy.setMaxBackups(0);
-      policy.setRequestBackup(false);
-      policy.setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE);
-      policy.setBackupStrategy(BackupStrategy.FULL);
-      policy.setRestartBackup(true);
-      return policy;
-   }
-
-   private static HAPolicy createColocatedSharedStorePolicy()
-   {
-      HAPolicy policy = new HAPolicy();
-      policy.setBackupPortOffset(100);
-      policy.setBackupRequestRetries(-1);
-      policy.setBackupRequestRetryInterval(5000);
-      policy.setMaxBackups(2);
-      policy.setRequestBackup(true);
-      policy.setPolicyType(HAPolicy.POLICY_TYPE.COLOCATED_SHARED_STORE);
-      policy.setBackupStrategy(BackupStrategy.SCALE_DOWN);
-      return policy;
-   }
-
-   private static HAPolicy createColocatedReplicatedPolicy()
-   {
-      HAPolicy policy = new HAPolicy();
-      policy.setBackupPortOffset(100);
-      policy.setBackupRequestRetries(-1);
-      policy.setBackupRequestRetryInterval(5000);
-      policy.setMaxBackups(2);
-      policy.setRequestBackup(true);
-      policy.setPolicyType(HAPolicy.POLICY_TYPE.COLOCATED_REPLICATED);
-      policy.setBackupStrategy(BackupStrategy.SCALE_DOWN);
-      return policy;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/LiveOnlyPolicy.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/LiveOnlyPolicy.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/LiveOnlyPolicy.java
new file mode 100644
index 0000000..499e1b3
--- /dev/null
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/LiveOnlyPolicy.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.cluster.ha;
+
+import org.hornetq.core.server.impl.Activation;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.server.impl.LiveOnlyActivation;
+
+import java.util.Map;
+
+public class LiveOnlyPolicy implements HAPolicy<Activation>
+{
+   private ScaleDownPolicy scaleDownPolicy;
+
+   public LiveOnlyPolicy()
+   {
+   }
+
+   public LiveOnlyPolicy(ScaleDownPolicy scaleDownPolicy)
+   {
+      this.scaleDownPolicy = scaleDownPolicy;
+   }
+
+   @Override
+   public Activation createActivation(HornetQServerImpl server, boolean wasLive, Map<String, Object> activationParams, HornetQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO)
+   {
+      return new LiveOnlyActivation(server, this);
+   }
+
+   @Override
+   public String getBackupGroupName()
+   {
+      return null;
+   }
+
+   @Override
+   public String getScaleDownGroupName()
+   {
+      return scaleDownPolicy == null ? null : scaleDownPolicy.getGroupName();
+   }
+
+   @Override
+   public String getScaleDownClustername()
+   {
+      return null;
+   }
+
+   @Override
+   public boolean isSharedStore()
+   {
+      return false;
+   }
+
+   @Override
+   public boolean isBackup()
+   {
+      return false;
+   }
+
+   @Override
+   public boolean canScaleDown()
+   {
+      return scaleDownPolicy != null;
+   }
+
+   public ScaleDownPolicy getScaleDownPolicy()
+   {
+      return scaleDownPolicy;
+   }
+
+   public void setScaleDownPolicy(ScaleDownPolicy scaleDownPolicy)
+   {
+      this.scaleDownPolicy = scaleDownPolicy;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ReplicaPolicy.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ReplicaPolicy.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ReplicaPolicy.java
new file mode 100644
index 0000000..64d591f
--- /dev/null
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ReplicaPolicy.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.cluster.ha;
+
+import org.hornetq.api.config.HornetQDefaultConfiguration;
+import org.hornetq.core.server.impl.Activation;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.server.impl.SharedNothingBackupActivation;
+
+import java.util.Map;
+
+public class ReplicaPolicy extends BackupPolicy
+{
+   private String clusterName;
+
+   private int maxSavedReplicatedJournalsSize = HornetQDefaultConfiguration.getDefaultMaxSavedReplicatedJournalsSize();
+
+   private String groupName = null;
+
+   private boolean restartBackup = HornetQDefaultConfiguration.isDefaultRestartBackup();
+
+   private ReplicatedPolicy replicatedPolicy;
+
+   public ReplicaPolicy()
+   {
+   }
+
+   public ReplicaPolicy(String clusterName, int maxSavedReplicatedJournalsSize, String groupName, boolean restartBackup, boolean allowFailback, long failbackDelay, ScaleDownPolicy scaleDownPolicy)
+   {
+      this.clusterName = clusterName;
+      this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize;
+      this.groupName = groupName;
+      this.restartBackup = restartBackup;
+      this.scaleDownPolicy = scaleDownPolicy;
+      //todo check default settings
+      replicatedPolicy = new ReplicatedPolicy(false, allowFailback, failbackDelay, groupName, clusterName, this);
+   }
+
+   public ReplicaPolicy(String clusterName, int maxSavedReplicatedJournalsSize, String groupName, ReplicatedPolicy replicatedPolicy)
+   {
+      this.clusterName = clusterName;
+      this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize;
+      this.groupName = groupName;
+      this.replicatedPolicy = replicatedPolicy;
+   }
+
+   public String getClusterName()
+   {
+      return clusterName;
+   }
+
+   public void setClusterName(String clusterName)
+   {
+      this.clusterName = clusterName;
+   }
+
+   public int getMaxSavedReplicatedJournalsSize()
+   {
+      return maxSavedReplicatedJournalsSize;
+   }
+
+   public void setMaxSavedReplicatedJournalsSize(int maxSavedReplicatedJournalsSize)
+   {
+      this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize;
+   }
+
+   public ReplicatedPolicy getReplicatedPolicy()
+   {
+      return replicatedPolicy;
+   }
+
+   public void setReplicatedPolicy(ReplicatedPolicy replicatedPolicy)
+   {
+      this.replicatedPolicy = replicatedPolicy;
+   }
+
+   /*
+   * these 2 methods are the same, leaving both as the second is correct but the first is needed until more refactoring is done
+   * */
+   public String getBackupGroupName()
+   {
+      return groupName;
+   }
+
+   public String getGroupName()
+   {
+      return groupName;
+   }
+
+   public void setGroupName(String groupName)
+   {
+      this.groupName = groupName;
+   }
+
+   public boolean isRestartBackup()
+   {
+      return restartBackup;
+   }
+
+   public void setRestartBackup(boolean restartBackup)
+   {
+      this.restartBackup = restartBackup;
+   }
+
+   @Override
+   public boolean isSharedStore()
+   {
+      return false;
+   }
+
+   @Override
+   public boolean canScaleDown()
+   {
+      return scaleDownPolicy != null;
+   }
+
+   @Override
+   public Activation createActivation(HornetQServerImpl server, boolean wasLive,
+                                      Map<String, Object> activationParams,
+                                      HornetQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) throws Exception
+   {
+      SharedNothingBackupActivation backupActivation = new SharedNothingBackupActivation(server, wasLive, activationParams, shutdownOnCriticalIO, this);
+      backupActivation.init();
+      return backupActivation;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ReplicatedPolicy.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ReplicatedPolicy.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ReplicatedPolicy.java
new file mode 100644
index 0000000..cfb4439
--- /dev/null
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ReplicatedPolicy.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.cluster.ha;
+
+import org.hornetq.api.config.HornetQDefaultConfiguration;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.server.impl.LiveActivation;
+import org.hornetq.core.server.impl.SharedNothingLiveActivation;
+
+import java.util.Map;
+
+public class ReplicatedPolicy implements HAPolicy<LiveActivation>
+{
+   private boolean checkForLiveServer = HornetQDefaultConfiguration.isDefaultCheckForLiveServer();
+
+   private String groupName = null;
+
+   private String clusterName;
+
+   /*
+   * these are only set by the ReplicaPolicy after failover to decide if the live server can failback, these should not
+   * be exposed in configuration.
+   * */
+   private boolean allowAutoFailBack = HornetQDefaultConfiguration.isDefaultAllowAutoFailback();
+
+   private long failbackDelay = HornetQDefaultConfiguration.getDefaultFailbackDelay();
+
+   /*
+   * this is used as the policy when the server is started after a failover
+   * */
+   private ReplicaPolicy replicaPolicy;
+
+
+   public ReplicatedPolicy()
+   {
+   }
+
+   public ReplicatedPolicy(boolean checkForLiveServer, String groupName, String clusterName)
+   {
+      this.checkForLiveServer = checkForLiveServer;
+      this.groupName = groupName;
+      this.clusterName = clusterName;
+      /*
+      * we create this with sensible defaults in case we start after a failover
+      * */
+      replicaPolicy = new ReplicaPolicy(clusterName, -1, groupName, this);
+   }
+
+   public ReplicatedPolicy(boolean checkForLiveServer, boolean allowAutoFailBack, long failbackDelay, String groupName, String clusterName, ReplicaPolicy replicaPolicy)
+   {
+      this.checkForLiveServer = checkForLiveServer;
+      this.clusterName = clusterName;
+      this.groupName = groupName;
+      this.allowAutoFailBack = allowAutoFailBack;
+      this.failbackDelay = failbackDelay;
+      this.replicaPolicy = replicaPolicy;
+   }
+
+   public boolean isCheckForLiveServer()
+   {
+      return checkForLiveServer;
+   }
+
+   public void setCheckForLiveServer(boolean checkForLiveServer)
+   {
+      this.checkForLiveServer = checkForLiveServer;
+   }
+
+   public boolean isAllowAutoFailBack()
+   {
+      return allowAutoFailBack;
+   }
+
+   public long getFailbackDelay()
+   {
+      return failbackDelay;
+   }
+
+   public void setFailbackDelay(long failbackDelay)
+   {
+      this.failbackDelay = failbackDelay;
+   }
+
+   public String getClusterName()
+   {
+      return clusterName;
+   }
+
+   public void setClusterName(String clusterName)
+   {
+      this.clusterName = clusterName;
+   }
+
+   public ReplicaPolicy getReplicaPolicy()
+   {
+      return replicaPolicy;
+   }
+
+   public void setReplicaPolicy(ReplicaPolicy replicaPolicy)
+   {
+      this.replicaPolicy = replicaPolicy;
+   }
+
+   /*
+   * these 2 methods are the same, leaving both as the second is correct but the first is needed until more refactoring is done
+   * */
+   public String getBackupGroupName()
+   {
+      return groupName;
+   }
+
+   public String getGroupName()
+   {
+      return groupName;
+   }
+
+   @Override
+   public String getScaleDownGroupName()
+   {
+      return null;
+   }
+
+   public void setGroupName(String groupName)
+   {
+      this.groupName = groupName;
+   }
+
+   @Override
+   public boolean isSharedStore()
+   {
+      return false;
+   }
+
+   @Override
+   public boolean isBackup()
+   {
+      return false;
+   }
+
+   @Override
+   public boolean canScaleDown()
+   {
+      return false;
+   }
+
+   @Override
+   public String getScaleDownClustername()
+   {
+      return null;
+   }
+
+   @Override
+   public LiveActivation createActivation(HornetQServerImpl server, boolean wasLive, Map<String, Object> activationParams, HornetQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO)
+   {
+      return new SharedNothingLiveActivation(server, this);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ScaleDownPolicy.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ScaleDownPolicy.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ScaleDownPolicy.java
new file mode 100644
index 0000000..a5fee77
--- /dev/null
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ScaleDownPolicy.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.cluster.ha;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.server.HornetQMessageBundle;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServerLogger;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class ScaleDownPolicy
+{
+   private List<String> connectors = new ArrayList<>();
+
+   private String discoveryGroup = null;
+
+   private String groupName = null;
+
+   private String clusterName;
+
+   private boolean enabled;
+
+   public ScaleDownPolicy()
+   {
+   }
+
+   public ScaleDownPolicy(List<String> connectors, String groupName, String clusterName, boolean enabled)
+   {
+      this.connectors = connectors;
+      this.groupName = groupName;
+      this.clusterName = clusterName;
+      this.enabled = enabled;
+   }
+
+   public ScaleDownPolicy(String discoveryGroup, String groupName, String clusterName, boolean enabled)
+   {
+      this.discoveryGroup = discoveryGroup;
+      this.groupName = groupName;
+      this.clusterName = clusterName;
+      this.enabled = enabled;
+   }
+
+
+   public List<String> getConnectors()
+   {
+      return connectors;
+   }
+
+   public void setConnectors(List<String> connectors)
+   {
+      this.connectors = connectors;
+   }
+
+   public String getDiscoveryGroup()
+   {
+      return discoveryGroup;
+   }
+
+   public void setDiscoveryGroup(String discoveryGroup)
+   {
+      this.discoveryGroup = discoveryGroup;
+   }
+
+   public String getGroupName()
+   {
+      return groupName;
+   }
+
+   public void setGroupName(String groupName)
+   {
+      this.groupName = groupName;
+   }
+
+   public String getClusterName()
+   {
+      return clusterName;
+   }
+
+   public void setClusterName(String clusterName)
+   {
+      this.clusterName = clusterName;
+   }
+
+   public boolean isEnabled()
+   {
+      return enabled;
+   }
+
+   public void setEnabled(boolean enabled)
+   {
+      this.enabled = enabled;
+   }
+
+   public static ServerLocatorInternal getScaleDownConnector(ScaleDownPolicy scaleDownPolicy, HornetQServer hornetQServer) throws HornetQException
+   {
+      if (!scaleDownPolicy.getConnectors().isEmpty())
+      {
+         return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(connectorNameListToArray(scaleDownPolicy.getConnectors(), hornetQServer));
+      }
+      else if (scaleDownPolicy.getDiscoveryGroup() != null)
+      {
+         DiscoveryGroupConfiguration dg = hornetQServer.getConfiguration().getDiscoveryGroupConfigurations().get(scaleDownPolicy.getDiscoveryGroup());
+
+         if (dg == null)
+         {
+            throw HornetQMessageBundle.BUNDLE.noDiscoveryGroupFound(dg);
+         }
+         return  (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(dg);
+      }
+      else
+      {
+         Map<String, TransportConfiguration> connectorConfigurations = hornetQServer.getConfiguration().getConnectorConfigurations();
+         for (TransportConfiguration transportConfiguration : connectorConfigurations.values())
+         {
+            if (transportConfiguration.getFactoryClassName().equals(InVMConnectorFactory.class.getName()))
+            {
+               return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(transportConfiguration);
+            }
+         }
+      }
+      throw HornetQMessageBundle.BUNDLE.noConfigurationFoundForScaleDown();
+   }
+
+   private static TransportConfiguration[] connectorNameListToArray(final List<String> connectorNames, HornetQServer hornetQServer)
+   {
+      TransportConfiguration[] tcConfigs = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class,
+            connectorNames.size());
+      int count = 0;
+      for (String connectorName : connectorNames)
+      {
+         TransportConfiguration connector = hornetQServer.getConfiguration().getConnectorConfigurations().get(connectorName);
+
+         if (connector == null)
+         {
+            HornetQServerLogger.LOGGER.bridgeNoConnector(connectorName);
+
+            return null;
+         }
+
+         tcConfigs[count++] = connector;
+      }
+
+      return tcConfigs;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/SharedStoreMasterPolicy.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/SharedStoreMasterPolicy.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/SharedStoreMasterPolicy.java
new file mode 100644
index 0000000..5aa848f
--- /dev/null
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/SharedStoreMasterPolicy.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.cluster.ha;
+
+import org.hornetq.api.config.HornetQDefaultConfiguration;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.server.impl.LiveActivation;
+import org.hornetq.core.server.impl.SharedStoreLiveActivation;
+
+import java.util.Map;
+
+public class SharedStoreMasterPolicy implements HAPolicy<LiveActivation>
+{
+   private long failbackDelay = HornetQDefaultConfiguration.getDefaultFailbackDelay();
+
+   private boolean failoverOnServerShutdown = HornetQDefaultConfiguration.isDefaultFailoverOnServerShutdown();
+
+   private SharedStoreSlavePolicy sharedStoreSlavePolicy;
+
+   public SharedStoreMasterPolicy()
+   {
+   }
+
+   public SharedStoreMasterPolicy(long failbackDelay, boolean failoverOnServerShutdown)
+   {
+      this.failbackDelay = failbackDelay;
+      this.failoverOnServerShutdown = failoverOnServerShutdown;
+   }
+
+   public long getFailbackDelay()
+   {
+      return failbackDelay;
+   }
+
+   public void setFailbackDelay(long failbackDelay)
+   {
+      this.failbackDelay = failbackDelay;
+   }
+
+   public boolean isFailoverOnServerShutdown()
+   {
+      return failoverOnServerShutdown;
+   }
+
+   public void setFailoverOnServerShutdown(boolean failoverOnServerShutdown)
+   {
+      this.failoverOnServerShutdown = failoverOnServerShutdown;
+   }
+
+   public SharedStoreSlavePolicy getSharedStoreSlavePolicy()
+   {
+      return sharedStoreSlavePolicy;
+   }
+
+   public void setSharedStoreSlavePolicy(SharedStoreSlavePolicy sharedStoreSlavePolicy)
+   {
+      this.sharedStoreSlavePolicy = sharedStoreSlavePolicy;
+   }
+
+   @Override
+   public boolean isSharedStore()
+   {
+      return true;
+   }
+
+   @Override
+   public boolean isBackup()
+   {
+      return false;
+   }
+
+   @Override
+   public boolean canScaleDown()
+   {
+      return false;
+   }
+
+   @Override
+   public LiveActivation createActivation(HornetQServerImpl server, boolean wasLive, Map<String, Object> activationParams, HornetQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO)
+   {
+      return  new SharedStoreLiveActivation(server, this);
+   }
+
+   @Override
+   public String getBackupGroupName()
+   {
+      return null;
+   }
+
+   @Override
+   public String getScaleDownGroupName()
+   {
+      return null;
+   }
+
+   @Override
+   public String getScaleDownClustername()
+   {
+      return null;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/SharedStoreSlavePolicy.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/SharedStoreSlavePolicy.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/SharedStoreSlavePolicy.java
new file mode 100644
index 0000000..df6890f
--- /dev/null
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/SharedStoreSlavePolicy.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.cluster.ha;
+
+import org.hornetq.api.config.HornetQDefaultConfiguration;
+import org.hornetq.core.server.impl.Activation;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.server.impl.SharedStoreBackupActivation;
+
+import java.util.Map;
+
+public class SharedStoreSlavePolicy extends BackupPolicy
+{
+   private long failbackDelay = HornetQDefaultConfiguration.getDefaultFailbackDelay();
+
+   private boolean failoverOnServerShutdown = HornetQDefaultConfiguration.isDefaultFailoverOnServerShutdown();
+
+   private boolean allowAutoFailBack = HornetQDefaultConfiguration.isDefaultAllowAutoFailback();
+
+   //this is how we act once we have failed over
+   private SharedStoreMasterPolicy sharedStoreMasterPolicy;
+
+   public SharedStoreSlavePolicy()
+   {
+   }
+
+   public SharedStoreSlavePolicy(long failbackDelay, boolean failoverOnServerShutdown, boolean restartBackup, boolean allowAutoFailBack, ScaleDownPolicy scaleDownPolicy)
+   {
+      this.failbackDelay = failbackDelay;
+      this.failoverOnServerShutdown = failoverOnServerShutdown;
+      this.restartBackup = restartBackup;
+      this.allowAutoFailBack = allowAutoFailBack;
+      this.scaleDownPolicy = scaleDownPolicy;
+      sharedStoreMasterPolicy = new SharedStoreMasterPolicy(failbackDelay, failoverOnServerShutdown);
+   }
+
+   public long getFailbackDelay()
+   {
+      return failbackDelay;
+   }
+
+   public void setFailbackDelay(long failbackDelay)
+   {
+      this.failbackDelay = failbackDelay;
+   }
+
+   public boolean isFailoverOnServerShutdown()
+   {
+      return failoverOnServerShutdown;
+   }
+
+   public void setFailoverOnServerShutdown(boolean failoverOnServerShutdown)
+   {
+      this.failoverOnServerShutdown = failoverOnServerShutdown;
+   }
+
+   public SharedStoreMasterPolicy getSharedStoreMasterPolicy()
+   {
+      return sharedStoreMasterPolicy;
+   }
+
+   public void setSharedStoreMasterPolicy(SharedStoreMasterPolicy sharedStoreMasterPolicy)
+   {
+      this.sharedStoreMasterPolicy = sharedStoreMasterPolicy;
+   }
+
+   @Override
+   public boolean isSharedStore()
+   {
+      return true;
+   }
+
+   @Override
+   public boolean canScaleDown()
+   {
+      return scaleDownPolicy != null;
+   }
+
+   public boolean isAllowAutoFailBack()
+   {
+      return allowAutoFailBack;
+   }
+
+   public void setAllowAutoFailBack(boolean allowAutoFailBack)
+   {
+      this.allowAutoFailBack = allowAutoFailBack;
+   }
+
+   @Override
+   public Activation createActivation(HornetQServerImpl server, boolean wasLive, Map<String, Object> activationParams, HornetQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO)
+   {
+      return new SharedStoreBackupActivation(server, this);
+   }
+
+   @Override
+   public String getBackupGroupName()
+   {
+      return null;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/StandaloneHAManager.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/StandaloneHAManager.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/StandaloneHAManager.java
new file mode 100644
index 0000000..52934cd
--- /dev/null
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/StandaloneHAManager.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.cluster.ha;
+
+import org.hornetq.core.server.HornetQServer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/*
+* this implementation doesn't really do anything at the minute, but this may change, so I'm leaving it here. Andy...
+* */
+public class StandaloneHAManager implements HAManager
+{
+   Map<String, HornetQServer> servers = new HashMap<>();
+
+   boolean isStarted = false;
+
+   @Override
+   public Map<String, HornetQServer> getBackupServers()
+   {
+      return servers;
+   }
+
+   @Override
+   public void start() throws Exception
+   {
+      if (isStarted)
+         return;
+      isStarted = true;
+   }
+
+   @Override
+   public void stop() throws Exception
+   {
+      if (!isStarted)
+         return;
+      isStarted = false;
+   }
+
+   @Override
+   public boolean isStarted()
+   {
+      return isStarted;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/BridgeImpl.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/BridgeImpl.java
index 5e8fc75..4b49bbf 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/BridgeImpl.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/BridgeImpl.java
@@ -33,7 +33,7 @@ import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.api.core.client.SendAcknowledgementHandler;
 import org.hornetq.api.core.client.SessionFailureListener;
 import org.hornetq.api.core.client.TopologyMember;
-import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.api.core.management.CoreNotificationType;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ClientSessionInternal;
@@ -262,7 +262,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       {
          TypedProperties props = new TypedProperties();
          props.putSimpleStringProperty(new SimpleString("name"), name);
-         Notification notification = new Notification(nodeUUID.toString(), NotificationType.BRIDGE_STARTED, props);
+         Notification notification = new Notification(nodeUUID.toString(), CoreNotificationType.BRIDGE_STARTED, props);
          notificationService.sendNotification(notification);
       }
    }
@@ -388,7 +388,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       {
          TypedProperties props = new TypedProperties();
          props.putSimpleStringProperty(new SimpleString("name"), name);
-         Notification notification = new Notification(nodeUUID.toString(), NotificationType.BRIDGE_STOPPED, props);
+         Notification notification = new Notification(nodeUUID.toString(), CoreNotificationType.BRIDGE_STOPPED, props);
          try
          {
             notificationService.sendNotification(notification);
@@ -413,7 +413,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       {
          TypedProperties props = new TypedProperties();
          props.putSimpleStringProperty(new SimpleString("name"), name);
-         Notification notification = new Notification(nodeUUID.toString(), NotificationType.BRIDGE_STOPPED, props);
+         Notification notification = new Notification(nodeUUID.toString(), CoreNotificationType.BRIDGE_STOPPED, props);
          try
          {
             notificationService.sendNotification(notification);
@@ -672,6 +672,12 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
             }
          }
       }
+      else if (scaleDownTargetNodeID != null)
+      {
+         // the disconnected node is scaling down to me, no need to reconnect to it
+         HornetQServerLogger.LOGGER.debug("Received scaleDownTargetNodeID: " + scaleDownTargetNodeID + "; cancelling reconnect.");
+         fail(true);
+      }
       else
       {
          HornetQServerLogger.LOGGER.debug("Received invalid scaleDownTargetNodeID: " + scaleDownTargetNodeID);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java
index ab50cf8..28ce428 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java
@@ -24,7 +24,7 @@ import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.api.core.management.CoreNotificationType;
 import org.hornetq.core.server.HornetQServerLogger;
 import org.hornetq.core.server.NodeManager;
 import org.hornetq.core.server.cluster.BroadcastGroup;
@@ -112,7 +112,7 @@ public class BroadcastGroupImpl implements BroadcastGroup, Runnable
       {
          TypedProperties props = new TypedProperties();
          props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
-         Notification notification = new Notification(nodeManager.getNodeId().toString(), NotificationType.BROADCAST_GROUP_STARTED, props);
+         Notification notification = new Notification(nodeManager.getNodeId().toString(), CoreNotificationType.BROADCAST_GROUP_STARTED, props);
          notificationService.sendNotification(notification);
       }
 
@@ -146,7 +146,7 @@ public class BroadcastGroupImpl implements BroadcastGroup, Runnable
       {
          TypedProperties props = new TypedProperties();
          props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
-         Notification notification = new Notification(nodeManager.getNodeId().toString(), NotificationType.BROADCAST_GROUP_STOPPED, props);
+         Notification notification = new Notification(nodeManager.getNodeId().toString(), CoreNotificationType.BROADCAST_GROUP_STOPPED, props);
          try
          {
             notificationService.sendNotification(notification);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
index 80545e3..5b5eed1 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -12,7 +12,9 @@
  */
 package org.hornetq.core.server.cluster.impl;
 
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
@@ -24,8 +26,8 @@ import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientConsumer;
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.management.CoreNotificationType;
 import org.hornetq.api.core.management.ManagementHelper;
-import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.api.core.management.ResourceNames;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -38,6 +40,7 @@ import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.core.server.cluster.ClusterManager;
+import org.hornetq.core.server.cluster.HornetQServerSideProtocolManagerFactory;
 import org.hornetq.core.server.cluster.MessageFlowRecord;
 import org.hornetq.core.server.cluster.Transformer;
 import org.hornetq.utils.UUID;
@@ -50,6 +53,7 @@ import org.hornetq.utils.UUIDGenerator;
  *
  * @author tim
  * @author Clebert Suconic
+ * @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a>
  */
 public class ClusterConnectionBridge extends BridgeImpl
 {
@@ -135,9 +139,9 @@ public class ClusterConnectionBridge extends BridgeImpl
       // we need to disable DLQ check on the clustered bridges
       queue.setInternalQueue(true);
 
-      if (HornetQServerLogger.LOGGER.isDebugEnabled())
+      if (HornetQServerLogger.LOGGER.isTraceEnabled())
       {
-         HornetQServerLogger.LOGGER.debug("Setting up bridge between " + clusterConnection.getConnector() + " and " + targetLocator,
+         HornetQServerLogger.LOGGER.trace("Setting up bridge between " + clusterConnection.getConnector() + " and " + targetLocator,
                                           new Exception("trace"));
       }
    }
@@ -145,6 +149,7 @@ public class ClusterConnectionBridge extends BridgeImpl
    @Override
    protected ClientSessionFactoryInternal createSessionFactory() throws Exception
    {
+      serverLocator.setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance());
       ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal) serverLocator.createSessionFactory(targetNodeID);
       setSessionFactory(factory);
 
@@ -247,28 +252,26 @@ public class ClusterConnectionBridge extends BridgeImpl
                                                    " AND " +
                                                    ManagementHelper.HDR_NOTIFICATION_TYPE +
                                                    " IN ('" +
-                                                   NotificationType.BINDING_ADDED +
+                                                   CoreNotificationType.BINDING_ADDED +
                                                    "','" +
-                                                   NotificationType.BINDING_REMOVED +
+                                                   CoreNotificationType.BINDING_REMOVED +
                                                    "','" +
-                                                   NotificationType.CONSUMER_CREATED +
+                                                   CoreNotificationType.CONSUMER_CREATED +
                                                    "','" +
-                                                   NotificationType.CONSUMER_CLOSED +
+                                                   CoreNotificationType.CONSUMER_CLOSED +
                                                    "','" +
-                                                   NotificationType.PROPOSAL +
+                                                   CoreNotificationType.PROPOSAL +
                                                    "','" +
-                                                   NotificationType.PROPOSAL_RESPONSE +
+                                                   CoreNotificationType.PROPOSAL_RESPONSE +
                                                    "','" +
-                                                   NotificationType.UNPROPOSAL +
+                                                   CoreNotificationType.UNPROPOSAL +
                                                    "') AND " +
                                                    ManagementHelper.HDR_DISTANCE +
                                                    "<" +
                                                    flowRecord.getMaxHops() +
                                                    " AND (" +
-                                                   ManagementHelper.HDR_ADDRESS +
-                                                   " LIKE '" +
-                                                   flowRecord.getAddress() +
-                                                   "%')");
+                                                   createSelectorFromAddress(flowRecord.getAddress()) +
+                                                   ")");
 
          session.createTemporaryQueue(managementNotificationAddress, notifQueueName, filter);
 
@@ -279,8 +282,10 @@ public class ClusterConnectionBridge extends BridgeImpl
          session.start();
 
          ClientMessage message = session.createMessage(false);
-
-         HornetQServerLogger.LOGGER.debug("Requesting sendQueueInfoToQueue through " + this, new Exception("trace"));
+         if (HornetQServerLogger.LOGGER.isTraceEnabled())
+         {
+            HornetQServerLogger.LOGGER.trace("Requesting sendQueueInfoToQueue through " + this, new Exception("trace"));
+         }
          ManagementHelper.putOperationInvocation(message,
                                                  ResourceNames.CORE_SERVER,
                                                  "sendQueueInfoToQueue",
@@ -298,6 +303,79 @@ public class ClusterConnectionBridge extends BridgeImpl
       }
    }
 
+   /**
+    * Takes in a string of an address filter or comma separated list and generates an appropriate JMS selector for
+    * filtering queues.
+    * @param address the address filter, or a comma separated list of address filters; entries prefixed with '!' are excluded
+    * @return a JMS selector expression matching the supplied address filter(s)
+    */
+   public static String createSelectorFromAddress(String address)
+   {
+      StringBuilder stringBuilder = new StringBuilder();
+
+      // Support standard address (not a list) case.
+      if (!address.contains(","))
+      {
+         if (address.startsWith("!"))
+         {
+            stringBuilder.append(ManagementHelper.HDR_ADDRESS + " NOT LIKE '" + address.substring(1, address.length()) + "%'");
+         }
+         else
+         {
+            stringBuilder.append(ManagementHelper.HDR_ADDRESS +  " LIKE '" + address + "%'");
+         }
+         return stringBuilder.toString();
+      }
+
+      // For comma separated lists build a JMS selector statement based on the list items
+      return buildSelectorFromArray(address.split(","));
+   }
+
+   public static String buildSelectorFromArray(String[] list)
+   {
+      List<String> includes = new ArrayList<String>();
+      List<String> excludes = new ArrayList<String>();
+
+      // Split the list into addresses to match and addresses to exclude.
+      for (int i = 0; i < list.length; i++)
+      {
+         if (list[i].startsWith("!"))
+         {
+            excludes.add(list[i].substring(1, list[i].length()));
+         }
+         else
+         {
+            includes.add(list[i]);
+         }
+      }
+
+      // Build the address matching part of the selector
+      StringBuilder builder = new StringBuilder("(");
+      if (includes.size() > 0)
+      {
+         if (excludes.size() > 0) builder.append("(");
+         for (int i = 0; i < includes.size(); i++)
+         {
+            builder.append("(" + ManagementHelper.HDR_ADDRESS + " LIKE '" + includes.get(i) + "%')");
+            if (i < includes.size() - 1) builder.append(" OR ");
+         }
+         if (excludes.size() > 0) builder.append(")");
+      }
+
+      // Build the address exclusion part of the selector
+      if (excludes.size() > 0)
+      {
+         if (includes.size() > 0) builder.append(" AND (");
+         for (int i = 0; i < excludes.size(); i++)
+         {
+            builder.append("(" + ManagementHelper.HDR_ADDRESS + " NOT LIKE '" + excludes.get(i) + "%')");
+            if (i < excludes.size() - 1) builder.append(" AND ");
+         }
+         if (includes.size() > 0) builder.append(")");
+      }
+      builder.append(")");
+      return builder.toString();
+   }
+
    @Override
    protected void afterConnect() throws Exception
    {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
index 60f1058..3cec130 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -33,8 +33,8 @@ import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.api.core.client.TopologyMember;
+import org.hornetq.api.core.management.CoreNotificationType;
 import org.hornetq.api.core.management.ManagementHelper;
-import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.client.impl.AfterConnectInternalListener;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ServerLocatorImpl;
@@ -45,7 +45,6 @@ import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.Bindings;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.postoffice.impl.PostOfficeImpl;
-import org.hornetq.core.protocol.ServerPacketDecoder;
 import org.hornetq.core.server.HornetQMessageBundle;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.HornetQServerLogger;
@@ -56,6 +55,7 @@ import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.core.server.cluster.ClusterControl;
 import org.hornetq.core.server.cluster.ClusterManager;
 import org.hornetq.core.server.cluster.ClusterManager.IncomingInterceptorLookingForExceptionMessage;
+import org.hornetq.core.server.cluster.HornetQServerSideProtocolManagerFactory;
 import org.hornetq.core.server.cluster.MessageFlowRecord;
 import org.hornetq.core.server.cluster.RemoteQueueBinding;
 import org.hornetq.core.server.group.impl.Proposal;
@@ -67,9 +67,6 @@ import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.FutureLatch;
 import org.hornetq.utils.TypedProperties;
 
-import static org.hornetq.api.core.management.NotificationType.CONSUMER_CLOSED;
-import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED;
-
 /**
  * A ClusterConnectionImpl
  *
@@ -437,7 +434,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
          TypedProperties props = new TypedProperties();
          props.putSimpleStringProperty(new SimpleString("name"), name);
          Notification notification = new Notification(nodeManager.getNodeId().toString(),
-                                                      NotificationType.CLUSTER_CONNECTION_STOPPED,
+                                                      CoreNotificationType.CLUSTER_CONNECTION_STOPPED,
                                                       props);
          managementService.sendNotification(notification);
       }
@@ -652,7 +649,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
 
          serverLocator.setAfterConnectionInternalListener(this);
 
-         serverLocator.setPacketDecoder(ServerPacketDecoder.INSTANCE);
+         serverLocator.setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance());
 
          serverLocator.start(server.getExecutorFactory().getExecutor());
       }
@@ -662,7 +659,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
          TypedProperties props = new TypedProperties();
          props.putSimpleStringProperty(new SimpleString("name"), name);
          Notification notification = new Notification(nodeManager.getNodeId().toString(),
-                                                      NotificationType.CLUSTER_CONNECTION_STARTED,
+                                                      CoreNotificationType.CLUSTER_CONNECTION_STARTED,
                                                       props);
          HornetQServerLogger.LOGGER.debug("sending notification: " + notification);
          managementService.sendNotification(notification);
@@ -845,7 +842,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
 
       targetLocator.setAfterConnectionInternalListener(this);
 
-      targetLocator.setPacketDecoder(ServerPacketDecoder.INSTANCE);
+      serverLocator.setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance());
 
       targetLocator.setNodeID(nodeId);
 
@@ -1133,7 +1130,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
          // a list of integers
          SimpleString type = message.getSimpleStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE);
 
-         NotificationType ntype = NotificationType.valueOf(type.toString());
+         CoreNotificationType ntype = CoreNotificationType.valueOf(type.toString());
 
          switch (ntype)
          {
@@ -1366,7 +1363,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
             return;
          }
 
-         RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateUniqueID(),
+         RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateID(),
                                                                  queueAddress,
                                                                  clusterName,
                                                                  routingName,
@@ -1503,7 +1500,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
             props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
          }
 
-         Notification notification = new Notification(null, CONSUMER_CREATED, props);
+         Notification notification = new Notification(null, CoreNotificationType.CONSUMER_CREATED, props);
 
          managementService.sendNotification(notification);
       }
@@ -1560,7 +1557,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
          {
             props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
          }
-         Notification notification = new Notification(null, CONSUMER_CLOSED, props);
+         Notification notification = new Notification(null, CoreNotificationType.CONSUMER_CLOSED, props);
 
          managementService.sendNotification(notification);
       }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/qourum/Quorum.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/qourum/Quorum.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/qourum/Quorum.java
index 88441a4..e5956bd 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/qourum/Quorum.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/qourum/Quorum.java
@@ -15,7 +15,7 @@ package org.hornetq.core.server.cluster.qourum;
 import org.hornetq.core.client.impl.Topology;
 
 /**
- * A quorum can be registered with the @link QuorumManager to receive notifications about the state of a cluster.
+ * A quorum can be registered with the {@link QuorumManager} to receive notifications about the state of a cluster.
  * It can then use the {@link QuorumManager} for the quorum within a cluster to vote on a specific outcome.
  * */
 public interface Quorum


Mime
View raw message