activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [23/51] [partial] activemq-6 git commit: ACTIVEMQ6-2 Update to HQ master
Date Tue, 11 Nov 2014 11:00:53 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java b/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
index 1a64e6e..99573f8 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
@@ -22,10 +22,10 @@ import java.security.InvalidParameterException;
 import java.security.MessageDigest;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -45,6 +45,7 @@ import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.HornetQIllegalStateException;
+import org.hornetq.api.core.HornetQInternalErrorException;
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
@@ -106,6 +107,7 @@ import org.hornetq.core.transaction.TransactionOperationAbstract;
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
 import org.hornetq.core.transaction.impl.TransactionImpl;
 import org.hornetq.utils.Base64;
+import org.hornetq.utils.ByteUtil;
 import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.HornetQThreadFactory;
@@ -350,7 +352,7 @@ public class JournalStorageManager implements StorageManager
     * To achieve (2), instead of writing directly to instances of {@link JournalImpl}, we write to
     * instances of {@link ReplicatedJournal}.
     * <p/>
-    * At the backup-side replication is handled by {@link ReplicationEndpoint}.
+    * At the backup-side replication is handled by {@link org.hornetq.core.replication.ReplicationEndpoint}.
     *
     * @param replicationManager
     * @param pagingManager
@@ -380,6 +382,10 @@ public class JournalStorageManager implements StorageManager
       JournalFile[] messageFiles = null;
       JournalFile[] bindingsFiles = null;
 
+      // We get a picture of the current situation on the large messages
+      // and we send the current messages while more state is coming
+      Map<Long, Pair<String, Long>> pendingLargeMessages = null;
+
       try
       {
          Map<SimpleString, Collection<Integer>> pageFilesToSync;
@@ -408,7 +414,7 @@ public class JournalStorageManager implements StorageManager
                   bindingsFiles =
                      prepareJournalForCopy(originalBindingsJournal, JournalContent.BINDINGS, nodeID, autoFailBack);
                   pageFilesToSync = getPageInformationForSync(pagingManager);
-                  getLargeMessageInformation();
+                  pendingLargeMessages = recoverPendingLargeMessages();
                }
                finally
                {
@@ -428,9 +434,11 @@ public class JournalStorageManager implements StorageManager
             storageManagerLock.writeLock().unlock();
          }
 
+         // it will send a list of IDs that we are allocating
+         replicator.sendLargeMessageIdListMessage(pendingLargeMessages);
          sendJournalFile(messageFiles, JournalContent.MESSAGES);
          sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
-         sendLargeMessageFiles();
+         sendLargeMessageFiles(pendingLargeMessages);
          sendPagesToBackup(pageFilesToSync, pagingManager);
 
          storageManagerLock.writeLock().lock();
@@ -574,23 +582,18 @@ public class JournalStorageManager implements StorageManager
       return info;
    }
 
-   private void sendLargeMessageFiles() throws Exception
+   private void sendLargeMessageFiles(final Map<Long, Pair<String, Long>> pendingLargeMessages) throws Exception
    {
-      while (true)
+      Iterator<Entry<Long, Pair<String, Long>>> iter = pendingLargeMessages.entrySet().iterator();
+      while (started && iter.hasNext())
       {
-         Map.Entry<Long, Pair<String, Long>> entry = replicator.getNextLargeMessageToSync();
-         if (entry == null)
-         {
-            break;
-         }
+         Map.Entry<Long, Pair<String, Long>> entry = iter.next();
          String fileName = entry.getValue().getA();
          final long id = entry.getKey();
          long size = entry.getValue().getB();
          SequentialFile seqFile = largeMessagesFactory.createSequentialFile(fileName, 1);
          if (!seqFile.exists())
             continue;
-         if (!started)
-            return;
          replicator.syncLargeMessageFile(seqFile, size, id);
       }
    }
@@ -610,8 +613,9 @@ public class JournalStorageManager implements StorageManager
     *
     * @throws Exception
     */
-   private void getLargeMessageInformation() throws Exception
+   private Map<Long, Pair<String, Long>> recoverPendingLargeMessages() throws Exception
    {
+
       Map<Long, Pair<String, Long>> largeMessages = new HashMap<Long, Pair<String, Long>>();
       // only send durable messages... // listFiles append a "." to anything...
       List<String> filenames = largeMessagesFactory.listFiles("msg");
@@ -620,12 +624,16 @@ public class JournalStorageManager implements StorageManager
       for (String filename : filenames)
       {
          Long id = getLargeMessageIdFromFilename(filename);
-         idList.add(id);
-         SequentialFile seqFile = largeMessagesFactory.createSequentialFile(filename, 1);
-         long size = seqFile.size();
-         largeMessages.put(id, new Pair<String, Long>(filename, size));
+         if (!largeMessagesToDelete.contains(id))
+         {
+            idList.add(id);
+            SequentialFile seqFile = largeMessagesFactory.createSequentialFile(filename, 1);
+            long size = seqFile.size();
+            largeMessages.put(id, new Pair<String, Long>(filename, size));
+         }
       }
-      replicator.sendLargeMessageIdListMessage(largeMessages);
+
+      return largeMessages;
    }
 
    /**
@@ -763,12 +771,12 @@ public class JournalStorageManager implements StorageManager
       getContext().executeOnCompletion(run);
    }
 
-   public long generateUniqueID()
+   public long generateID()
    {
       return idGenerator.generateID();
    }
 
-   public long getCurrentUniqueID()
+   public long getCurrentID()
    {
       return idGenerator.getCurrentID();
    }
@@ -838,7 +846,7 @@ public class JournalStorageManager implements StorageManager
       readLock();
       try
       {
-         long recordID = generateUniqueID();
+         long recordID = generateID();
 
          messageJournal.appendAddRecord(recordID, JournalRecordIds.ADD_LARGE_MESSAGE_PENDING,
                                         new PendingLargeMessageEncoding(messageID),
@@ -1081,7 +1089,7 @@ public class JournalStorageManager implements StorageManager
       readLock();
       try
       {
-         pageTransaction.setRecordID(generateUniqueID());
+         pageTransaction.setRecordID(generateID());
          messageJournal.appendAddRecordTransactional(txID, pageTransaction.getRecordID(),
                                                      JournalRecordIds.PAGE_TRANSACTION, pageTransaction);
       }
@@ -1207,7 +1215,7 @@ public class JournalStorageManager implements StorageManager
       readLock();
       try
       {
-         long id = generateUniqueID();
+         long id = generateID();
 
          messageJournal.appendAddRecord(id,
                                         JournalRecordIds.HEURISTIC_COMPLETION,
@@ -2392,6 +2400,21 @@ public class JournalStorageManager implements StorageManager
    // This should be accessed from this package only
    void deleteLargeMessageFile(final LargeServerMessage largeServerMessage) throws HornetQException
    {
+      if (largeServerMessage.getPendingRecordID() < 0)
+      {
+         try
+         {
+            // The delete file happens asynchronously
+            // And the client won't be waiting for the actual file to be deleted.
+            // We set a temporary record (short lived) on the journal
+            // to avoid a situation where the server is restarted and pending large message stays on forever
+            largeServerMessage.setPendingRecordID(storePendingLargeMessage(largeServerMessage.getMessageID()));
+         }
+         catch (Exception e)
+         {
+            throw new HornetQInternalErrorException(e.getMessage(), e);
+         }
+      }
       final SequentialFile file = largeServerMessage.getFile();
       if (file == null)
       {
@@ -3434,7 +3457,7 @@ public class JournalStorageManager implements StorageManager
          // SimpleString simpleStr = new SimpleString(duplID);
          // return "DuplicateIDEncoding [address=" + address + ", duplID=" + simpleStr + "]";
 
-         return "DuplicateIDEncoding [address=" + address + ", duplID=" + Arrays.toString(duplID) + "]";
+         return "DuplicateIDEncoding [address=" + address + ", duplID=" + ByteUtil.bytesToHex(duplID, 2) + "]";
       }
 
    }
@@ -3443,7 +3466,7 @@ public class JournalStorageManager implements StorageManager
     * This is only used when loading a transaction.
     * <p/>
     * it might be possible to merge the functionality of this class with
-    * {@link PagingStoreImpl.FinishPageMessageOperation}
+    * {@code PagingStoreImpl.FinishPageMessageOperation}
     */
    // TODO: merge this class with the one on the PagingStoreImpl
    private static class FinishPageMessageOperation extends TransactionOperationAbstract implements TransactionOperation

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java b/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java
index 0c494d2..5e6bfbd 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java
@@ -18,6 +18,7 @@ package org.hornetq.core.persistence.impl.journal;
 import java.nio.ByteBuffer;
 
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.persistence.StorageManager.LargeMessageExtension;
@@ -77,15 +78,17 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage
    }
 
    @Override
