activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [17/51] [partial] activemq-6 git commit: ACTIVEMQ6-2 Update to HQ master
Date Tue, 11 Nov 2014 11:00:47 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedNothingBackupActivation.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedNothingBackupActivation.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedNothingBackupActivation.java
new file mode 100644
index 0000000..ea2a976
--- /dev/null
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedNothingBackupActivation.java
@@ -0,0 +1,498 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.impl;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.HornetQInternalErrorException;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.TopologyMember;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage;
+import org.hornetq.core.replication.ReplicationEndpoint;
+import org.hornetq.core.server.ActivationParams;
+import org.hornetq.core.server.HornetQMessageBundle;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServerLogger;
+import org.hornetq.core.server.LiveNodeLocator;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.QueueFactory;
+import org.hornetq.core.server.cluster.ClusterControl;
+import org.hornetq.core.server.cluster.ClusterController;
+import org.hornetq.core.server.cluster.ha.ReplicaPolicy;
+import org.hornetq.core.server.cluster.ha.ScaleDownPolicy;
+import org.hornetq.core.server.cluster.qourum.SharedNothingBackupQuorum;
+import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.utils.ReusableLatch;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hornetq.core.server.cluster.qourum.SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAILURE_REPLICATING;
+import static org.hornetq.core.server.cluster.qourum.SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAIL_OVER;
+import static org.hornetq.core.server.cluster.qourum.SharedNothingBackupQuorum.BACKUP_ACTIVATION.STOP;
+
+public final class SharedNothingBackupActivation extends Activation
+{
+   //policy describing how this server acts when it starts as a backup
+   private ReplicaPolicy replicaPolicy;
+
+   //the endpoint we replicate to; created in init() and set to null in close()
+   private ReplicationEndpoint replicationEndpoint;
+
+   private final HornetQServerImpl hornetQServer; //the server this activation drives
+   private SharedNothingBackupQuorum backupQuorum; //created and registered in run(); null until then
+   private final boolean attemptFailBack; //passed to the replication endpoint and announced to the live
+   private final Map<String, Object> activationParams; //run() looks up ActivationParams.REPLICATION_ENDPOINT here
+   private final HornetQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO; //handed to the replication endpoint
+   private String nodeID; //node id reported by the node locator in run()
+   ClusterControl clusterControl; //connection to the live currently being tried; assigned in run()
+   private boolean closed; //set to true in close() while holding this object's monitor
+   private volatile boolean backupUpToDate = true; //reset to false by init(); set back to true by setRemoteBackupUpToDate()
+
+   private final ReusableLatch backupSyncLatch = new ReusableLatch(0); //count is set to 1 in the constructor; counted down by setRemoteBackupUpToDate()
+
+   /**
+    * Creates the activation used while the given server runs as a replicating (shared nothing) backup.
+    *
+    * @param hornetQServer        the server this activation drives
+    * @param attemptFailBack      whether this backup is started as part of a fail-back attempt
+    * @param activationParams     optional parameters; {@link #run()} looks up
+    *                             {@code ActivationParams.REPLICATION_ENDPOINT} in this map
+    * @param shutdownOnCriticalIO listener handed to the replication endpoint created by {@link #init()}
+    * @param replicaPolicy        the policy describing how this server behaves as a backup
+    */
+   public SharedNothingBackupActivation(HornetQServerImpl hornetQServer,
+                                        boolean attemptFailBack,
+                                        Map<String, Object> activationParams,
+                                        HornetQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO,
+                                        ReplicaPolicy replicaPolicy)
+   {
+      this.replicaPolicy = replicaPolicy;
+      this.shutdownOnCriticalIO = shutdownOnCriticalIO;
+      this.activationParams = activationParams;
+      this.attemptFailBack = attemptFailBack;
+      this.hornetQServer = hornetQServer;
+      //one count down is pending until the backup is reported as being in sync
+      this.backupSyncLatch.setCount(1);
+   }
+
+   /**
+    * Prepares this activation for a run: resets the server's node manager, marks the backup as not being
+    * up to date and creates a fresh replication endpoint.
+    * <p>
+    * The endpoint is expected to be unset when this is called (checked only when assertions are enabled).
+    *
+    * @throws Exception if resetting the node manager or creating the endpoint fails
+    */
+   public void init() throws Exception
+   {
+      assert this.replicationEndpoint == null;
+      this.hornetQServer.resetNodeManager();
+      this.backupUpToDate = false;
+      this.replicationEndpoint = new ReplicationEndpoint(this.hornetQServer, this.shutdownOnCriticalIO, this.attemptFailBack, this);
+   }
+
+   /**
+    * Runs this server as a replicating backup until it either fails over and becomes live, or is told to stop.
+    * <p>
+    * The sequence is: move any existing server data away, run the first initialisation phase, register a
+    * {@link SharedNothingBackupQuorum}, then repeatedly locate a live node, connect to it and request
+    * replication. After each attempt the signal returned by the quorum decides what happens next:
+    * <ul>
+    * <li>{@code STOP}, or the server is no longer started: return;</li>
+    * <li>{@code FAIL_OVER}: leave the loop and activate this server as live;</li>
+    * <li>{@code FAILURE_REPLICATING}: stop the server from a separate thread and return;</li>
+    * <li>{@code ALREADY_REPLICATING}: release the connection and try again with the next live.</li>
+    * </ul>
+    * Exceptions are reported through the server logger, except for interruptions and illegal state errors
+    * raised while the server is no longer started, which are silently dropped.
+    */
+   public void run()
+   {
+      try
+      {
+         synchronized (hornetQServer)
+         {
+            hornetQServer.setState(HornetQServerImpl.SERVER_STATE.STARTED);
+         }
+         // move all data away:
+         hornetQServer.getNodeManager().stop();
+         hornetQServer.moveServerData();
+         hornetQServer.getNodeManager().start();
+         synchronized (this)
+         {
+            if (closed)
+               return;
+         }
+
+         boolean scalingDown = replicaPolicy.getScaleDownPolicy() != null && replicaPolicy.getScaleDownPolicy().isEnabled();
+
+         if (!hornetQServer.initialisePart1(scalingDown))
+            return;
+
+         synchronized (this)
+         {
+            if (closed)
+               return;
+            backupQuorum = new SharedNothingBackupQuorum(hornetQServer.getStorageManager(), hornetQServer.getNodeManager(), hornetQServer.getScheduledPool());
+            hornetQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum);
+         }
+
+         //use a Node Locator to connect to the cluster
+         LiveNodeLocator nodeLocator;
+         if (activationParams.get(ActivationParams.REPLICATION_ENDPOINT) != null)
+         {
+            TopologyMember member = (TopologyMember) activationParams.get(ActivationParams.REPLICATION_ENDPOINT);
+            nodeLocator = new NamedNodeIdNodeLocator(member.getNodeId(), new Pair<>(member.getLive(), member.getBackup()));
+         }
+         else
+         {
+            nodeLocator = replicaPolicy.getGroupName() == null ?
+                  new AnyLiveNodeLocatorForReplication(backupQuorum, hornetQServer) :
+                  new NamedLiveNodeLocatorForReplication(replicaPolicy.getGroupName(), backupQuorum);
+         }
+         ClusterController clusterController = hornetQServer.getClusterManager().getClusterController();
+         clusterController.addClusterTopologyListenerForReplication(nodeLocator);
+         //todo do we actually need to wait?
+         clusterController.awaitConnectionToReplicationCluster();
+
+         clusterController.addIncomingInterceptorForReplication(new ReplicationError(hornetQServer, nodeLocator));
+
+         // nodeManager.startBackup();
+
+         hornetQServer.getBackupManager().start();
+
+         replicationEndpoint.setBackupQuorum(backupQuorum);
+         replicationEndpoint.setExecutor(hornetQServer.getExecutorFactory().getExecutor());
+         EndpointConnector endpointConnector = new EndpointConnector();
+
+         HornetQServerLogger.LOGGER.backupServerStarted(hornetQServer.getVersion().getFullVersion(), hornetQServer.getNodeManager().getNodeId());
+         hornetQServer.setState(HornetQServerImpl.SERVER_STATE.STARTED);
+
+         SharedNothingBackupQuorum.BACKUP_ACTIVATION signal;
+         do
+         {
+            //locate the first live server to try to replicate
+            nodeLocator.locateNode();
+            //close() writes 'closed' while holding this monitor, so read it under the same monitor (as done
+            //earlier in this method) to be sure a concurrent close() is seen
+            synchronized (this)
+            {
+               if (closed)
+               {
+                  return;
+               }
+            }
+            Pair<TransportConfiguration, TransportConfiguration> possibleLive = nodeLocator.getLiveConfiguration();
+            nodeID = nodeLocator.getNodeID();
+            //in a normal (non failback) scenario if we couldn't find our live server we should fail
+            if (!attemptFailBack)
+            {
+               //this shouldn't happen
+               if (nodeID == null)
+                  throw new RuntimeException("Could not establish the connection");
+               hornetQServer.getNodeManager().setNodeID(nodeID);
+            }
+
+            //forget the control used by a previous attempt (it was closed at the end of that iteration);
+            //otherwise a failed connect with no backup connector would leave the stale reference in place
+            //and the retry branch below would be skipped
+            clusterControl = null;
+            try
+            {
+               clusterControl =  clusterController.connectToNodeInReplicatedCluster(possibleLive.getA());
+            }
+            catch (Exception e)
+            {
+               if (possibleLive.getB() != null)
+               {
+                  try
+                  {
+                     clusterControl = clusterController.connectToNodeInReplicatedCluster(possibleLive.getB());
+                  }
+                  catch (Exception e1)
+                  {
+                     clusterControl = null;
+                  }
+               }
+            }
+            if (clusterControl == null)
+            {
+               //its ok to retry here since we haven't started replication yet
+               //it may just be the server has gone since discovery
+               Thread.sleep(clusterController.getRetryIntervalForReplicatedCluster());
+               signal = SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING;
+               continue;
+            }
+
+            hornetQServer.getThreadPool().execute(endpointConnector);
+            /*
+             * Wait for a signal from the quorum manager, at this point if replication has been successful we can
+             * fail over or if there is an error trying to replicate (such as already replicating) we try the
+             * process again on the next live server.  All the action happens inside SharedNothingBackupQuorum.
+             */
+            signal = backupQuorum.waitForStatusChange();
+            /*
+             * replicationEndpoint will be holding lots of open files. Make sure they get
+             * closed/sync'ed.
+             */
+            HornetQServerImpl.stopComponent(replicationEndpoint);
+            // time to give up
+            if (!hornetQServer.isStarted() || signal == STOP)
+               return;
+               // time to fail over
+            else if (signal == FAIL_OVER)
+               break;
+               // something has gone badly wrong: stop the server (from another thread, since this one is the
+               // activation thread)
+            else if (signal == FAILURE_REPLICATING)
+            {
+               Thread startThread = new Thread(new Runnable()
+               {
+                  @Override
+                  public void run()
+                  {
+                     try
+                     {
+                        hornetQServer.stop();
+                     }
+                     catch (Exception e)
+                     {
+                        HornetQServerLogger.LOGGER.errorRestartingBackupServer(e, hornetQServer);
+                     }
+                  }
+               });
+               startThread.start();
+               return;
+            }
+            //ok, this live is no good, let's reset and try again
+            //close this session factory, we're done with it
+            clusterControl.close();
+            backupQuorum.reset();
+            if (replicationEndpoint.getChannel() != null)
+            {
+               replicationEndpoint.getChannel().close();
+               replicationEndpoint.setChannel(null);
+            }
+         }
+         while (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING);
+
+         hornetQServer.getClusterManager().getQuorumManager().unRegisterQuorum(backupQuorum);
+
+         //a backup that never finished synchronizing must not take over from its live
+         if (!isRemoteBackupUpToDate())
+         {
+            throw HornetQMessageBundle.BUNDLE.backupServerNotInSync();
+         }
+
+         replicaPolicy.getReplicatedPolicy().setReplicaPolicy(replicaPolicy);
+         hornetQServer.setHAPolicy(replicaPolicy.getReplicatedPolicy());
+         synchronized (hornetQServer)
+         {
+            if (!hornetQServer.isStarted())
+               return;
+            HornetQServerLogger.LOGGER.becomingLive(hornetQServer);
+            hornetQServer.getNodeManager().stopBackup();
+            hornetQServer.getStorageManager().start();
+            hornetQServer.getBackupManager().activated();
+            if (scalingDown)
+            {
+               hornetQServer.initialisePart2(true);
+            }
+            else
+            {
+               hornetQServer.setActivation(new SharedNothingLiveActivation(hornetQServer, replicaPolicy.getReplicatedPolicy()));
+               hornetQServer.initialisePart2(false);
+
+               if (hornetQServer.getIdentity() != null)
+               {
+                  HornetQServerLogger.LOGGER.serverIsLive(hornetQServer.getIdentity());
+               }
+               else
+               {
+                  HornetQServerLogger.LOGGER.serverIsLive();
+               }
+
+            }
+         }
+      }
+      catch (Exception e)
+      {
+         if ((e instanceof InterruptedException || e instanceof IllegalStateException) && !hornetQServer.isStarted())
+            // do not log these errors if the server is being stopped.
+            return;
+         //the exception is handed to the logger; no additional raw stack trace dump on stderr
+         HornetQServerLogger.LOGGER.initializationError(e);
+      }
+   }
+
+   /**
+    * Closes this activation: signals {@code STOP} to the backup quorum (if one was created), drops the
+    * replication endpoint reference and marks the activation as closed. If the server is still configured
+    * as a backup, the backup thread is interrupted and the node manager's backup mode is stopped.
+    *
+    * @param permanently not used by this implementation
+    * @param restarting  not used by this implementation
+    * @throws Exception if interrupting the backup thread or stopping the node manager fails
+    */
+   public void close(final boolean permanently, boolean restarting) throws Exception
+   {
+      synchronized (this)
+      {
+         if (backupQuorum != null)
+         {
+            backupQuorum.causeExit(STOP);
+         }
+         replicationEndpoint = null;
+         closed = true;
+      }
+
+      //the server policy may have changed in the meantime, in which case there is no backup to stop
+      if (!hornetQServer.getHAPolicy().isBackup())
+      {
+         return;
+      }
+
+      //fetch the node manager once, to avoid a NPE caused by the stop
+      final NodeManager currentNodeManager = hornetQServer.getNodeManager();
+
+      hornetQServer.interrupBackupThread(currentNodeManager);
+
+      if (currentNodeManager != null)
+      {
+         currentNodeManager.stopBackup();
+      }
+   }
+
+   /**
+    * Stops the replication endpoint, if there still is one, before the storage is closed.
+    *
+    * @throws Exception if stopping the endpoint fails
+    */
+   @Override
+   public void preStorageClose() throws Exception
+   {
+      //close() sets the field to null from another code path; read it once so the null check and the
+      //call below are guaranteed to act on the same reference
+      final ReplicationEndpoint endpoint = replicationEndpoint;
+      if (endpoint != null)
+      {
+         endpoint.stop();
+      }
+   }
+
+   @Override
+   public JournalLoader createJournalLoader(PostOffice postOffice, PagingManager pagingManager, StorageManager storageManager, QueueFactory queueFactory, NodeManager nodeManager, ManagementService managementService, GroupingHandler groupingHandler, Configuration configuration, HornetQServer parentServer) throws HornetQException
+   {
+      if (replicaPolicy.getScaleDownPolicy() != null)
+      {
+         return new BackupRecoveryJournalLoader(postOffice,
+               pagingManager,
+               storageManager,
+               queueFactory,
+               nodeManager,
+               managementService,
+               groupingHandler,
+               configuration,
+               parentServer,
+               ScaleDownPolicy.getScaleDownConnector(replicaPolicy.getScaleDownPolicy(), hornetQServer),
+               hornetQServer.getClusterManager().getClusterController());
+      }
+      else
+      {
+         return super.createJournalLoader(postOffice,
+               pagingManager,
+               storageManager,
+               queueFactory,
+               nodeManager,
+               managementService,
+               groupingHandler,
+               configuration,
+               parentServer);
+      }
+   }
+
+   /**
+    * Tells the cluster controller which cluster connection name is used for replication, as configured on
+    * the replica policy.
+    */
+   @Override
+   public void haStarted()
+   {
+      final ClusterController controller = hornetQServer.getClusterManager().getClusterController();
+      controller.setReplicatedClusterName(replicaPolicy.getClusterName());
+   }
+
+
+   /**
+    * Waits for this backup to be reported as synchronized with its live server.
+    * <p>
+    * This simply awaits the internal sync latch, which is counted down by
+    * {@link #setRemoteBackupUpToDate()}.
+    *
+    * @param timeout the maximum time to wait
+    * @param unit    the time unit of {@code timeout}
+    * @return the result of awaiting the latch; presumably {@code true} if the backup was already
+    *         synchronized or became synchronized within the timeout period, {@code false} otherwise
+    *         (to be confirmed against {@code ReusableLatch#await})
+    * @throws InterruptedException if the calling thread is interrupted while waiting
+    * @see java.util.concurrent.CountDownLatch#await(long, TimeUnit)
+    */
+   public boolean waitForBackupSync(long timeout, TimeUnit unit) throws InterruptedException
+   {
+      return backupSyncLatch.await(timeout, unit);
+   }
+
+   /**
+    * Live has notified this server that it is going to stop.
+    */
+   public void failOver(final ReplicationLiveIsStoppingMessage.LiveStopping finalMessage)
+   {
+      if (finalMessage == null)
+      {
+         backupQuorum.causeExit(FAILURE_REPLICATING);
+      }
+      else
+      {
+         backupQuorum.failOver(finalMessage);
+      }
+   }
+
+   /**
+    * @return the current replication endpoint; {@code null} before {@link #init()} has run and after
+    *         this activation has been closed
+    */
+   public ReplicationEndpoint getReplicationEndpoint()
+   {
+      return this.replicationEndpoint;
+   }
+
+
+   /**
+    * Whether this remote backup server is in sync with its live server. If it is not in sync, it may
+    * not take over the live's functions.
+    * <p>
+    * The flag starts out as {@code true}, is cleared by {@link #init()} and is set again by
+    * {@link #setRemoteBackupUpToDate()}.
+    *
+    * @return {@code true} if the backup is up-to-date, or if {@link #init()} has not been called on this
+    * activation yet.
+    */
+   public boolean isRemoteBackupUpToDate()
+   {
+      return backupUpToDate;
+   }
+
+   /**
+    * Marks this backup as being in sync with its live: announces the backup through the backup manager,
+    * raises the up-to-date flag and releases any caller blocked in
+    * {@link #waitForBackupSync(long, TimeUnit)}.
+    */
+   public void setRemoteBackupUpToDate()
+   {
+      this.hornetQServer.getBackupManager().announceBackup();
+      this.backupUpToDate = true;
+      this.backupSyncLatch.countDown();
+   }
+
+   /**
+    * @throws org.hornetq.api.core.HornetQException
+    */
+   public void remoteFailOver(ReplicationLiveIsStoppingMessage.LiveStopping finalMessage) throws HornetQException
+   {
+      HornetQServerLogger.LOGGER.trace("Remote fail-over, got message=" + finalMessage + ", backupUpToDate=" +
+            backupUpToDate);
+      if (!hornetQServer.getHAPolicy().isBackup() || hornetQServer.getHAPolicy().isSharedStore())
+      {
+         throw new HornetQInternalErrorException();
+      }
+
+      if (!backupUpToDate)
+      {
+         failOver(null);
+      }
+      else
+      {
+         failOver(finalMessage);
+      }
+   }
+
+
+
+   /**
+    * Task executed on the server's thread pool by {@code run()} of the enclosing activation: it authorizes
+    * against the live reachable through {@code clusterControl}, binds a replication channel to the
+    * replication endpoint, starts the endpoint and announces this server to the live as a replicating
+    * backup. Any failure is logged and reported to the quorum as {@code FAILURE_REPLICATING}.
+    */
+   private class EndpointConnector implements Runnable
+   {
+      @Override
+      public void run()
+      {
+         try
+         {
+            //we should only try once, if its not there we should move on.
+            clusterControl.getSessionFactory().setReconnectAttempts(1);
+            backupQuorum.setSessionFactory(clusterControl.getSessionFactory());
+            //get the connection and request replication to live
+            clusterControl.authorize();
+            connectToReplicationEndpoint(clusterControl);
+            replicationEndpoint.start();
+            clusterControl.announceReplicatingBackupToLive(attemptFailBack, replicaPolicy.getClusterName());
+         }
+         catch (Exception e)
+         {
+            //we shouldn't stop the server just mark the connector as tried and unavailable
+            HornetQServerLogger.LOGGER.replicationStartProblem(e);
+            backupQuorum.causeExit(FAILURE_REPLICATING);
+         }
+      }
+
+      /**
+       * Creates a replication channel on the given control and binds it to the replication endpoint.
+       *
+       * @param control the connection to the live server
+       * @return the replication endpoint, or {@code null} if the server is not started
+       * @throws Exception if the server is not a backup, or the endpoint is already bound to a channel
+       */
+      private synchronized ReplicationEndpoint connectToReplicationEndpoint(final ClusterControl control) throws Exception
+      {
+         if (!hornetQServer.isStarted())
+            return null;
+         if (!hornetQServer.getHAPolicy().isBackup())
+         {
+            throw HornetQMessageBundle.BUNDLE.serverNotBackupServer();
+         }
+
+         //verify the endpoint is free BEFORE creating a channel: otherwise the failure path would leave
+         //behind a freshly created channel whose handler is an endpoint already serving another channel
+         if (replicationEndpoint.getChannel() != null)
+         {
+            throw HornetQMessageBundle.BUNDLE.alreadyHaveReplicationServer();
+         }
+
+         Channel replicationChannel = control.createReplicationChannel();
+
+         replicationChannel.setHandler(replicationEndpoint);
+
+         replicationEndpoint.setChannel(replicationChannel);
+
+         return replicationEndpoint;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedNothingLiveActivation.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedNothingLiveActivation.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedNothingLiveActivation.java
new file mode 100644
index 0000000..27ad9d8
--- /dev/null
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedNothingLiveActivation.java
@@ -0,0 +1,486 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.impl;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.HornetQAlreadyReplicatingException;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.HornetQIllegalStateException;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.TopologyMember;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
+import org.hornetq.core.config.ConfigurationUtils;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.ChannelHandler;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage;
+import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.remoting.server.RemotingService;
+import org.hornetq.core.replication.ReplicationManager;
+import org.hornetq.core.server.HornetQMessageBundle;
+import org.hornetq.core.server.HornetQServerLogger;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.cluster.ClusterConnection;
+import org.hornetq.core.server.cluster.ha.ReplicatedPolicy;
+import org.hornetq.spi.core.remoting.Acceptor;
+
+import java.lang.reflect.Array;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class SharedNothingLiveActivation extends LiveActivation
+{
+   //policy describing how this server acts when it initially starts as a live
+   private ReplicatedPolicy replicatedPolicy;
+
+   private HornetQServerImpl hornetQServer; //the server this activation drives
+
+   private ReplicationManager replicationManager; //set by startReplication(); reset to null when replication fails or the backup connection closes
+
+   private final Object replicationLock = new Object(); //held whenever replicationManager is assigned
+
+   /**
+    * Creates the activation used while the given server runs as a replicated (shared nothing) live.
+    *
+    * @param hornetQServer    the server this activation drives
+    * @param replicatedPolicy the policy describing how this server behaves as a live
+    */
+   public SharedNothingLiveActivation(HornetQServerImpl hornetQServer,
+                                      ReplicatedPolicy replicatedPolicy)
+   {
+      this.replicatedPolicy = replicatedPolicy;
+      this.hornetQServer = hornetQServer;
+   }
+
+   /**
+    * Freezes the given remoting service, handing it the backup's transport connection when replication
+    * is active (presumably so that this connection is treated differently by the freeze; confirm against
+    * {@code RemotingService#freeze}).
+    *
+    * @param remotingService the service to freeze; nothing happens when it is {@code null}
+    */
+   @Override
+   public void freezeConnections(RemotingService remotingService)
+   {
+      //take a snapshot of the field, it is reset to null by other threads when replication ends
+      final ReplicationManager currentManager = replicationManager;
+
+      if (remotingService == null)
+      {
+         return;
+      }
+
+      if (currentManager == null)
+      {
+         remotingService.freeze(null, null);
+      }
+      else
+      {
+         remotingService.freeze(null, currentManager.getBackupTransportConnection());
+      }
+   }
+
+   /**
+    * Activates this server as live.
+    * <p>
+    * If the policy asks to check for an existing live and this server's node id is already in use, the
+    * server is switched to its replica (backup) policy instead and nothing else is started here.
+    * Otherwise both initialisation phases are run and the server is reported as live. Any exception is
+    * reported through the server logger.
+    */
+   public void run()
+   {
+      try
+      {
+         final boolean liveAlreadyRunning = replicatedPolicy.isCheckForLiveServer() && isNodeIdUsed();
+         if (liveAlreadyRunning)
+         {
+            //set for when we failback
+            replicatedPolicy.getReplicaPolicy().setReplicatedPolicy(replicatedPolicy);
+            hornetQServer.setHAPolicy(replicatedPolicy.getReplicaPolicy());
+            return;
+         }
+
+         hornetQServer.initialisePart1(false);
+         hornetQServer.initialisePart2(false);
+
+         if (hornetQServer.getIdentity() == null)
+         {
+            HornetQServerLogger.LOGGER.serverIsLive();
+         }
+         else
+         {
+            HornetQServerLogger.LOGGER.serverIsLive(hornetQServer.getIdentity());
+         }
+      }
+      catch (Exception e)
+      {
+         HornetQServerLogger.LOGGER.initializationError(e);
+      }
+   }
+
+   /**
+    * Builds the channel handler through which a backup registers itself with this live server.
+    * <p>
+    * Only {@code BACKUP_REGISTRATION} packets are acted upon: they trigger {@link #startReplication}. If
+    * that fails, a {@code BackupReplicationStartFailedMessage} carrying the matching problem code is sent
+    * back on the channel. All other packet types are ignored.
+    *
+    * @param channel      the channel the registration arrives on and replies are sent to
+    * @param acceptorUsed the acceptor whose cluster connection is used for the replication
+    * @return the handler
+    */
+   @Override
+   public ChannelHandler getActivationChannelHandler(final Channel channel, final Acceptor acceptorUsed)
+   {
+      return new ChannelHandler()
+      {
+         @Override
+         public void handlePacket(Packet packet)
+         {
+            if (packet.getType() != PacketImpl.BACKUP_REGISTRATION)
+            {
+               return;
+            }
+
+            final BackupRegistrationMessage registration = (BackupRegistrationMessage)packet;
+            final ClusterConnection clusterConnection = acceptorUsed.getClusterConnection();
+            try
+            {
+               startReplication(channel.getConnection(), clusterConnection, getPair(registration.getConnector(), true),
+                     registration.isFailBackRequest());
+            }
+            catch (HornetQAlreadyReplicatingException alreadyReplicating)
+            {
+               channel.send(new BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.ALREADY_REPLICATING));
+            }
+            catch (HornetQException failure)
+            {
+               channel.send(new BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.EXCEPTION));
+            }
+         }
+      };
+   }
+
+   public void startReplication(CoreRemotingConnection rc, final ClusterConnection clusterConnection,
+                                final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean isFailBackRequest) throws HornetQException
+   { /*
+      * Starts replicating this live server's data to the backup connected through rc.
+      *
+      * A ReplicationManager is created and started under replicationLock; the actual synchronization of
+      * the storage, the announcement of the backup on clusterConnection and, for fail-back requests, the
+      * stop/restart of this server all run on a newly created thread.
+      *
+      * Throws HornetQAlreadyReplicatingException if a replication manager is already set, and
+      * HornetQIllegalStateException if the server is not started.
+      */
+      if (replicationManager != null) // unsynchronized pre-check; repeated under replicationLock below
+      {
+         throw new HornetQAlreadyReplicatingException();
+      }
+
+      if (!hornetQServer.isStarted())
+      {
+         throw new HornetQIllegalStateException();
+      }
+
+      synchronized (replicationLock)
+      {
+
+         if (replicationManager != null) // another registration may have won the race since the first check
+         {
+            throw new HornetQAlreadyReplicatingException();
+         }
+         ReplicationFailureListener listener = new ReplicationFailureListener(); // resets replicationManager when rc closes or fails
+         rc.addCloseListener(listener);
+         rc.addFailureListener(listener);
+         replicationManager = new ReplicationManager(rc, hornetQServer.getExecutorFactory());
+         replicationManager.start();
+         Thread t = new Thread(new Runnable()
+         {
+            public void run()
+            {
+               try
+               {
+                  hornetQServer.getStorageManager().startReplication(replicationManager, hornetQServer.getPagingManager(), hornetQServer.getNodeID().toString(),
+                        isFailBackRequest && replicatedPolicy.isAllowAutoFailBack());
+
+                  clusterConnection.nodeAnnounced(System.currentTimeMillis(), hornetQServer.getNodeID().toString(), replicatedPolicy.getGroupName(), replicatedPolicy.getScaleDownGroupName(), pair, true);
+
+                  //todo, check why this was set here
+                  //backupUpToDate = false;
+
+                  if (isFailBackRequest && replicatedPolicy.isAllowAutoFailBack())
+                  {
+                     BackupTopologyListener listener1 = new BackupTopologyListener(hornetQServer.getNodeID().toString());
+                     clusterConnection.addClusterTopologyListener(listener1);
+                     if (listener1.waitForBackup())
+                     {
+                        try
+                        {
+                           Thread.sleep(replicatedPolicy.getFailbackDelay());
+                        }
+                        catch (InterruptedException e)
+                        {
+                           // interrupted while waiting out the failback delay: carry on with the stop/restart below
+                        }
+                        //if we have too many backups kept or aren't configured to restart just stop, otherwise restart as a backup
+                        if (!replicatedPolicy.getReplicaPolicy().isRestartBackup() && hornetQServer.countNumberOfCopiedJournals() >= replicatedPolicy.getReplicaPolicy().getMaxSavedReplicatedJournalsSize() && replicatedPolicy.getReplicaPolicy().getMaxSavedReplicatedJournalsSize() >= 0)
+                        { // NOTE(review): the comment above says "or", but the condition requires BOTH "not restartable" and "journal limit reached" - confirm which is intended
+                           hornetQServer.stop(true);
+                           HornetQServerLogger.LOGGER.stopReplicatedBackupAfterFailback();
+                        }
+                        else
+                        {
+                           hornetQServer.stop(true);
+                           HornetQServerLogger.LOGGER.restartingReplicatedBackupAfterFailback();
+                           hornetQServer.setHAPolicy(replicatedPolicy.getReplicaPolicy()); // come back up using the backup policy
+                           hornetQServer.start();
+                        }
+                     }
+                     else
+                     {
+                        HornetQServerLogger.LOGGER.failbackMissedBackupAnnouncement();
+                     }
+                  }
+               }
+               catch (Exception e)
+               {
+                  if (hornetQServer.getState() == HornetQServerImpl.SERVER_STATE.STARTED)
+                  {
+                  /*
+                   * The reasoning here is that the exception was either caused by (1) the
+                   * interaction with the backup, or (2) by an IO Error at the storage. If (1), we
+                   * can swallow the exception and ignore the replication request. If (2) the live
+                   * will crash shortly.
+                   */
+                     HornetQServerLogger.LOGGER.errorStartingReplication(e);
+                  }
+                  try
+                  {
+                     HornetQServerImpl.stopComponent(replicationManager);
+                  }
+                  catch (Exception hqe)
+                  {
+                     HornetQServerLogger.LOGGER.errorStoppingReplication(hqe);
+                  }
+                  finally
+                  {
+                     synchronized (replicationLock)
+                     {
+                        replicationManager = null; // allow a later registration to start replication again
+                     }
+                  }
+               }
+            }
+         });
+
+         t.start();
+      }
+   }
+
+   private final class ReplicationFailureListener implements FailureListener, CloseListener
+   {
+
+      @Override
+      public void connectionFailed(HornetQException exception, boolean failedOver)
+      {
+         connectionClosed();
+      }
+
+      @Override
+      public void connectionFailed(final HornetQException me, boolean failedOver, String scaleDownTargetNodeID)
+      {
+         connectionFailed(me, failedOver);
+      }
+
+      @Override
+      public void connectionClosed()
+      {
+         hornetQServer.getThreadPool().execute(new Runnable()
+         {
+            public void run()
+            {
+               synchronized (replicationLock)
+               {
+                  if (replicationManager != null)
+                  {
+                     hornetQServer.getStorageManager().stopReplication();
+                     replicationManager = null;
+                  }
+               }
+            }
+         });
+      }
+   }
+
+   private Pair<TransportConfiguration, TransportConfiguration> getPair(TransportConfiguration conn,
+                                                                        boolean isBackup)
+   {
+      if (isBackup)
+      {
+         return new Pair<>(null, conn);
+      }
+      return new Pair<>(conn, null);
+   }
+   /**
+    * Determines whether there is another server already running with this server's nodeID.
+    * <p/>
+    * This can happen in case of a successful fail-over followed by the live's restart
+    * (attempting a fail-back).
+    * <p/>
+    * The check connects to the replication cluster with reconnects disabled and waits up to five
+    * seconds for the topology to report a member carrying our node ID.
+    *
+    * @return {@code true} if a topology member with this server's node ID was reported;
+    *         {@code false} if there is no cluster configuration, the connection could not be
+    *         established, or no such member was reported before the wait ended
+    * @throws Exception if reading the node ID, creating the locator, or waiting on the topology fails
+    */
+   private boolean isNodeIdUsed() throws Exception
+   {
+      if (hornetQServer.getConfiguration().getClusterConfigurations().isEmpty())
+         return false;
+      SimpleString nodeId0;
+      try
+      {
+         nodeId0 = hornetQServer.getNodeManager().readNodeId();
+      }
+      catch (HornetQIllegalStateException e)
+      {
+         // no node ID could be read; NodeIdListener never matches a null ID
+         nodeId0 = null;
+      }
+
+      ClusterConnectionConfiguration config = ConfigurationUtils.getReplicationClusterConfiguration(hornetQServer.getConfiguration(), replicatedPolicy.getClusterName());
+
+      ServerLocatorInternal locator = getLocator(config);
+
+      ClientSessionFactoryInternal factory = null;
+
+      NodeIdListener listener = new NodeIdListener(nodeId0);
+
+      // register exactly once, before connecting, so the listener sees the topology updates
+      locator.addClusterTopologyListener(listener);
+      try
+      {
+         locator.setReconnectAttempts(0);
+         try
+         {
+            factory = locator.connectNoWarnings();
+         }
+         catch (Exception notConnected)
+         {
+            // nobody to connect to, so nobody can be using our node ID
+            return false;
+         }
+
+         listener.latch.await(5, TimeUnit.SECONDS);
+
+         return listener.isNodePresent;
+      }
+      finally
+      {
+         // close the locator even if closing the factory throws
+         try
+         {
+            if (factory != null)
+               factory.close();
+         }
+         finally
+         {
+            locator.close();
+         }
+      }
+   }
+
+   public void close(boolean permanently, boolean restarting) throws Exception
+   {
+
+      replicationManager = null;
+      // To avoid a NPE cause by the stop
+      NodeManager nodeManagerInUse = hornetQServer.getNodeManager();
+
+      if (nodeManagerInUse != null)
+      {
+         //todo does this actually make any difference, we only set a different flag in the lock file which replication doesnt use
+         if (permanently)
+         {
+            nodeManagerInUse.crashLiveServer();
+         }
+         else
+         {
+            nodeManagerInUse.pauseLiveServer();
+         }
+      }
+   }
+
+   /**
+    * If a replication manager is currently set, sends a {@code STOP_CALLED} "live is stopping"
+    * message through it and schedules a task that clears its replication tokens later.
+    * <p/>
+    * The field is copied into a local first so the scheduled task keeps using the same manager
+    * even if the field is cleared in the meantime.
+    */
+   @Override
+   public void sendLiveIsStopping()
+   {
+      final ReplicationManager localReplicationManager = replicationManager;
+
+      if (localReplicationManager != null)
+      {
+         localReplicationManager.sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping.STOP_CALLED);
+         // Schedule for 30 seconds
+         // this pool gets a 'hard' shutdown, no need to manage the Future of this Runnable.
+         hornetQServer.getScheduledPool().schedule(new Runnable()
+         {
+            @Override
+            public void run()
+            {
+               localReplicationManager.clearReplicationTokens();
+            }
+         }, 30, TimeUnit.SECONDS);
+      }
+   }
+
+   /**
+    * Reads the current replication manager while holding {@code replicationLock}.
+    *
+    * @return the replication manager, or {@code null} if none is set
+    */
+   public ReplicationManager getReplicationManager()
+   {
+      final ReplicationManager current;
+      synchronized (replicationLock)
+      {
+         current = replicationManager;
+      }
+      return current;
+   }
+
+   /**
+    * Creates an HA server locator for the given cluster connection configuration, using its
+    * discovery group when one is named and its static connectors otherwise.
+    *
+    * @param config the cluster connection configuration to build the locator from
+    * @return the locator created by {@code HornetQClient.createServerLocatorWithHA}
+    * @throws HornetQException if a discovery group is named but not present in the server configuration
+    */
+   private ServerLocatorInternal getLocator(ClusterConnectionConfiguration config) throws HornetQException
+   {
+      if (config.getDiscoveryGroupName() == null)
+      {
+         // no discovery group: fall back to the statically configured connectors (may be null)
+         TransportConfiguration[] tcConfigs = null;
+         if (config.getStaticConnectors() != null)
+         {
+            tcConfigs = connectorNameListToArray(config.getStaticConnectors());
+         }
+
+         return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(tcConfigs);
+      }
+
+      DiscoveryGroupConfiguration dg = hornetQServer.getConfiguration().getDiscoveryGroupConfigurations().get(config.getDiscoveryGroupName());
+
+      if (dg == null)
+      {
+         throw HornetQMessageBundle.BUNDLE.noDiscoveryGroupFound(dg);
+      }
+
+      return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(dg);
+   }
+
+
+   /**
+    * Topology listener that records whether a member with a given node ID shows up, and releases
+    * a latch once that member is seen or the last member of an update has been delivered.
+    */
+   static final class NodeIdListener implements ClusterTopologyListener
+   {
+      volatile boolean isNodePresent = false;
+
+      private final SimpleString nodeId;
+      private final CountDownLatch latch = new CountDownLatch(1);
+
+      /**
+       * @param nodeId the node ID to look for; a {@code null} ID never matches
+       */
+      public NodeIdListener(SimpleString nodeId)
+      {
+         this.nodeId = nodeId;
+      }
+
+      @Override
+      public void nodeUP(TopologyMember topologyMember, boolean last)
+      {
+         final boolean matchesOurId = nodeId != null && nodeId.toString().equals(topologyMember.getNodeId());
+         if (matchesOurId)
+         {
+            isNodePresent = true;
+            latch.countDown();
+         }
+         else if (last)
+         {
+            // end of the update without a match: stop waiting
+            latch.countDown();
+         }
+      }
+
+      @Override
+      public void nodeDown(long eventUID, String nodeID)
+      {
+         // no-op
+      }
+   }
+
+   private TransportConfiguration[] connectorNameListToArray(final List<String> connectorNames)
+   {
+      TransportConfiguration[] tcConfigs = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class,
+            connectorNames.size());
+      int count = 0;
+      for (String connectorName : connectorNames)
+      {
+         TransportConfiguration connector = hornetQServer.getConfiguration().getConnectorConfigurations().get(connectorName);
+
+         if (connector == null)
+         {
+            HornetQServerLogger.LOGGER.bridgeNoConnector(connectorName);
+
+            return null;
+         }
+
+         tcConfigs[count++] = connector;
+      }
+
+      return tcConfigs;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedStoreBackupActivation.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedStoreBackupActivation.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedStoreBackupActivation.java
new file mode 100644
index 0000000..6c27e12
--- /dev/null
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedStoreBackupActivation.java
@@ -0,0 +1,276 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.impl;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServerLogger;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.QueueFactory;
+import org.hornetq.core.server.cluster.ha.ScaleDownPolicy;
+import org.hornetq.core.server.cluster.ha.SharedStoreSlavePolicy;
+import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.core.server.management.ManagementService;
+
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Activation run by a server configured with a {@link SharedStoreSlavePolicy}.
+ * <p/>
+ * {@link #run()} starts the node manager in backup mode, blocks in {@code awaitLiveNode()}, then
+ * switches the server to the master policy and completes initialisation. Afterwards it either
+ * scales down (stop, and optionally restart, the server) or releases the backup and, when
+ * automatic fail-back is allowed, starts a periodic {@link FailbackChecker}.
+ */
+public final class SharedStoreBackupActivation extends Activation
+{
+   //this is how we act as a backup
+   private final SharedStoreSlavePolicy sharedStoreSlavePolicy;
+
+   private final HornetQServerImpl hornetQServer;
+
+   private final Object failbackCheckerGuard = new Object();
+
+   // guarded by failbackCheckerGuard; set by close() unless the server is restarting
+   private boolean cancelFailBackChecker;
+
+   /**
+    * @param server                 the server this activation drives
+    * @param sharedStoreSlavePolicy the backup policy the server was configured with
+    */
+   public SharedStoreBackupActivation(HornetQServerImpl server, SharedStoreSlavePolicy sharedStoreSlavePolicy)
+   {
+      this.hornetQServer = server;
+      this.sharedStoreSlavePolicy = sharedStoreSlavePolicy;
+      synchronized (failbackCheckerGuard)
+      {
+         cancelFailBackChecker = false;
+      }
+   }
+
+   /**
+    * Runs the backup activation sequence. Interruption and closed-channel errors are treated as
+    * a normal stop; any other failure is logged as an initialization error.
+    */
+   public void run()
+   {
+      try
+      {
+         hornetQServer.getNodeManager().startBackup();
+
+         boolean scalingDown = sharedStoreSlavePolicy.getScaleDownPolicy() != null;
+
+         if (!hornetQServer.initialisePart1(scalingDown))
+            return;
+
+         hornetQServer.getBackupManager().start();
+
+         hornetQServer.setState(HornetQServerImpl.SERVER_STATE.STARTED);
+
+         HornetQServerLogger.LOGGER.backupServerStarted(hornetQServer.getVersion().getFullVersion(), hornetQServer.getNodeManager().getNodeId());
+
+         hornetQServer.getNodeManager().awaitLiveNode();
+
+         sharedStoreSlavePolicy.getSharedStoreMasterPolicy().setSharedStoreSlavePolicy(sharedStoreSlavePolicy);
+
+         hornetQServer.setHAPolicy(sharedStoreSlavePolicy.getSharedStoreMasterPolicy());
+
+         //hornetQServer.configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE);
+
+         hornetQServer.getBackupManager().activated();
+         // the server may have been stopped while we were waiting for the live node
+         if (hornetQServer.getState() != HornetQServerImpl.SERVER_STATE.STARTED)
+         {
+            return;
+         }
+
+         hornetQServer.initialisePart2(scalingDown);
+
+         if (scalingDown)
+         {
+            HornetQServerLogger.LOGGER.backupServerScaledDown();
+            Thread t = new Thread(new Runnable()
+            {
+               @Override
+               public void run()
+               {
+                  try
+                  {
+                     hornetQServer.stop();
+                     //we are shared store but if we were started by a parent server then we shouldn't restart
+                     if (sharedStoreSlavePolicy.isRestartBackup())
+                     {
+                        hornetQServer.start();
+                     }
+                  }
+                  catch (Exception e)
+                  {
+                     // keep the cause in the log instead of silently dropping it
+                     HornetQServerLogger.LOGGER.serverRestartWarning(e);
+                  }
+               }
+            });
+            t.start();
+            return;
+         }
+         else
+         {
+            HornetQServerLogger.LOGGER.backupServerIsLive();
+
+            hornetQServer.getNodeManager().releaseBackup();
+         }
+         if (sharedStoreSlavePolicy.isAllowAutoFailBack())
+         {
+            startFailbackChecker();
+         }
+      }
+      catch (InterruptedException e)
+      {
+         // this is ok, we are being stopped
+      }
+      catch (ClosedChannelException e)
+      {
+         // this is ok too, we are being stopped
+      }
+      catch (Exception e)
+      {
+         if (!(e.getCause() instanceof InterruptedException))
+         {
+            HornetQServerLogger.LOGGER.initializationError(e);
+         }
+      }
+      catch (Throwable e)
+      {
+         HornetQServerLogger.LOGGER.initializationError(e);
+      }
+   }
+
+   /**
+    * Shuts this activation down. Unless {@code restarting}, any pending fail-back restart is
+    * cancelled. If the server's current HA policy is still a backup policy, the backup thread is
+    * interrupted and the node manager's backup is stopped; otherwise the node manager is told to
+    * crash or pause the live server.
+    *
+    * @param permanently when the server is acting as live, forces {@code crashLiveServer()}
+    *                    regardless of the fail-over-on-shutdown setting
+    * @param restarting  when {@code true} the fail-back checker is not cancelled
+    * @throws Exception if a node manager or server call fails
+    */
+   public void close(boolean permanently, boolean restarting) throws Exception
+   {
+      if (!restarting)
+      {
+         synchronized (failbackCheckerGuard)
+         {
+            cancelFailBackChecker = true;
+         }
+      }
+      // To avoid a NPE cause by the stop
+      NodeManager nodeManagerInUse = hornetQServer.getNodeManager();
+
+      //we need to check as the servers policy may have changed
+      if (hornetQServer.getHAPolicy().isBackup())
+      {
+
+         hornetQServer.interrupBackupThread(nodeManagerInUse);
+
+
+         if (nodeManagerInUse != null)
+         {
+            nodeManagerInUse.stopBackup();
+         }
+      }
+      else
+      {
+
+         if (nodeManagerInUse != null)
+         {
+            // if we are now live, behave as live
+            // We need to delete the file too, otherwise the backup will failover when we shutdown or if the backup is
+            // started before the live
+            if (sharedStoreSlavePolicy.isFailoverOnServerShutdown() || permanently)
+            {
+               nodeManagerInUse.crashLiveServer();
+            }
+            else
+            {
+               nodeManagerInUse.pauseLiveServer();
+            }
+         }
+      }
+   }
+
+   /**
+    * Returns a {@link BackupRecoveryJournalLoader} when a scale-down policy is configured, and the
+    * superclass' loader otherwise.
+    */
+   @Override
+   public JournalLoader createJournalLoader(PostOffice postOffice, PagingManager pagingManager, StorageManager storageManager, QueueFactory queueFactory, NodeManager nodeManager, ManagementService managementService, GroupingHandler groupingHandler, Configuration configuration, HornetQServer parentServer) throws HornetQException
+   {
+      if (sharedStoreSlavePolicy.getScaleDownPolicy() != null)
+      {
+         return new BackupRecoveryJournalLoader(postOffice,
+               pagingManager,
+               storageManager,
+               queueFactory,
+               nodeManager,
+               managementService,
+               groupingHandler,
+               configuration,
+               parentServer,
+               ScaleDownPolicy.getScaleDownConnector(sharedStoreSlavePolicy.getScaleDownPolicy(), hornetQServer),
+               hornetQServer.getClusterManager().getClusterController());
+      }
+      else
+      {
+         return super.createJournalLoader(postOffice,
+               pagingManager,
+               storageManager,
+               queueFactory,
+               nodeManager,
+               managementService,
+               groupingHandler,
+               configuration,
+               parentServer);
+      }
+   }
+
+   /**
+    * To be called by backup trying to fail back the server.
+    * Schedules a {@link FailbackChecker} on the server's scheduled pool, once per second.
+    */
+   private void startFailbackChecker()
+   {
+      hornetQServer.getScheduledPool().scheduleAtFixedRate(new FailbackChecker(), 1000L, 1000L, TimeUnit.MILLISECONDS);
+   }
+
+   /**
+    * Periodic task that, the first time the node manager reports it is awaiting fail-back, stops
+    * this server on a separate thread and, after the configured fail-back delay, restarts it
+    * under the backup policy unless that was cancelled or backup restart is disabled.
+    */
+   private class FailbackChecker implements Runnable
+   {
+      // set once the stop/restart thread has been launched so it is launched only once
+      private boolean restarting = false;
+
+      public void run()
+      {
+         try
+         {
+            if (!restarting && hornetQServer.getNodeManager().isAwaitingFailback())
+            {
+               HornetQServerLogger.LOGGER.awaitFailBack();
+               restarting = true;
+               Thread t = new Thread(new Runnable()
+               {
+                  @Override
+                  public void run()
+                  {
+                     try
+                     {
+                        HornetQServerLogger.LOGGER.debug(hornetQServer + "::Stopping live node in favor of failback");
+
+                        hornetQServer.stop(true, false, true);
+                        // We need to wait some time before we start the backup again
+                        // otherwise we may eventually start before the live had a chance to get it
+                        Thread.sleep(sharedStoreSlavePolicy.getFailbackDelay());
+                        synchronized (failbackCheckerGuard)
+                        {
+                           if (cancelFailBackChecker || !sharedStoreSlavePolicy.isRestartBackup())
+                              return;
+
+                           hornetQServer.setHAPolicy(sharedStoreSlavePolicy);
+                           HornetQServerLogger.LOGGER.debug(hornetQServer +
+                                 "::Starting backup node now after failback");
+                           hornetQServer.start();
+                        }
+                     }
+                     catch (Exception e)
+                     {
+                        // keep the cause in the log instead of silently dropping it
+                        HornetQServerLogger.LOGGER.serverRestartWarning(e);
+                     }
+                  }
+               });
+               t.start();
+            }
+         }
+         catch (Exception e)
+         {
+            HornetQServerLogger.LOGGER.serverRestartWarning(e);
+         }
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedStoreLiveActivation.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedStoreLiveActivation.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedStoreLiveActivation.java
new file mode 100644
index 0000000..4f8baf2
--- /dev/null
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/SharedStoreLiveActivation.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.impl;
+
+import org.hornetq.core.server.HornetQServerLogger;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.cluster.ha.SharedStoreMasterPolicy;
+
+/**
+ * Created by andy on 04/09/14.
+ * <p/>
+ * Activation run by a server configured with a {@link SharedStoreMasterPolicy}.
+ */
+public final class SharedStoreLiveActivation extends LiveActivation
+{
+   //this is how we act when we initially start as live
+   private SharedStoreMasterPolicy sharedStoreMasterPolicy;
+
+   private HornetQServerImpl hornetQServer;
+
+   /**
+    * @param server                  the server this activation drives
+    * @param sharedStoreMasterPolicy the live policy the server was configured with
+    */
+   public SharedStoreLiveActivation(HornetQServerImpl server, SharedStoreMasterPolicy sharedStoreMasterPolicy)
+   {
+      this.hornetQServer = server;
+      this.sharedStoreMasterPolicy = sharedStoreMasterPolicy;
+   }
+
+   /**
+    * Runs the live activation sequence: first-part initialisation, an optional backup
+    * announcement when the node manager reports that the backup is live, starting the live node,
+    * and second-part initialisation. Any exception is logged as an initialization error.
+    */
+   public void run()
+   {
+      try
+      {
+         HornetQServerLogger.LOGGER.awaitingLiveLock();
+
+         hornetQServer.checkJournalDirectory();
+
+         debugWithSelf("First part initialization on ");
+
+         if (!hornetQServer.initialisePart1(false))
+         {
+            return;
+         }
+
+         if (hornetQServer.getNodeManager().isBackupLive())
+         {
+            announceBackupToFormerLive();
+         }
+
+         hornetQServer.getNodeManager().startLiveNode();
+
+         if (isStoppedOrStopping())
+         {
+            return;
+         }
+
+         hornetQServer.initialisePart2(false);
+
+         HornetQServerLogger.LOGGER.serverIsLive();
+      }
+      catch (Exception e)
+      {
+         HornetQServerLogger.LOGGER.initializationError(e);
+      }
+   }
+
+   /**
+    * Tells the node manager, if there is one, to crash the live server (when fail-over on
+    * shutdown is enabled or the close is permanent) or to pause it otherwise.
+    *
+    * @param permanently forces {@code crashLiveServer()} regardless of the policy setting
+    * @param restarting  not used by this implementation
+    * @throws Exception if the node manager call fails
+    */
+   public void close(boolean permanently, boolean restarting) throws Exception
+   {
+      // TO avoid a NPE from stop
+      final NodeManager nodeManagerInUse = hornetQServer.getNodeManager();
+
+      if (nodeManagerInUse == null)
+      {
+         return;
+      }
+
+      if (sharedStoreMasterPolicy.isFailoverOnServerShutdown() || permanently)
+      {
+         nodeManagerInUse.crashLiveServer();
+         return;
+      }
+
+      nodeManagerInUse.pauseLiveServer();
+   }
+
+   /**
+    * looks like we've failed over at some point need to inform that we are the backup
+    * so when the current live goes down they failover to us
+    */
+   private void announceBackupToFormerLive() throws Exception
+   {
+      debugWithSelf("announcing backup to the former live");
+      hornetQServer.getBackupManager().start();
+      hornetQServer.getBackupManager().announceBackup();
+      Thread.sleep(sharedStoreMasterPolicy.getFailbackDelay());
+   }
+
+   /** Whether the server state is currently STOPPED or STOPPING. */
+   private boolean isStoppedOrStopping()
+   {
+      return hornetQServer.getState() == HornetQServerImpl.SERVER_STATE.STOPPED || hornetQServer.getState() == HornetQServerImpl.SERVER_STATE.STOPPING;
+   }
+
+   /** Logs {@code prefix} followed by this activation, but only when debug logging is enabled. */
+   private void debugWithSelf(String prefix)
+   {
+      if (HornetQServerLogger.LOGGER.isDebugEnabled())
+      {
+         HornetQServerLogger.LOGGER.debug(prefix + this);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/management/impl/ManagementServiceImpl.java b/hornetq-server/src/main/java/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
index a460d59..59f51f9 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
@@ -392,7 +392,7 @@ public class ManagementServiceImpl implements ManagementService
    public ServerMessage handleMessage(final ServerMessage message) throws Exception
    {
       // a reply message is sent with the result stored in the message body.
-      ServerMessage reply = new ServerMessageImpl(storageManager.generateUniqueID(), 512);
+      ServerMessage reply = new ServerMessageImpl(storageManager.generateID(), 512);
 
       String resourceName = message.getStringProperty(ManagementHelper.HDR_RESOURCE_NAME);
       if (HornetQServerLogger.LOGGER.isDebugEnabled())
@@ -700,7 +700,7 @@ public class ManagementServiceImpl implements ManagementService
                   return;
                }
 
-               long messageID = storageManager.generateUniqueID();
+               long messageID = storageManager.generateID();
 
                ServerMessage notificationMessage = new ServerMessageImpl(messageID, 512);
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/settings/impl/AddressSettings.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/settings/impl/AddressSettings.java b/hornetq-server/src/main/java/org/hornetq/core/settings/impl/AddressSettings.java
index 51412cc..88dd300 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/settings/impl/AddressSettings.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/settings/impl/AddressSettings.java
@@ -58,6 +58,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
    public static final boolean DEFAULT_SEND_TO_DLA_ON_NO_ROUTE = false;
 
+   public static final long DEFAULT_SLOW_CONSUMER_THRESHOLD = -1;
+
+   public static final long DEFAULT_SLOW_CONSUMER_CHECK_PERIOD = 5;
+
+   public static final SlowConsumerPolicy DEFAULT_SLOW_CONSUMER_POLICY = SlowConsumerPolicy.NOTIFY;
+
    private AddressFullMessagePolicy addressFullMessagePolicy = null;
 
    private Long maxSizeBytes = null;
@@ -90,6 +96,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
    private Boolean sendToDLAOnNoRoute = null;
 
+   private Long slowConsumerThreshold = null;
+
+   private Long slowConsumerCheckPeriod = null;
+
+   private SlowConsumerPolicy slowConsumerPolicy = null;
+
    public AddressSettings(AddressSettings other)
    {
       this.addressFullMessagePolicy = other.addressFullMessagePolicy;
@@ -108,6 +120,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       this.lastValueQueue = other.lastValueQueue;
       this.redistributionDelay = other.redistributionDelay;
       this.sendToDLAOnNoRoute = other.sendToDLAOnNoRoute;
+      this.slowConsumerThreshold = other.slowConsumerThreshold;
+      this.slowConsumerCheckPeriod = other.slowConsumerCheckPeriod;
+      this.slowConsumerPolicy = other.slowConsumerPolicy;
    }
 
    public AddressSettings()
@@ -269,6 +284,37 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       this.redistributionDelay = redistributionDelay;
    }
 
+   /**
+    * @return the configured slow-consumer threshold, or
+    *         {@link #DEFAULT_SLOW_CONSUMER_THRESHOLD} when none has been set
+    */
+   public long getSlowConsumerThreshold()
+   {
+      if (slowConsumerThreshold == null)
+      {
+         return AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD;
+      }
+      return slowConsumerThreshold;
+   }
+
+   /**
+    * @param slowConsumerThreshold the slow-consumer threshold to store
+    */
+   public void setSlowConsumerThreshold(final long slowConsumerThreshold)
+   {
+      this.slowConsumerThreshold = slowConsumerThreshold;
+   }
+
+   /**
+    * @return the configured slow-consumer check period, or
+    *         {@link #DEFAULT_SLOW_CONSUMER_CHECK_PERIOD} when none has been set
+    */
+   public long getSlowConsumerCheckPeriod()
+   {
+      if (slowConsumerCheckPeriod == null)
+      {
+         return AddressSettings.DEFAULT_SLOW_CONSUMER_CHECK_PERIOD;
+      }
+      return slowConsumerCheckPeriod;
+   }
+
+   /**
+    * @param slowConsumerCheckPeriod the slow-consumer check period to store
+    */
+   public void setSlowConsumerCheckPeriod(final long slowConsumerCheckPeriod)
+   {
+      this.slowConsumerCheckPeriod = slowConsumerCheckPeriod;
+   }
+
+   /**
+    * @return the configured slow-consumer policy, or
+    *         {@link #DEFAULT_SLOW_CONSUMER_POLICY} when none has been set
+    */
+   public SlowConsumerPolicy getSlowConsumerPolicy()
+   {
+      if (slowConsumerPolicy == null)
+      {
+         return AddressSettings.DEFAULT_SLOW_CONSUMER_POLICY;
+      }
+      return slowConsumerPolicy;
+   }
+
+   /**
+    * @param slowConsumerPolicy the slow-consumer policy to store; {@code null} restores the default
+    */
+   public void setSlowConsumerPolicy(final SlowConsumerPolicy slowConsumerPolicy)
+   {
+      this.slowConsumerPolicy = slowConsumerPolicy;
+   }
+
    /**
     * merge 2 objects in to 1
     *
@@ -336,6 +382,18 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       {
          addressFullMessagePolicy = merged.addressFullMessagePolicy;
       }
+      if (slowConsumerThreshold == null)
+      {
+         slowConsumerThreshold = merged.slowConsumerThreshold;
+      }
+      if (slowConsumerCheckPeriod == null)
+      {
+         slowConsumerCheckPeriod = merged.slowConsumerCheckPeriod;
+      }
+      if (slowConsumerPolicy == null)
+      {
+         slowConsumerPolicy = merged.slowConsumerPolicy;
+      }
    }
 
    @Override
@@ -381,14 +439,29 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       redistributionDelay = BufferHelper.readNullableLong(buffer);
 
       sendToDLAOnNoRoute = BufferHelper.readNullableBoolean(buffer);
+
+      slowConsumerThreshold = BufferHelper.readNullableLong(buffer);
+
+      slowConsumerCheckPeriod = BufferHelper.readNullableLong(buffer);
+
+      policyStr = buffer.readNullableSimpleString();
+
+      if (policyStr != null)
+      {
+         slowConsumerPolicy = SlowConsumerPolicy.valueOf(policyStr.toString());
+      }
+      else
+      {
+         slowConsumerPolicy = null;
+      }
    }
 
    @Override
    public int getEncodeSize()
    {
 
-      return BufferHelper.sizeOfNullableSimpleString(addressFullMessagePolicy != null ? addressFullMessagePolicy.toString()
-                                                        : null) + BufferHelper.sizeOfNullableLong(maxSizeBytes) +
+      return BufferHelper.sizeOfNullableSimpleString(addressFullMessagePolicy != null ? addressFullMessagePolicy.toString() : null) +
+         BufferHelper.sizeOfNullableLong(maxSizeBytes) +
          BufferHelper.sizeOfNullableLong(pageSizeBytes) +
          BufferHelper.sizeOfNullableInteger(pageMaxCache) +
          BufferHelper.sizeOfNullableBoolean(dropMessagesWhenFull) +
@@ -402,7 +475,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          BufferHelper.sizeOfNullableLong(expiryDelay) +
          BufferHelper.sizeOfNullableBoolean(lastValueQueue) +
          BufferHelper.sizeOfNullableLong(redistributionDelay) +
-         BufferHelper.sizeOfNullableBoolean(sendToDLAOnNoRoute);
+         BufferHelper.sizeOfNullableBoolean(sendToDLAOnNoRoute) +
+         BufferHelper.sizeOfNullableLong(slowConsumerCheckPeriod) +
+         BufferHelper.sizeOfNullableLong(slowConsumerThreshold) +
+         BufferHelper.sizeOfNullableSimpleString(slowConsumerPolicy != null ? slowConsumerPolicy.toString() : null);
    }
 
    @Override
@@ -440,6 +516,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       BufferHelper.writeNullableLong(buffer, redistributionDelay);
 
       BufferHelper.writeNullableBoolean(buffer, sendToDLAOnNoRoute);
+
+      BufferHelper.writeNullableLong(buffer, slowConsumerThreshold);
+
+      BufferHelper.writeNullableLong(buffer, slowConsumerCheckPeriod);
+
+      buffer.writeNullableSimpleString(slowConsumerPolicy != null ? new SimpleString(slowConsumerPolicy.toString()) : null);
    }
 
    /* (non-Javadoc)
@@ -467,6 +549,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       result = prime * result + ((maxRedeliveryDelay == null) ? 0 : maxRedeliveryDelay.hashCode());
       result = prime * result + ((redistributionDelay == null) ? 0 : redistributionDelay.hashCode());
       result = prime * result + ((sendToDLAOnNoRoute == null) ? 0 : sendToDLAOnNoRoute.hashCode());
+      result = prime * result + ((slowConsumerThreshold == null) ? 0 : slowConsumerThreshold.hashCode());
+      result = prime * result + ((slowConsumerCheckPeriod == null) ? 0 : slowConsumerCheckPeriod.hashCode());
+      result = prime * result + ((slowConsumerPolicy == null) ? 0 : slowConsumerPolicy.hashCode());
       return result;
    }
 
@@ -595,6 +680,27 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       }
       else if (!sendToDLAOnNoRoute.equals(other.sendToDLAOnNoRoute))
          return false;
+      if (slowConsumerThreshold == null)
+      {
+         if (other.slowConsumerThreshold != null)
+            return false;
+      }
+      else if (!slowConsumerThreshold.equals(other.slowConsumerThreshold))
+         return false;
+      if (slowConsumerCheckPeriod == null)
+      {
+         if (other.slowConsumerCheckPeriod != null)
+            return false;
+      }
+      else if (!slowConsumerCheckPeriod.equals(other.slowConsumerCheckPeriod))
+         return false;
+      if (slowConsumerPolicy == null)
+      {
+         if (other.slowConsumerPolicy != null)
+            return false;
+      }
+      else if (!slowConsumerPolicy.equals(other.slowConsumerPolicy))
+         return false;
       return true;
    }
 
@@ -635,6 +741,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          redistributionDelay +
          ", sendToDLAOnNoRoute=" +
          sendToDLAOnNoRoute +
+         ", slowConsumerThreshold=" +
+         slowConsumerThreshold +
+         ", slowConsumerCheckPeriod=" +
+         slowConsumerCheckPeriod +
+         ", slowConsumerPolicy=" +
+         slowConsumerPolicy +
          "]";
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/settings/impl/Match.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/settings/impl/Match.java b/hornetq-server/src/main/java/org/hornetq/core/settings/impl/Match.java
index 3537160..f4eb5b0 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/settings/impl/Match.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/settings/impl/Match.java
@@ -27,7 +27,9 @@ public class Match<T>
 
    public static final String WILDCARD = "#";
 
-   private static final String WILDCARD_REPLACEMENT = ".+";
+   public static final String DOT_WILDCARD = ".#";
+
+   private static final String WILDCARD_REPLACEMENT = ".*";
 
    private static final String DOT = ".";
 
@@ -50,9 +52,14 @@ public class Match<T>
       }
       else
       {
+         // treat ".#" the same as "#" so that matching agrees with what's documented
+         actMatch = actMatch.replace(DOT_WILDCARD, WILDCARD);
+
          actMatch = actMatch.replace(Match.DOT, Match.DOT_REPLACEMENT);
-         actMatch = actMatch.replace(Match.WILDCARD, Match.WILDCARD_REPLACEMENT);
          actMatch = actMatch.replace(Match.WORD_WILDCARD, Match.WORD_WILDCARD_REPLACEMENT);
+
+         // this one has to be done last, as we are using .* and it could otherwise be replaced wrongly
+         actMatch = actMatch.replace(Match.WILDCARD, Match.WILDCARD_REPLACEMENT);
       }
       pattern = Pattern.compile(actMatch);
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/settings/impl/SlowConsumerPolicy.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/settings/impl/SlowConsumerPolicy.java b/hornetq-server/src/main/java/org/hornetq/core/settings/impl/SlowConsumerPolicy.java
new file mode 100644
index 0000000..a16b02d
--- /dev/null
+++ b/hornetq-server/src/main/java/org/hornetq/core/settings/impl/SlowConsumerPolicy.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.settings.impl;
+
+/**
+ * A SlowConsumerPolicy: one of {@link #KILL} or {@link #NOTIFY}.
+ * NOTE(review): presumably the action taken on a slow consumer - confirm against callers.
+ * @author Justin Bertram
+ *
+ */
+public enum SlowConsumerPolicy
+{
+   KILL, NOTIFY;
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/transaction/Transaction.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/transaction/Transaction.java b/hornetq-server/src/main/java/org/hornetq/core/transaction/Transaction.java
index 6044d13..1010d35 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/transaction/Transaction.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/transaction/Transaction.java
@@ -17,6 +17,8 @@ import java.util.List;
 import javax.transaction.xa.Xid;
 
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.impl.RefsOperation;
 
 /**
  * A HornetQ internal transaction
@@ -70,4 +72,6 @@ public interface Transaction
    void setContainsPersistent();
 
    void setTimeout(int timeout);
+
+   RefsOperation createRefsOperation(Queue queue);
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/transaction/TransactionFactory.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/transaction/TransactionFactory.java b/hornetq-server/src/main/java/org/hornetq/core/transaction/TransactionFactory.java
new file mode 100644
index 0000000..bd6db6b
--- /dev/null
+++ b/hornetq-server/src/main/java/org/hornetq/core/transaction/TransactionFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.transaction;
+
+import org.hornetq.core.persistence.StorageManager;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Factory for {@link Transaction} instances. Created by andy on 22/07/14.
+ */
+public interface TransactionFactory
+{
+   Transaction newTransaction(Xid xid, StorageManager storageManager, int timeoutSeconds);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/transaction/impl/BindingsTransactionImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/transaction/impl/BindingsTransactionImpl.java b/hornetq-server/src/main/java/org/hornetq/core/transaction/impl/BindingsTransactionImpl.java
index bc11138..bcb32a2 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/transaction/impl/BindingsTransactionImpl.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/transaction/impl/BindingsTransactionImpl.java
@@ -13,6 +13,8 @@
 package org.hornetq.core.transaction.impl;
 
 import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.impl.RefsOperation;
 
 /**
  * A BindingsTransactionImpl
@@ -48,4 +50,9 @@ public class BindingsTransactionImpl extends TransactionImpl
       }
    }
 
+   @Override
+   public RefsOperation createRefsOperation(Queue queue)
+   {
+      return null; // this implementation never creates a RefsOperation; callers receive null
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/transaction/impl/TransactionImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/transaction/impl/TransactionImpl.java b/hornetq-server/src/main/java/org/hornetq/core/transaction/impl/TransactionImpl.java
index bdef73e..fe050fe 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/transaction/impl/TransactionImpl.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/transaction/impl/TransactionImpl.java
@@ -26,6 +26,7 @@
 package org.hornetq.core.transaction.impl;
 
 import javax.transaction.xa.Xid;
+
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
@@ -34,6 +35,8 @@ import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.server.HornetQServerLogger;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.impl.RefsOperation;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.TransactionOperation;
 
@@ -76,7 +79,7 @@ public class TransactionImpl implements Transaction
 
       xid = null;
 
-      id = storageManager.generateUniqueID();
+      id = storageManager.generateID();
 
       createTime = System.currentTimeMillis();
 
@@ -89,7 +92,7 @@ public class TransactionImpl implements Transaction
 
       xid = null;
 
-      id = storageManager.generateUniqueID();
+      id = storageManager.generateID();
 
       createTime = System.currentTimeMillis();
    }
@@ -100,7 +103,7 @@ public class TransactionImpl implements Transaction
 
       this.xid = xid;
 
-      id = storageManager.generateUniqueID();
+      id = storageManager.generateID();
 
       createTime = System.currentTimeMillis();
 
@@ -136,6 +139,12 @@ public class TransactionImpl implements Transaction
       this.timeoutSeconds = timeout;
    }
 
+   @Override
+   public RefsOperation createRefsOperation(Queue queue)
+   {
+      return new RefsOperation(queue, storageManager); // a new instance on every call
+   }
+
    public long getID()
    {
       return id;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/spi/core/protocol/MessageConverter.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/spi/core/protocol/MessageConverter.java b/hornetq-server/src/main/java/org/hornetq/spi/core/protocol/MessageConverter.java
new file mode 100644
index 0000000..b031b31
--- /dev/null
+++ b/hornetq-server/src/main/java/org/hornetq/spi/core/protocol/MessageConverter.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.spi.core.protocol;
+
+import org.hornetq.core.server.ServerMessage;
+
+/** Converts a message {@code Object} to a {@link ServerMessage} (inbound) and back (outbound).
+ * @author Clebert Suconic
+ */
+// TODO: use this interface properly on OpenWire
+public interface MessageConverter
+{
+   ServerMessage inbound(Object messageInbound) throws Exception;
+
+   Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/spi/core/protocol/ProtocolManager.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/spi/core/protocol/ProtocolManager.java b/hornetq-server/src/main/java/org/hornetq/spi/core/protocol/ProtocolManager.java
index 002f503..125b17c 100644
--- a/hornetq-server/src/main/java/org/hornetq/spi/core/protocol/ProtocolManager.java
+++ b/hornetq-server/src/main/java/org/hornetq/spi/core/protocol/ProtocolManager.java
@@ -35,5 +35,12 @@ public interface ProtocolManager
 
    boolean isProtocol(byte[] array);
 
+   /**
+    * Gets the Message Converter towards HornetQ.
+    * A {@code null} return value means no conversion is needed.
+    * @return the converter, or {@code null} when no conversion is needed
+    */
+   MessageConverter getConverter();
+
    void handshake(NettyServerConnection connection, HornetQBuffer buffer);
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/spi/core/protocol/SessionCallback.java b/hornetq-server/src/main/java/org/hornetq/spi/core/protocol/SessionCallback.java
index 9660ec1..d91af22 100644
--- a/hornetq-server/src/main/java/org/hornetq/spi/core/protocol/SessionCallback.java
+++ b/hornetq-server/src/main/java/org/hornetq/spi/core/protocol/SessionCallback.java
@@ -13,6 +13,7 @@
 package org.hornetq.spi.core.protocol;
 
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.server.ServerConsumer;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.spi.core.remoting.ReadyListener;
 
@@ -25,15 +26,18 @@ import org.hornetq.spi.core.remoting.ReadyListener;
  */
 public interface SessionCallback
 {
+   /** This one gives a chance for Proton to have its own flow control. */
+   boolean hasCredits(ServerConsumer consumerID);
+
    void sendProducerCreditsMessage(int credits, SimpleString address);
 
    void sendProducerCreditsFailMessage(int credits, SimpleString address);
 
-   int sendMessage(ServerMessage message, long consumerID, int deliveryCount);
+   int sendMessage(ServerMessage message, ServerConsumer consumerID, int deliveryCount);
 
-   int sendLargeMessage(ServerMessage message, long consumerID, long bodySize, int deliveryCount);
+   int sendLargeMessage(ServerMessage message, ServerConsumer consumerID, long bodySize, int deliveryCount);
 
-   int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse);
+   int sendLargeMessageContinuation(ServerConsumer consumerID, byte[] body, boolean continues, boolean requiresResponse);
 
    void closed();
 
@@ -41,5 +45,5 @@ public interface SessionCallback
 
    void removeReadyListener(ReadyListener listener);
 
-   void disconnect(long consumerId, String queueName);
+   void disconnect(ServerConsumer consumerId, String queueName);
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/resources/META-INF/services/org/hornetq/security/basic-security
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/resources/META-INF/services/org/hornetq/security/basic-security b/hornetq-server/src/main/resources/META-INF/services/org/hornetq/security/basic-security
new file mode 100644
index 0000000..4b7875a
--- /dev/null
+++ b/hornetq-server/src/main/resources/META-INF/services/org/hornetq/security/basic-security
@@ -0,0 +1,13 @@
+## ---------------------------------------------------------------------------
+## Copyright 2005-2014 Red Hat, Inc.
+## Red Hat licenses this file to you under the Apache License, version
+## 2.0 (the "License"); you may not use this file except in compliance
+## with the License.  You may obtain a copy of the License at
+##    http://www.apache.org/licenses/LICENSE-2.0
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+## implied.  See the License for the specific language governing
+## permissions and limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.hornetq.spi.core.security.HornetQSecurityManagerImpl
\ No newline at end of file


Mime
View raw message