-   public void setDurable(boolean durable)
+   public Message setDurable(boolean durable)
    {
       mainLM.setDurable(durable);
+      return mainLM;
    }
 
    @Override
-   public synchronized void setMessageID(long id)
+   public synchronized Message setMessageID(long id)
    {
       mainLM.setMessageID(id);
+      return mainLM;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java b/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
index b29ea13..37866e9 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
@@ -232,7 +232,7 @@ public class NullStorageManager implements StorageManager
    @Override
    public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws Exception
    {
-      return generateUniqueID();
+      return generateID();
    }
 
    @Override
@@ -264,7 +264,7 @@ public class NullStorageManager implements StorageManager
    }
 
    @Override
-   public long generateUniqueID()
+   public long generateID()
    {
       long id = idSequence.getAndIncrement();
 
@@ -272,7 +272,7 @@ public class NullStorageManager implements StorageManager
    }
 
    @Override
-   public long getCurrentUniqueID()
+   public long getCurrentID()
    {
       return idSequence.get();
    }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/postoffice/QueueInfo.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/postoffice/QueueInfo.java b/hornetq-server/src/main/java/org/hornetq/core/postoffice/QueueInfo.java
index 05bd090..1e15014 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/postoffice/QueueInfo.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/postoffice/QueueInfo.java
@@ -130,6 +130,27 @@ public class QueueInfo implements Serializable
       numberOfConsumers--;
    }
 
+   /**
+    * Tells whether this queue's address begins with any of the comma-separated
+    * entries in {@code address}; a {@code null} argument never matches.
+    */
+   public boolean matchesAddress(SimpleString address)
+   {
+      if (address == null)
+      {
+         return false;
+      }
+      for (SimpleString addressPart : address.split(','))
+      {
+         // Test the queue's own address: the argument always starts with its own first entry.
+         if (getAddress().startsWith(addressPart))
+         {
+            return true;
+         }
+      }
+      return false;
+   }
+
    /* (non-Javadoc)
     * @see java.lang.Object#toString()
     */
@@ -153,6 +174,4 @@ public class QueueInfo implements Serializable
              distance +
              "]";
    }
-
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java b/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java
index c037ff5..f545345 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java
@@ -34,6 +34,7 @@ import org.hornetq.core.server.HornetQServerLogger;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.cluster.RemoteQueueBinding;
 import org.hornetq.core.server.group.GroupingHandler;
 import org.hornetq.core.server.group.impl.Proposal;
 import org.hornetq.core.server.group.impl.Response;
@@ -262,6 +263,34 @@ public final class BindingsImpl implements Bindings
 
    private void route(final ServerMessage message, final RoutingContext context, final boolean groupRouting) throws Exception
    {
+      /* This is a special treatment for scaled-down messages involving SnF queues.
+       * See org.hornetq.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property
+       */
+      if (message.containsProperty(MessageImpl.HDR_SCALEDOWN_TO_IDS))
+      {
+         byte[] ids = (byte[]) message.removeProperty(MessageImpl.HDR_SCALEDOWN_TO_IDS);
+
+         if (ids != null)
+         {
+            ByteBuffer buffer = ByteBuffer.wrap(ids);
+            while (buffer.hasRemaining())
+            {
+               long id = buffer.getLong();
+               for (Map.Entry<Long, Binding> entry : bindingsMap.entrySet())
+               {
+                  if (entry.getValue() instanceof RemoteQueueBinding)
+                  {
+                     RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue();
+                     if (remoteQueueBinding.getRemoteQueueID() == id)
+                     {
+                        message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array());
+                     }
+                  }
+               }
+            }
+         }
+      }
+
       boolean routed = false;
 
       if (!exclusiveBindings.isEmpty())

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java b/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
index ddd8a44..d10294a 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
@@ -93,7 +93,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache
             // cache size has been reduced in config - delete the extra records
             if (txID == -1)
             {
-               txID = storageManager.generateUniqueID();
+               txID = storageManager.generateID();
             }
 
             storageManager.deleteDuplicateIDTransactional(txID, id.getB());
@@ -156,7 +156,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache
       {
          if (persist)
          {
-            recordID = storageManager.generateUniqueID();
+            recordID = storageManager.generateID();
             storageManager.storeDuplicateID(address, duplID, recordID);
          }
 
@@ -166,7 +166,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache
       {
          if (persist)
          {
-            recordID = storageManager.generateUniqueID();
+            recordID = storageManager.generateID();
             storageManager.storeDuplicateIDTransactional(tx.getID(), address, duplID, recordID);
 
             tx.setContainsPersistent();
@@ -245,12 +245,15 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache
    {
       synchronized (this)
       {
-         long tx = storageManager.generateUniqueID();
-         for (Pair<ByteArrayHolder, Long> id : ids)
+         if (ids.size() > 0)
          {
-            storageManager.deleteDuplicateIDTransactional(tx, id.getB());
+            long tx = storageManager.generateID();
+            for (Pair<ByteArrayHolder, Long> id : ids)
+            {
+               storageManager.deleteDuplicateIDTransactional(tx, id.getB());
+            }
+            storageManager.commit(tx);
          }
-         storageManager.commit(tx);
 
          ids.clear();
          cache.clear();

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java b/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
index 8b90e9a..5b3e7cb 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
@@ -34,6 +34,7 @@ import org.hornetq.api.core.HornetQNonExistentQueueException;
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.management.CoreNotificationType;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.filter.Filter;
@@ -218,13 +219,15 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
    public void onNotification(final Notification notification)
    {
+      if (!(notification.getType() instanceof CoreNotificationType)) return;
+
       if (isTrace)
       {
          HornetQServerLogger.LOGGER.trace("Receiving notification : " + notification + " on server " + this.server);
       }
       synchronized (notificationLock)
       {
-         NotificationType type = notification.getType();
+         CoreNotificationType type = (CoreNotificationType) notification.getType();
 
          switch (type)
          {
@@ -446,7 +449,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
    // PostOffice implementation -----------------------------------------------
 
-   // TODO - needs to be synchronized to prevent happening concurrently with activate().
+   // TODO - needs to be synchronized to prevent happening concurrently with activate()
    // (and possible removeBinding and other methods)
    // Otherwise can have situation where createQueue comes in before failover, then failover occurs
    // and post office is activated but queue remains unactivated after failover so delivery never occurs
@@ -483,7 +486,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          HornetQServerLogger.LOGGER.debug("ClusterCommunication::Sending notification for addBinding " + binding + " from server " + server);
       }
 
-      managementService.sendNotification(new Notification(uid, NotificationType.BINDING_ADDED, props));
+      managementService.sendNotification(new Notification(uid, CoreNotificationType.BINDING_ADDED, props));
    }
 
    public synchronized Binding removeBinding(final SimpleString uniqueName, Transaction tx) throws Exception
@@ -539,7 +542,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
             props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, binding.getFilter().getFilterString());
          }
 
-         managementService.sendNotification(new Notification(null, NotificationType.BINDING_REMOVED, props));
+         managementService.sendNotification(new Notification(null, CoreNotificationType.BINDING_REMOVED, props));
       }
 
       binding.close();
@@ -555,6 +558,13 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       {
          cache.clear();
       }
+
+      cache = duplicateIDCaches.remove(BRIDGE_CACHE_STR.concat(address));
+
+      if (cache != null)
+      {
+         cache.clear();
+      }
    }
 
    @Override
@@ -799,7 +809,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       // We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message
       // arrived the target node
       // as described on https://issues.jboss.org/browse/JBPAPP-6130
-      ServerMessage copyRedistribute = message.copy(storageManager.generateUniqueID());
+      ServerMessage copyRedistribute = message.copy(storageManager.generateID());
 
       Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddress());
 
@@ -871,7 +881,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       {
          // First send a reset message
 
-         ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
+         ServerMessage message = new ServerMessageImpl(storageManager.generateID(), 50);
 
          message.setAddress(queueName);
          message.putBooleanProperty(PostOfficeImpl.HDR_RESET_QUEUE_DATA, true);
@@ -883,9 +893,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
             {
                HornetQServerLogger.LOGGER.trace("QueueInfo on sendQueueInfoToQueue = " + info);
             }
-            if (info.getAddress().startsWith(address))
+            if (info.matchesAddress(address))
             {
-               message = createQueueInfoMessage(NotificationType.BINDING_ADDED, queueName);
+               message = createQueueInfoMessage(CoreNotificationType.BINDING_ADDED, queueName);
 
                message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
                message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
@@ -900,7 +910,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
                for (int i = 0; i < info.getNumberOfConsumers() - consumersWithFilters; i++)
                {
-                  message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
+                  message = createQueueInfoMessage(CoreNotificationType.CONSUMER_CREATED, queueName);
 
                   message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
                   message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
@@ -914,7 +924,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                {
                   for (SimpleString filterString : info.getFilterStrings())
                   {
-                     message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
+                     message = createQueueInfoMessage(CoreNotificationType.CONSUMER_CREATED, queueName);
 
                      message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
                      message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
@@ -927,7 +937,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                }
             }
          }
-         ServerMessage completeMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
+         ServerMessage completeMessage = new ServerMessageImpl(storageManager.generateID(), 50);
 
          completeMessage.setAddress(queueName);
          completeMessage.putBooleanProperty(PostOfficeImpl.HDR_RESET_QUEUE_DATA_COMPLETE, true);
@@ -960,7 +970,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          // We use properties to establish routing context on clustering.
          // However if the client resends the message after receiving, it needs to be removed
          if ((name.startsWith(MessageImpl.HDR_ROUTE_TO_IDS) && !name.equals(MessageImpl.HDR_ROUTE_TO_IDS)) ||
-            name.equals(MessageImpl.HDR_ROUTE_TO_ACK_IDS))
+               (name.startsWith(MessageImpl.HDR_ROUTE_TO_ACK_IDS) && !name.equals(MessageImpl.HDR_ROUTE_TO_ACK_IDS)))
          {
             if (valuesToRemove == null)
             {
@@ -1360,7 +1370,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
    private ServerMessage createQueueInfoMessage(final NotificationType type, final SimpleString queueName)
    {
-      ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
+      ServerMessage message = new ServerMessageImpl(storageManager.generateID(), 50);
 
       message.setAddress(queueName);
 
@@ -1513,4 +1523,15 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       }
       return bindings;
    }
+
+   // For tests only
+   public AddressManager getAddressManager()
+   {
+      return addressManager;
+   }
+
+   public HornetQServer getServer()
+   {
+      return server;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/SimpleAddressManager.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/SimpleAddressManager.java b/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/SimpleAddressManager.java
index 66128db..08ad4f1 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/SimpleAddressManager.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/SimpleAddressManager.java
@@ -63,9 +63,9 @@ public class SimpleAddressManager implements AddressManager
          throw HornetQMessageBundle.BUNDLE.bindingAlreadyExists(binding);
       }
 
-      if (HornetQServerLogger.LOGGER.isDebugEnabled())
+      if (HornetQServerLogger.LOGGER.isTraceEnabled())
       {
-         HornetQServerLogger.LOGGER.debug("Adding binding " + binding + " with address = " + binding.getUniqueName(), new Exception("trace"));
+         HornetQServerLogger.LOGGER.trace("Adding binding " + binding + " with address = " + binding.getUniqueName(), new Exception("trace"));
       }
 
       return addMappingInternal(binding.getAddress(), binding);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/protocol/ProtocolHandler.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/protocol/ProtocolHandler.java b/hornetq-server/src/main/java/org/hornetq/core/protocol/ProtocolHandler.java
index dfc0521..d7edc4a 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/protocol/ProtocolHandler.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/protocol/ProtocolHandler.java
@@ -14,16 +14,19 @@ package org.hornetq.core.protocol;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.HttpRequestDecoder;
@@ -35,11 +38,15 @@ import org.hornetq.core.remoting.impl.netty.ConnectionCreator;
 import org.hornetq.core.remoting.impl.netty.HttpAcceptorHandler;
 import org.hornetq.core.remoting.impl.netty.HttpKeepAliveRunnable;
 import org.hornetq.core.remoting.impl.netty.NettyAcceptor;
+import org.hornetq.core.remoting.impl.netty.NettyConnector;
 import org.hornetq.core.remoting.impl.netty.NettyServerConnection;
 import org.hornetq.core.remoting.impl.netty.TransportConstants;
 import org.hornetq.spi.core.protocol.ProtocolManager;
 import org.hornetq.utils.ConfigurationHelper;
 
+import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
 public class ProtocolHandler
 {
    private Map<String, ProtocolManager> protocolMap;
@@ -104,6 +111,12 @@ public class ProtocolHandler
                ctx.pipeline().remove("http-handler");
                ctx.fireChannelRead(msg);
             }
+            // HORNETQ-1391
+            else if (upgrade != null && upgrade.equalsIgnoreCase(NettyConnector.HORNETQ_REMOTING))
+            {
+               // Send the response and close the connection if necessary.
+               ctx.writeAndFlush(new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN)).addListener(ChannelFutureListener.CLOSE);
+            }
          }
          else
          {
@@ -133,15 +146,25 @@ public class ProtocolHandler
             return;
          }
          String protocolToUse = null;
-         for (String protocol : protocolMap.keySet())
+         Set<String> protocolSet = protocolMap.keySet();
+         if (!protocolSet.isEmpty())
          {
-            ProtocolManager protocolManager = protocolMap.get(protocol);
-            if (protocolManager.isProtocol(in.copy(0, 8).array()))
+            // Use getBytes(...) as this works with direct and heap buffers.
+            // See https://issues.jboss.org/browse/HORNETQ-1406
+            byte[] bytes = new byte[8];
+            in.getBytes(0, bytes);
+
+            for (String protocol : protocolSet)
             {
-               protocolToUse = protocol;
-               break;
+               ProtocolManager protocolManager = protocolMap.get(protocol);
+               if (protocolManager.isProtocol(bytes))
+               {
+                  protocolToUse = protocol;
+                  break;
+               }
             }
          }
+
          //if we get here we assume we use the core protocol as we match nothing else
          if (protocolToUse == null)
          {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java b/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
index 9d5fdb0..d0fe3ac 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
@@ -20,9 +20,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 
 import io.netty.channel.ChannelPipeline;
-import org.hornetq.api.core.HornetQAlreadyReplicatingException;
 import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Interceptor;
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.TransportConfiguration;
@@ -37,8 +35,6 @@ import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
 import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
-import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
 import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V3;
@@ -50,8 +46,8 @@ import org.hornetq.core.remoting.impl.netty.HornetQFrameDecoder2;
 import org.hornetq.core.remoting.impl.netty.NettyServerConnection;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.HornetQServerLogger;
-import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.spi.core.protocol.ConnectionEntry;
+import org.hornetq.spi.core.protocol.MessageConverter;
 import org.hornetq.spi.core.protocol.ProtocolManager;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.spi.core.remoting.Acceptor;
@@ -81,6 +77,16 @@ class CoreProtocolManager implements ProtocolManager
       this.outgoingInterceptors = outgoingInterceptors;
    }
 
+   /**
+    * Not implemented for this protocol manager yet.
+    * @return always {@code null}
+    */
+   @Override
+   public MessageConverter getConverter()
+   {
+      return null;
+   }
+
    public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection)
    {
       final Configuration config = server.getConfiguration();
@@ -113,7 +119,7 @@ class CoreProtocolManager implements ProtocolManager
 
       channel0.setHandler(new LocalChannelHandler(config, entry, channel0, acceptorUsed, rc));
 
-      server.getClusterManager().addClusterChannelHandler(rc.getChannel(CHANNEL_ID.CLUSTER.id, -1), acceptorUsed, rc);
+      server.getClusterManager().addClusterChannelHandler(rc.getChannel(CHANNEL_ID.CLUSTER.id, -1), acceptorUsed, rc, server.getActivation());
 
       return entry;
    }
@@ -332,33 +338,6 @@ class CoreProtocolManager implements ProtocolManager
                      }
                   }
                });
-
-            }
-         }
-         else if (packet.getType() == PacketImpl.BACKUP_REGISTRATION)
-         {
-            BackupRegistrationMessage msg = (BackupRegistrationMessage)packet;
-            ClusterConnection clusterConnection = acceptorUsed.getClusterConnection();
-
-            if (!config.isSecurityEnabled() || clusterConnection.verify(msg.getClusterUser(), msg.getClusterPassword()))
-            {
-               try
-               {
-                  server.startReplication(rc, clusterConnection, getPair(msg.getConnector(), true),
-                                          msg.isFailBackRequest());
-               }
-               catch (HornetQAlreadyReplicatingException are)
-               {
-                  channel0.send(new BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.ALREADY_REPLICATING));
-               }
-               catch (HornetQException e)
-               {
-                  channel0.send(new BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.EXCEPTION));
-               }
-            }
-            else
-            {
-               channel0.send(new BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.AUTHENTICATION));
             }
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java b/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
index 12dd132..b4d993b 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
@@ -22,6 +22,7 @@ import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuation
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
 import org.hornetq.core.server.HornetQServerLogger;
+import org.hornetq.core.server.ServerConsumer;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.spi.core.protocol.ProtocolManager;
 import org.hornetq.spi.core.protocol.SessionCallback;
@@ -47,9 +48,9 @@ public final class CoreSessionCallback implements SessionCallback
       this.channel = channel;
    }
 
-   public int sendLargeMessage(ServerMessage message, long consumerID, long bodySize, int deliveryCount)
+   public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount)
    {
-      Packet packet = new SessionReceiveLargeMessage(consumerID, message, bodySize, deliveryCount);
+      Packet packet = new SessionReceiveLargeMessage(consumer.getID(), message, bodySize, deliveryCount);
 
       channel.send(packet);
 
@@ -58,18 +59,18 @@ public final class CoreSessionCallback implements SessionCallback
       return size;
    }
 
-   public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
+   public int sendLargeMessageContinuation(ServerConsumer consumer, byte[] body, boolean continues, boolean requiresResponse)
    {
-      Packet packet = new SessionReceiveContinuationMessage(consumerID, body, continues, requiresResponse);
+      Packet packet = new SessionReceiveContinuationMessage(consumer.getID(), body, continues, requiresResponse);
 
       channel.send(packet);
 
       return packet.getPacketSize();
    }
 
-   public int sendMessage(ServerMessage message, long consumerID, int deliveryCount)
+   public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount)
    {
-      Packet packet = new SessionReceiveMessage(consumerID, message, deliveryCount);
+      Packet packet = new SessionReceiveMessage(consumer.getID(), message, deliveryCount);
 
       int size = 0;
 
@@ -112,15 +113,24 @@ public final class CoreSessionCallback implements SessionCallback
    }
 
    @Override
-   public void disconnect(long consumerId, String queueName)
+   public void disconnect(ServerConsumer consumerId, String queueName)
    {
       if (channel.supports(PacketImpl.DISCONNECT_CONSUMER))
       {
-         channel.send(new DisconnectConsumerMessage(consumerId));
+         channel.send(new DisconnectConsumerMessage(consumerId.getID()));
       }
       else
       {
          HornetQServerLogger.LOGGER.warnDisconnectOldClient(queueName);
       }
    }
+
+
+   @Override
+   public boolean hasCredits(ServerConsumer consumer)
+   {
+      // Always reports that credits are available,
+      // as the flow control is done by HornetQ itself
+      return true;
+   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java b/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
index bb4e111..2d8f227 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
@@ -16,7 +16,6 @@ import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.HornetQExceptionType;
 import org.hornetq.api.core.HornetQInternalErrorException;
 import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.config.BackupStrategy;
 import org.hornetq.core.protocol.core.Channel;
 import org.hornetq.core.protocol.core.ChannelHandler;
 import org.hornetq.core.protocol.core.CoreRemotingConnection;
@@ -119,8 +118,7 @@ public class HornetQPacketHandler implements ChannelHandler
    {
       String nodeID = failoverMessage.getNodeID();
       boolean okToFailover = nodeID == null ||
-            !(server.getConfiguration().getHAPolicy().getBackupStrategy() == BackupStrategy.SCALE_DOWN &&
-            !server.hasScaledDown(new SimpleString(nodeID)));
+            !(server.getHAPolicy().canScaleDown() && !server.hasScaledDown(new SimpleString(nodeID)));
       channel1.send(new CheckFailoverReplyMessage(okToFailover));
    }
 
@@ -133,7 +131,6 @@ public class HornetQPacketHandler implements ChannelHandler
          Version version = server.getVersion();
          if (!version.isCompatible(request.getVersion()))
          {
-            HornetQServerLogger.LOGGER.incompatibleVersion(request.getVersion(), connection.getRemoteAddress(), version.getFullVersion());
             throw HornetQMessageBundle.BUNDLE.incompatibleClientServer();
          }
 
@@ -180,7 +177,7 @@ public class HornetQPacketHandler implements ChannelHandler
                                                       request.getDefaultAddress(),
                                                       new CoreSessionCallback(request.getName(),
                                                                               protocolManager,
-                                                                              channel));
+                                                                              channel), null);
 
          ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session,
                                                                              server.getStorageManager(),
@@ -194,13 +191,17 @@ public class HornetQPacketHandler implements ChannelHandler
       }
       catch (HornetQException e)
       {
-         HornetQServerLogger.LOGGER.failedToCreateSession(e);
-         response = new HornetQExceptionMessage(e);
-
          if (e.getType() == HornetQExceptionType.INCOMPATIBLE_CLIENT_SERVER_VERSIONS)
          {
             incompatibleVersion = true;
+            HornetQServerLogger.LOGGER.debug("Sending HornetQException after Incompatible client", e);
+         }
+         else
+         {
+            HornetQServerLogger.LOGGER.failedToCreateSession(e);
          }
+
+         response = new HornetQExceptionMessage(e);
       }
       catch (Exception e)
       {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRequestMessage.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRequestMessage.java b/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRequestMessage.java
index d908e54..566bb74 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRequestMessage.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRequestMessage.java
@@ -16,13 +16,11 @@ package org.hornetq.core.protocol.core.impl.wireformat;
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.protocol.core.impl.PacketImpl;
-import org.hornetq.core.server.cluster.ha.HAPolicy;
 
 public class BackupRequestMessage extends PacketImpl
 {
    private int backupSize;
    private SimpleString nodeID;
-   private HAPolicy.POLICY_TYPE backupType;
    private String journalDirectory;
    private String bindingsDirectory;
    private String largeMessagesDirectory;
@@ -38,7 +36,6 @@ public class BackupRequestMessage extends PacketImpl
    {
       super(BACKUP_REQUEST);
       this.backupSize = backupSize;
-      backupType = HAPolicy.POLICY_TYPE.COLOCATED_SHARED_STORE;
       this.journalDirectory = journalDirectory;
       this.bindingsDirectory = bindingsDirectory;
       this.largeMessagesDirectory = largeMessagesDirectory;
@@ -50,7 +47,6 @@ public class BackupRequestMessage extends PacketImpl
       super(BACKUP_REQUEST);
       this.backupSize = backupSize;
       this.nodeID = nodeID;
-      backupType = HAPolicy.POLICY_TYPE.COLOCATED_REPLICATED;
    }
 
    @Override
@@ -58,18 +54,11 @@ public class BackupRequestMessage extends PacketImpl
    {
       super.encodeRest(buffer);
       buffer.writeInt(backupSize);
-      buffer.writeByte(backupType.getType());
-      if (backupType == HAPolicy.POLICY_TYPE.COLOCATED_SHARED_STORE)
-      {
-         buffer.writeString(journalDirectory);
-         buffer.writeString(bindingsDirectory);
-         buffer.writeString(largeMessagesDirectory);
-         buffer.writeString(pagingDirectory);
-      }
-      else
-      {
-         buffer.writeSimpleString(nodeID);
-      }
+      buffer.writeNullableString(journalDirectory);
+      buffer.writeNullableString(bindingsDirectory);
+      buffer.writeNullableString(largeMessagesDirectory);
+      buffer.writeNullableString(pagingDirectory);
+      buffer.writeNullableSimpleString(nodeID);
    }
 
    @Override
@@ -77,18 +66,11 @@ public class BackupRequestMessage extends PacketImpl
    {
       super.decodeRest(buffer);
       backupSize = buffer.readInt();
-      backupType = HAPolicy.POLICY_TYPE.toBackupType(buffer.readByte());
-      if (backupType == HAPolicy.POLICY_TYPE.COLOCATED_SHARED_STORE)
-      {
-         journalDirectory = buffer.readString();
-         bindingsDirectory = buffer.readString();
-         largeMessagesDirectory = buffer.readString();
-         pagingDirectory = buffer.readString();
-      }
-      else
-      {
-         nodeID = buffer.readSimpleString();
-      }
+      journalDirectory = buffer.readNullableString();
+      bindingsDirectory = buffer.readNullableString();
+      largeMessagesDirectory = buffer.readNullableString();
+      pagingDirectory = buffer.readNullableString();
+      nodeID = buffer.readNullableSimpleString();
    }
 
    public int getBackupSize()
@@ -101,11 +83,6 @@ public class BackupRequestMessage extends PacketImpl
       return nodeID;
    }
 
-   public HAPolicy.POLICY_TYPE getBackupType()
-   {
-      return backupType;
-   }
-
    public String getJournalDirectory()
    {
       return journalDirectory;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
index 1fc484f..114de68 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
@@ -23,7 +23,7 @@ import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalCo
 import org.hornetq.core.protocol.core.impl.PacketImpl;
 
 /**
- * Message is used to sync {@link SequentialFile}s to a backup server. The {@link FileType} controls
+ * Message is used to sync {@link org.hornetq.core.journal.SequentialFile}s to a backup server. The {@link FileType} controls
  * which extra information is sent.
  */
 public final class ReplicationSyncFileMessage extends PacketImpl

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
index e794ef6..ae0838e 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
@@ -20,7 +20,7 @@ import java.util.concurrent.Executor;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.api.core.management.CoreNotificationType;
 import org.hornetq.core.security.HornetQPrincipal;
 import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.core.server.HornetQMessageBundle;
@@ -109,7 +109,7 @@ public final class InVMAcceptor implements Acceptor
          props.putSimpleStringProperty(new SimpleString("factory"),
                                        new SimpleString(InVMAcceptorFactory.class.getName()));
          props.putIntProperty(new SimpleString("id"), id);
-         Notification notification = new Notification(null, NotificationType.ACCEPTOR_STARTED, props);
+         Notification notification = new Notification(null, CoreNotificationType.ACCEPTOR_STARTED, props);
          notificationService.sendNotification(notification);
       }
 
@@ -143,7 +143,7 @@ public final class InVMAcceptor implements Acceptor
          props.putSimpleStringProperty(new SimpleString("factory"),
                                        new SimpleString(InVMAcceptorFactory.class.getName()));
          props.putIntProperty(new SimpleString("id"), id);
-         Notification notification = new Notification(null, NotificationType.ACCEPTOR_STOPPED, props);
+         Notification notification = new Notification(null, CoreNotificationType.ACCEPTOR_STOPPED, props);
          try
          {
             notificationService.sendNotification(notification);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnection.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnection.java b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnection.java
index 26bdf13..0613724 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnection.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnection.java
@@ -19,12 +19,14 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import io.netty.channel.ChannelFutureListener;
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.api.core.HornetQInterruptedException;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.core.security.HornetQPrincipal;
 import org.hornetq.core.server.HornetQServerLogger;
+import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -59,6 +61,8 @@ public class InVMConnection implements Connection
 
    private final HornetQPrincipal defaultHornetQPrincipal;
 
+   private RemotingConnection protocolConnection;
+
    public InVMConnection(final int serverID,
                          final BufferHandler handler,
                          final ConnectionLifeCycleListener listener,
@@ -96,6 +100,24 @@ public class InVMConnection implements Connection
       this.defaultHornetQPrincipal = defaultHornetQPrincipal;
    }
 
+
+   public void forceClose()
+   {
+      // no op
+   }
+
+   public RemotingConnection getProtocolConnection()
+   {
+      return this.protocolConnection;
+   }
+
+   public void setProtocolConnection(RemotingConnection connection)
+   {
+      this.protocolConnection = connection;
+   }
+
+
+
    public void close()
    {
       if (closing)
@@ -132,11 +154,16 @@ public class InVMConnection implements Connection
 
    public void write(final HornetQBuffer buffer)
    {
-      write(buffer, false, false);
+      write(buffer, false, false, null);
    }
 
    public void write(final HornetQBuffer buffer, final boolean flush, final boolean batch)
    {
+      write(buffer, flush, batch, null);
+   }
+
+   public void write(final HornetQBuffer buffer, final boolean flush, final boolean batch, final ChannelFutureListener futureListener)
+   {
       final HornetQBuffer copied = buffer.copy(0, buffer.capacity());
 
       copied.setIndex(buffer.readerIndex(), buffer.writerIndex());
@@ -157,6 +184,11 @@ public class InVMConnection implements Connection
                         HornetQServerLogger.LOGGER.trace(InVMConnection.this + "::Sending inVM packet");
                      }
                      handler.bufferReceived(id, copied);
+                     if (futureListener != null)
+                     {
+                        // TODO BEFORE MERGE: is passing null a good option here?
+                        futureListener.operationComplete(null);
+                     }
                   }
                }
                catch (Exception e)
@@ -224,6 +256,12 @@ public class InVMConnection implements Connection
    {
    }
 
+   @Override
+   public boolean isUsingProtocolHandling()
+   {
+      return false;
+   }
+
    public HornetQPrincipal getDefaultHornetQPrincipal()
    {
       return defaultHornetQPrincipal;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnector.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnector.java b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnector.java
index 1d24adf..b47cc66 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnector.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnector.java
@@ -12,19 +12,21 @@
  */
 package org.hornetq.core.remoting.impl.invm;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 
 import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.core.server.HornetQServerLogger;
 import org.hornetq.core.server.HornetQMessageBundle;
 import org.hornetq.spi.core.remoting.AbstractConnector;
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.BufferHandler;
+import org.hornetq.spi.core.remoting.ClientProtocolManager;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
 import org.hornetq.utils.ConfigurationHelper;
@@ -34,10 +36,20 @@ import org.hornetq.utils.OrderedExecutorFactory;
  * A InVMConnector
  *
  * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a>
  *
  */
 public class InVMConnector extends AbstractConnector
 {
+   public static final Map<String, Object> DEFAULT_CONFIG;
+
+   static
+   {
+      Map<String, Object> config = new HashMap<String , Object>();
+      config.put(TransportConstants.SERVER_ID_PROP_NAME, TransportConstants.DEFAULT_SERVER_ID);
+      DEFAULT_CONFIG = Collections.unmodifiableMap(config);
+   }
+
    // Used for testing failure only
    public static volatile boolean failOnCreateConnection;
 
@@ -63,6 +75,8 @@ public class InVMConnector extends AbstractConnector
 
    protected final int id;
 
+   private final ClientProtocolManager protocolManager;
+
    private final BufferHandler handler;
 
    private final ConnectionLifeCycleListener listener;
@@ -81,7 +95,8 @@ public class InVMConnector extends AbstractConnector
                         final BufferHandler handler,
                         final ConnectionLifeCycleListener listener,
                         final Executor closeExecutor,
-                        final Executor threadPool)
+                        final Executor threadPool,
+                        ClientProtocolManager protocolManager)
    {
       super(configuration);
       this.listener = listener;
@@ -97,6 +112,8 @@ public class InVMConnector extends AbstractConnector
       InVMRegistry registry = InVMRegistry.instance;
 
       acceptor = registry.getAcceptor(id);
+
+      this.protocolManager = protocolManager;
    }
 
    public Acceptor getAcceptor()
@@ -180,7 +197,7 @@ public class InVMConnector extends AbstractConnector
    {
       // No acceptor on a client connection
       InVMConnection inVMConnection = new InVMConnection(id, handler, listener, serverExecutor);
-      listener.connectionCreated(null, inVMConnection, HornetQClient.DEFAULT_CORE_PROTOCOL);
+      listener.connectionCreated(null, inVMConnection, protocolManager.getName());
       return inVMConnection;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java
index 9788d85..283ea98 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java
@@ -19,6 +19,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.hornetq.spi.core.remoting.BufferHandler;
+import org.hornetq.spi.core.remoting.ClientProtocolManager;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
 import org.hornetq.spi.core.remoting.Connector;
 import org.hornetq.spi.core.remoting.ConnectorFactory;
@@ -27,7 +28,7 @@ import org.hornetq.spi.core.remoting.ConnectorFactory;
  * A InVMConnectorFactory
  *
  * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
+ * @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a>
  */
 public class InVMConnectorFactory implements ConnectorFactory
 {
@@ -36,9 +37,10 @@ public class InVMConnectorFactory implements ConnectorFactory
                                     final ConnectionLifeCycleListener listener,
                                     final Executor closeExecutor,
                                     final Executor threadPool,
-                                    final ScheduledExecutorService scheduledThreadPool)
+                                    final ScheduledExecutorService scheduledThreadPool,
+                                    final ClientProtocolManager protocolManager)
    {
-      InVMConnector connector = new InVMConnector(configuration, handler, listener, closeExecutor, threadPool);
+      InVMConnector connector = new InVMConnector(configuration, handler, listener, closeExecutor, threadPool, protocolManager);
 
       return connector;
    }
@@ -53,4 +55,10 @@ public class InVMConnectorFactory implements ConnectorFactory
    {
       return true;
    }
+
+   @Override
+   public Map<String, Object> getDefaults()
+   {
+      return InVMConnector.DEFAULT_CONFIG;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/TransportConstants.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/TransportConstants.java b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/TransportConstants.java
index 46332d3..ea67573 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/TransportConstants.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/TransportConstants.java
@@ -22,12 +22,15 @@ import org.hornetq.api.config.HornetQDefaultConfiguration;
  * A TransportConstants
  *
  * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a>
  *
  */
 public final class TransportConstants
 {
    public static final String SERVER_ID_PROP_NAME = "server-id";
 
+   public static final int DEFAULT_SERVER_ID = 0;
+
    public static final Set<String> ALLOWABLE_CONNECTOR_KEYS;
 
    public static final Set<String> ALLOWABLE_ACCEPTOR_KEYS;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
index 0a8288b..adcf29f 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
@@ -52,7 +52,7 @@ import org.hornetq.api.config.HornetQDefaultConfiguration;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.api.core.management.CoreNotificationType;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
 import org.hornetq.core.protocol.ProtocolHandler;
 import org.hornetq.core.remoting.impl.ssl.SSLSupport;
@@ -443,7 +443,7 @@ public class NettyAcceptor implements Acceptor
                                           new SimpleString(NettyAcceptorFactory.class.getName()));
             props.putSimpleStringProperty(new SimpleString("host"), new SimpleString(host));
             props.putIntProperty(new SimpleString("port"), port);
-            Notification notification = new Notification(null, NotificationType.ACCEPTOR_STARTED, props);
+            Notification notification = new Notification(null, CoreNotificationType.ACCEPTOR_STARTED, props);
             notificationService.sendNotification(notification);
          }
 
@@ -457,7 +457,6 @@ public class NettyAcceptor implements Acceptor
                                                                             TimeUnit.MILLISECONDS);
          }
 
-         // TODO: Think about add Version back to netty
          HornetQServerLogger.LOGGER.startedNettyAcceptor(TransportConstants.NETTY_VERSION, host, port);
       }
    }
@@ -560,7 +559,7 @@ public class NettyAcceptor implements Acceptor
                                        new SimpleString(NettyAcceptorFactory.class.getName()));
          props.putSimpleStringProperty(new SimpleString("host"), new SimpleString(host));
          props.putIntProperty(new SimpleString("port"), port);
-         Notification notification = new Notification(null, NotificationType.ACCEPTOR_STOPPED, props);
+         Notification notification = new Notification(null, CoreNotificationType.ACCEPTOR_STOPPED, props);
          try
          {
             notificationService.sendNotification(notification);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java b/hornetq-server/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
index 60d1570..f1b2ce9 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -48,6 +49,7 @@ import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.HornetQServerLogger;
 import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.core.server.cluster.ClusterManager;
+import org.hornetq.core.server.impl.ServiceRegistry;
 import org.hornetq.core.server.impl.ServerSessionImpl;
 import org.hornetq.core.server.management.ManagementService;
 import org.hornetq.spi.core.protocol.ConnectionEntry;
@@ -67,6 +69,7 @@ import org.hornetq.utils.HornetQThreadFactory;
  * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
  * @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
  * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a>
  */
 public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycleListener
 {
@@ -96,6 +99,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
 
    private ExecutorService threadPool;
 
+   private final Executor flushExecutor;
+
    private final ScheduledExecutorService scheduledThreadPool;
 
    private FailureCheckAndFlushThread failureCheckAndFlushThread;
@@ -106,6 +111,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
 
    private HornetQPrincipal defaultInvmSecurityPrincipal;
 
+   private ServiceRegistry serviceRegistry;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -114,43 +121,29 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
                               final Configuration config,
                               final HornetQServer server,
                               final ManagementService managementService,
-                              final ScheduledExecutorService scheduledThreadPool, List<ProtocolManagerFactory> protocolManagerFactories)
+                              final ScheduledExecutorService scheduledThreadPool,
+                              List<ProtocolManagerFactory> protocolManagerFactories,
+                              final Executor flushExecutor,
+                              final ServiceRegistry serviceRegistry)
    {
+      this.serviceRegistry = serviceRegistry;
+
       acceptorsConfig = config.getAcceptorConfigurations();
 
       this.server = server;
 
       this.clusterManager = clusterManager;
 
-      for (String interceptorClass : config.getIncomingInterceptorClassNames())
-      {
-         try
-         {
-            incomingInterceptors.add((Interceptor) safeInitNewInstance(interceptorClass));
-         }
-         catch (Exception e)
-         {
-            HornetQServerLogger.LOGGER.errorCreatingRemotingInterceptor(e, interceptorClass);
-         }
-      }
+      setInterceptors(config);
 
-      for (String interceptorClass : config.getOutgoingInterceptorClassNames())
-      {
-         try
-         {
-            outgoingInterceptors.add((Interceptor) safeInitNewInstance(interceptorClass));
-         }
-         catch (Exception e)
-         {
-            HornetQServerLogger.LOGGER.errorCreatingRemotingInterceptor(e, interceptorClass);
-         }
-      }
       this.managementService = managementService;
 
       this.scheduledThreadPool = scheduledThreadPool;
 
       CoreProtocolManagerFactory coreProtocolManagerFactory = new CoreProtocolManagerFactory();
       //i know there is only 1
+      this.flushExecutor = flushExecutor;
+
       HornetQServerLogger.LOGGER.addingProtocolSupport(coreProtocolManagerFactory.getProtocols()[0]);
       this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0],
                            coreProtocolManagerFactory.createProtocolManager(server, incomingInterceptors, outgoingInterceptors));
@@ -188,6 +181,23 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
 
    // RemotingService implementation -------------------------------
 
+   private void setInterceptors(Configuration configuration)
+   {
+      addReflectivelyInstantiatedInterceptors(configuration.getIncomingInterceptorClassNames(), incomingInterceptors);
+      addReflectivelyInstantiatedInterceptors(configuration.getOutgoingInterceptorClassNames(), outgoingInterceptors);
+      incomingInterceptors.addAll(serviceRegistry.getIncomingInterceptors());
+      outgoingInterceptors.addAll(serviceRegistry.getOutgoingInterceptors());
+   }
+
+   private void addReflectivelyInstantiatedInterceptors(List<String> classNames, List<Interceptor> interceptors)
+   {
+      for (String className : classNames)
+      {
+         Interceptor interceptor = ((Interceptor) safeInitNewInstance(className));
+         interceptors.add(interceptor);
+      }
+   }
+
    public synchronized void start() throws Exception
    {
       if (started)
@@ -463,6 +473,23 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
       return started;
    }
 
+
+   private RemotingConnection getConnection(final Object remotingConnectionID)
+   {
+      ConnectionEntry entry = connections.get(remotingConnectionID);
+
+      if (entry != null)
+      {
+         return entry.connection;
+      }
+      else
+      {
+         HornetQServerLogger.LOGGER.errorRemovingConnection();
+
+         return null;
+      }
+   }
+
    public RemotingConnection removeConnection(final Object remotingConnectionID)
    {
       ConnectionEntry entry = connections.remove(remotingConnectionID);
@@ -689,7 +716,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
 
                for (ConnectionEntry entry : connections.values())
                {
-                  RemotingConnection conn = entry.connection;
+                  final RemotingConnection conn = entry.connection;
 
                   boolean flush = true;
 
@@ -712,16 +739,34 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
 
                   if (flush)
                   {
-                     conn.flush();
+                     flushExecutor.execute(new Runnable()
+                     {
+                        public void run()
+                        {
+                           try
+                           {
+                              // the flush runs on a different thread because,
+                              // if anything goes wrong during the flush,
+                              // failure detection could otherwise be affected
+                              conn.flush();
+                           }
+                           catch (Throwable e)
+                           {
+                              HornetQServerLogger.LOGGER.warn(e.getMessage(), e);
+                           }
+
+                        }
+                     });
                   }
                }
 
                for (Object id : idsToRemove)
                {
-                  RemotingConnection conn = removeConnection(id);
+                  RemotingConnection conn = getConnection(id);
                   if (conn != null)
                   {
                      conn.fail(HornetQMessageBundle.BUNDLE.clientExited(conn.getRemoteAddress()));
+                     removeConnection(id);
                   }
                }
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicatedJournal.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicatedJournal.java b/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicatedJournal.java
index e6c539e..94dcc29 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicatedJournal.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicatedJournal.java
@@ -30,12 +30,12 @@ import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.replication.ReplicationManager.ADD_OPERATION_TYPE;
 
 /**
- * Used by the {@link JournalStorageManager} to replicate journal calls.
+ * Used by the {@link org.hornetq.core.persistence.impl.journal.JournalStorageManager} to replicate journal calls.
  * <p>
  * This class wraps a {@link ReplicationManager} and the local {@link Journal}. Every call will be
  * relayed to both instances.
  * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- * @see JournalStorageManager
+ * @see org.hornetq.core.persistence.impl.journal.JournalStorageManager
  */
 public class ReplicatedJournal implements Journal
 {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicatedLargeMessage.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicatedLargeMessage.java b/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicatedLargeMessage.java
index 21721e8..52404a8 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicatedLargeMessage.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicatedLargeMessage.java
@@ -15,32 +15,34 @@
  */
 package org.hornetq.core.replication;
 
+import org.hornetq.api.core.Message;
+
 /**
- * {@link LargeServerMessage} methods used by the {@link ReplicationEndpoint}.
+ * {@link org.hornetq.core.server.LargeServerMessage} methods used by the {@link ReplicationEndpoint}.
  * <p/>
- * In practice a subset of the methods necessary to have a {@link LargeServerMessage}
+ * In practice a subset of the methods necessary to have a {@link org.hornetq.core.server.LargeServerMessage}
  *
- * @see LargeServerMessageInSync
+ * @see org.hornetq.core.persistence.impl.journal.LargeServerMessageInSync
  */
 public interface ReplicatedLargeMessage
 {
    /**
-    * @see LargeServerMessage#setDurable(boolean)
+    * @see org.hornetq.core.server.LargeServerMessage#setDurable(boolean)
     */
-   void setDurable(boolean b);
+   Message setDurable(boolean b);
 
    /**
-    * @see LargeServerMessage#setMessageID(long)
+    * @see org.hornetq.core.server.LargeServerMessage#setMessageID(long)
     */
-   void setMessageID(long id);
+   Message setMessageID(long id);
 
    /**
-    * @see LargeServerMessage#releaseResources()
+    * @see org.hornetq.core.server.LargeServerMessage#releaseResources()
     */
    void releaseResources();
 
    /**
-    * @see LargeServerMessage#deleteFile()
+    * @see org.hornetq.core.server.LargeServerMessage#deleteFile()
     */
    void deleteFile() throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java b/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
index c667fa3..dd68e36 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
@@ -74,6 +74,7 @@ import org.hornetq.core.server.HornetQServerLogger;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.cluster.qourum.SharedNothingBackupQuorum;
 import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.server.impl.SharedNothingBackupActivation;
 
 /**
  * Handles all the synchronization necessary for replication on the backup side (that is the
@@ -88,6 +89,7 @@ public final class ReplicationEndpoint implements ChannelHandler, HornetQCompone
    private final IOCriticalErrorListener criticalErrorListener;
    private final HornetQServerImpl server;
    private final boolean wantedFailBack;
+   private final SharedNothingBackupActivation activation;
    private final boolean noSync = false;
    private Channel channel;
 
@@ -125,11 +127,12 @@ public final class ReplicationEndpoint implements ChannelHandler, HornetQCompone
 
    // Constructors --------------------------------------------------
    public ReplicationEndpoint(final HornetQServerImpl server, IOCriticalErrorListener criticalErrorListener,
-                              boolean wantedFailBack)
+                              boolean wantedFailBack, SharedNothingBackupActivation activation)
    {
       this.server = server;
       this.criticalErrorListener = criticalErrorListener;
       this.wantedFailBack = wantedFailBack;
+      this.activation = activation;
    }
 
    // Public --------------------------------------------------------
@@ -260,7 +263,7 @@ public final class ReplicationEndpoint implements ChannelHandler, HornetQCompone
     */
    private void handleLiveStopping(ReplicationLiveIsStoppingMessage packet) throws HornetQException
    {
-      server.remoteFailOver(packet.isFinalMessage());
+      activation.remoteFailOver(packet.isFinalMessage());
    }
 
    public boolean isStarted()
@@ -394,7 +397,7 @@ public final class ReplicationEndpoint implements ChannelHandler, HornetQCompone
 
    public void compareJournalInformation(final JournalLoadInformation[] journalInformation) throws HornetQException
    {
-      if (!server.isRemoteBackupUpToDate())
+      if (!activation.isRemoteBackupUpToDate())
       {
          throw HornetQMessageBundle.BUNDLE.journalsNotInSync();
       }
@@ -481,7 +484,7 @@ public final class ReplicationEndpoint implements ChannelHandler, HornetQCompone
 
       journalsHolder = null;
       backupQuorum.liveIDSet(liveID);
-      server.setRemoteBackupUpToDate();
+      activation.setRemoteBackupUpToDate();
       HornetQServerLogger.LOGGER.backupServerSynched(server);
       return;
    }
@@ -555,7 +558,7 @@ public final class ReplicationEndpoint implements ChannelHandler, HornetQCompone
     */
    private void handleStartReplicationSynchronization(final ReplicationStartSyncMessage packet) throws Exception
    {
-      if (server.isRemoteBackupUpToDate())
+      if (activation.isRemoteBackupUpToDate())
       {
          throw HornetQMessageBundle.BUNDLE.replicationBackupUpToDate();
       }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationManager.java b/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationManager.java
index 42f3872..cd43ffa 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationManager.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationManager.java
@@ -16,11 +16,8 @@ import java.io.FileInputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -112,8 +109,6 @@ public final class ReplicationManager implements HornetQComponent
    private volatile boolean enabled;
 
    private final Object replicationLock = new Object();
-   private final Object largeMessageSyncGuard = new Object();
-   private final HashMap<Long, Pair<String, Long>> largeMessagesToSync = new HashMap<Long, Pair<String, Long>>();
 
    private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<OperationContext>();
 
@@ -506,22 +501,6 @@ public final class ReplicationManager implements HornetQComponent
       }
    }
 
-   /**
-    * @return
-    */
-   public Map.Entry<Long, Pair<String, Long>> getNextLargeMessageToSync()
-   {
-      Iterator<Entry<Long, Pair<String, Long>>> iter = largeMessagesToSync.entrySet().iterator();
-      if (!iter.hasNext())
-      {
-         return null;
-      }
-
-      Entry<Long, Pair<String, Long>> entry = iter.next();
-      iter.remove();
-      return entry;
-   }
-
    public void syncLargeMessageFile(SequentialFile file, long size, long id) throws Exception
    {
       if (enabled)
@@ -563,7 +542,10 @@ public final class ReplicationManager implements HornetQComponent
             final FileChannel channel = fis.getChannel();
             try
             {
-               final ByteBuffer buffer = ByteBuffer.allocate(1 << 17);
+               // We can afford having a single buffer here for this entire loop
+               // because sendReplicatePacket will encode the packet as a NettyBuffer
+               // through HornetQBuffer class leaving this buffer free to be reused on the next copy
+               final ByteBuffer buffer = ByteBuffer.allocate(1 << 17); // 1 << 17 == 131072 == 128 * 1024
                while (true)
                {
                   buffer.clear();
@@ -649,8 +631,7 @@ public final class ReplicationManager implements HornetQComponent
    public void sendLargeMessageIdListMessage(Map<Long, Pair<String, Long>> largeMessages)
    {
       ArrayList<Long> idsToSend;
-      largeMessagesToSync.putAll(largeMessages);
-      idsToSend = new ArrayList<Long>(largeMessagesToSync.keySet());
+      idsToSend = new ArrayList<Long>(largeMessages.keySet());
 
       if (enabled)
          sendReplicatePacket(new ReplicationStartSyncMessage(idsToSend));

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/security/SecurityStore.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/security/SecurityStore.java b/hornetq-server/src/main/java/org/hornetq/core/security/SecurityStore.java
index 01a8f17..c7325ca 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/security/SecurityStore.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/security/SecurityStore.java
@@ -26,5 +26,7 @@ public interface SecurityStore
 
    void check(SimpleString address, CheckType checkType, ServerSession session) throws Exception;
 
+   boolean isSecurityEnabled();
+
    void stop();
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/security/impl/SecurityStoreImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/security/impl/SecurityStoreImpl.java b/hornetq-server/src/main/java/org/hornetq/core/security/impl/SecurityStoreImpl.java
index a0a4807..d260ba3 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/security/impl/SecurityStoreImpl.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/security/impl/SecurityStoreImpl.java
@@ -12,15 +12,13 @@
  */
 package org.hornetq.core.security.impl;
 
-import static org.hornetq.api.core.management.NotificationType.SECURITY_AUTHENTICATION_VIOLATION;
-
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.management.CoreNotificationType;
 import org.hornetq.api.core.management.ManagementHelper;
-import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.security.CheckType;
 import org.hornetq.core.security.Role;
 import org.hornetq.core.security.SecurityStore;
@@ -98,6 +96,12 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC
 
    // SecurityManager implementation --------------------------------
 
+   @Override
+   public boolean isSecurityEnabled()
+   {
+      return securityEnabled;
+   }
+
    public void stop()
    {
       securityRepository.unRegisterListener(this);
@@ -137,7 +141,7 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC
 
                props.putSimpleStringProperty(ManagementHelper.HDR_USER, SimpleString.toSimpleString(user));
 
-               Notification notification = new Notification(null, SECURITY_AUTHENTICATION_VIOLATION, props);
+               Notification notification = new Notification(null, CoreNotificationType.SECURITY_AUTHENTICATION_VIOLATION, props);
 
                notificationService.sendNotification(notification);
             }
@@ -183,7 +187,7 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC
                props.putSimpleStringProperty(ManagementHelper.HDR_CHECK_TYPE, new SimpleString(checkType.toString()));
                props.putSimpleStringProperty(ManagementHelper.HDR_USER, SimpleString.toSimpleString(user));
 
-               Notification notification = new Notification(null, NotificationType.SECURITY_PERMISSION_VIOLATION, props);
+               Notification notification = new Notification(null, CoreNotificationType.SECURITY_PERMISSION_VIOLATION, props);
 
                notificationService.sendNotification(notification);
             }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/ActivateCallback.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/ActivateCallback.java b/hornetq-server/src/main/java/org/hornetq/core/server/ActivateCallback.java
index 1f10891..361bb3a 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/ActivateCallback.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/ActivateCallback.java
@@ -21,9 +21,24 @@ package org.hornetq.core.server;
  */
 public interface ActivateCallback
 {
+   /*
+    * this is called before any services are started, when the server is first initialised
+    */
    void preActivate();
 
+   /*
+    * this is called after most of the services have been started but before any cluster resources or JMS resources have been started
+    */
    void activated();
 
+   /*
+    * this is called when the server is stopping, after any network resources and clients are closed but before the rest
+    * of the resources
+    */
    void deActivate();
+
+   /*
+    * this is called when all resources have been started including any JMS resources
+    */
+   void activationComplete();
 }


Mime
View raw message