activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [06/16] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding.
Date Sat, 04 Mar 2017 14:46:29 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9a83da/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index b45775c..e27ed30 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -23,13 +23,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.PagingManager;
@@ -45,7 +45,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.RouteContextList;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@@ -172,7 +171,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
     */
    void confirmPendingLargeMessage(long recordID) throws Exception;
 
-   void storeMessage(ServerMessage message) throws Exception;
+   void storeMessage(Message message) throws Exception;
 
    void storeReference(long queueID, long messageID, boolean last) throws Exception;
 
@@ -190,7 +189,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
 
    void deleteDuplicateID(long recordID) throws Exception;
 
-   void storeMessageTransactional(long txID, ServerMessage message) throws Exception;
+   void storeMessageTransactional(long txID, Message message) throws Exception;
 
    void storeReferenceTransactional(long txID, long queueID, long messageID) throws Exception;
 
@@ -225,7 +224,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
     * @return a large message object
     * @throws Exception
     */
-   LargeServerMessage createLargeMessage(long id, MessageInternal message) throws Exception;
+   LargeServerMessage createLargeMessage(long id, Message message) throws Exception;
 
    enum LargeMessageExtension {
       DURABLE(".msg"), TEMPORARY(".tmp"), SYNC(".sync");
@@ -265,11 +264,6 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
 
    void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception;
 
-   /**
-    * FIXME Unused
-    */
-   void updatePageTransaction(PageTransactionInfo pageTransaction, int depage) throws Exception;
-
    void deletePageTransactional(long recordID) throws Exception;
 
    JournalLoadInformation loadMessageJournal(final PostOffice postOffice,
@@ -383,7 +377,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
     * needs to be sent to the journal
     * @throws Exception
     */
-   boolean addToPage(PagingStore store, ServerMessage msg, Transaction tx, RouteContextList listCtx) throws Exception;
+   boolean addToPage(PagingStore store, Message msg, Transaction tx, RouteContextList listCtx) throws Exception;
 
    /**
     * Stops the replication of data from the live to the backup.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9a83da/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 2708c72..8311057 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -39,6 +39,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.Configuration;
@@ -72,7 +73,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.Duplicate
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.FinishPageMessageOperation;
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.GroupingEncoding;
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.HeuristicCompletionEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessageEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessagePersister;
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountPendingImpl;
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecord;
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecordInc;
@@ -93,15 +94,14 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RouteContextList;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.JournalLoader;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.core.transaction.ResourceManager;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
 import org.apache.activemq.artemis.utils.Base64;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.IDGenerator;
@@ -174,8 +174,6 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
 
    private final boolean syncNonTransactional;
 
-   protected int perfBlastPages = -1;
-
    protected boolean journalLoaded = false;
 
    private final IOCriticalErrorListener ioCriticalErrorListener;
@@ -347,7 +345,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
    }
 
    @Override
-   public void storeMessage(final ServerMessage message) throws Exception {
+   public void storeMessage(final Message message) throws Exception {
       if (message.getMessageID() <= 0) {
          // Sanity check only... this shouldn't happen unless there is a bug
          throw ActiveMQMessageBundle.BUNDLE.messageIdNotAssigned();
@@ -359,9 +357,9 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
          // appropriate
 
          if (message.isLargeMessage()) {
-            messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_LARGE_MESSAGE, new LargeMessageEncoding((LargeServerMessage) message), false, getContext(false));
+            messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_LARGE_MESSAGE, LargeMessagePersister.getInstance(), message, false, getContext(false));
          } else {
-            messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_MESSAGE, message, false, getContext(false));
+            messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_MESSAGE_PROTOCOL, message.getPersister(), message, false, getContext(false));
          }
       } finally {
          readUnLock();
@@ -460,7 +458,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
    // Transactional operations
 
    @Override
-   public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception {
+   public void storeMessageTransactional(final long txID, final Message message) throws Exception {
       if (message.getMessageID() <= 0) {
          throw ActiveMQMessageBundle.BUNDLE.messageIdNotAssigned();
       }
@@ -468,9 +466,9 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
       readLock();
       try {
          if (message.isLargeMessage()) {
-            messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), JournalRecordIds.ADD_LARGE_MESSAGE, new LargeMessageEncoding(((LargeServerMessage) message)));
+            messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), JournalRecordIds.ADD_LARGE_MESSAGE, LargeMessagePersister.getInstance(), message);
          } else {
-            messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), JournalRecordIds.ADD_MESSAGE, message);
+            messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), JournalRecordIds.ADD_MESSAGE_PROTOCOL, message.getPersister(), message);
          }
 
       } finally {
@@ -502,16 +500,6 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
    }
 
    @Override
-   public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int depages) throws Exception {
-      readLock();
-      try {
-         messageJournal.appendUpdateRecord(pageTransaction.getRecordID(), JournalRecordIds.PAGE_TRANSACTION, new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages), syncNonTransactional, getContext(syncNonTransactional));
-      } finally {
-         readUnLock();
-      }
-   }
-
-   @Override
    public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception {
       readLock();
       try {
@@ -833,7 +821,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
 
       List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>();
 
-      Map<Long, ServerMessage> messages = new HashMap<>();
+      Map<Long, Message> messages = new HashMap<>();
       readLock();
       try {
 
@@ -884,9 +872,12 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
                   break;
                }
                case JournalRecordIds.ADD_MESSAGE: {
-                  ServerMessage message = new ServerMessageImpl(record.id, 50);
+                  throw new IllegalStateException("This is using old journal data, export your data and import at the correct version");
+               }
 
-                  message.decode(buff);
+               case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
+
+                  Message message = MessagePersister.getInstance().decode(buff, null);
 
                   messages.put(record.id, message);
 
@@ -907,7 +898,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
                      queueMap.put(encoding.queueID, queueMessages);
                   }
 
-                  ServerMessage message = messages.get(messageID);
+                  Message message = messages.get(messageID);
 
                   if (message == null) {
                      ActiveMQServerLogger.LOGGER.cannotFindMessage(record.id);
@@ -1151,10 +1142,6 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
             pagingManager.processReload();
          }
 
-         if (perfBlastPages != -1) {
-            messageJournal.perfBlast(perfBlastPages);
-         }
-
          journalLoader.postLoad(messageJournal, resourceManager, duplicateIDMap);
          journalLoaded = true;
          return info;
@@ -1581,7 +1568,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
       }
    }
 
-   protected abstract LargeServerMessage parseLargeMessage(Map<Long, ServerMessage> messages,
+   protected abstract LargeServerMessage parseLargeMessage(Map<Long, Message> messages,
                                                            ActiveMQBuffer buff) throws Exception;
 
    private void loadPreparedTransactions(final PostOffice postOffice,
@@ -1603,7 +1590,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
 
          List<MessageReference> referencesToAck = new ArrayList<>();
 
-         Map<Long, ServerMessage> messages = new HashMap<>();
+         Map<Long, Message> messages = new HashMap<>();
 
          // Use same method as load message journal to prune out acks, so they don't get added.
          // Then have reacknowledge(tx) methods on queue, which needs to add the page size
@@ -1623,9 +1610,11 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
                   break;
                }
                case JournalRecordIds.ADD_MESSAGE: {
-                  ServerMessage message = new ServerMessageImpl(record.id, 50);
 
-                  message.decode(buff);
+                  break;
+               }
+               case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
+                  Message message = MessagePersister.getInstance().decode(buff, null);
 
                   messages.put(record.id, message);
 
@@ -1638,7 +1627,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
 
                   encoding.decode(buff);
 
-                  ServerMessage message = messages.get(messageID);
+                  Message message = messages.get(messageID);
 
                   if (message == null) {
                      throw new IllegalStateException("Cannot find message with id " + messageID);
@@ -1915,7 +1904,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
 
    @Override
    public boolean addToPage(PagingStore store,
-                            ServerMessage msg,
+                            Message msg,
                             Transaction tx,
                             RouteContextList listCtx) throws Exception {
       /**
@@ -1939,4 +1928,5 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
       }
       txoper.confirmedMessages.add(recordID);
    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9a83da/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java
index 3ca38e3..acf9c8e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java
@@ -16,21 +16,21 @@
  */
 package org.apache.activemq.artemis.core.persistence.impl.journal;
 
-import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.api.core.Message;
 
 public final class AddMessageRecord {
 
-   public AddMessageRecord(final ServerMessage message) {
+   public AddMessageRecord(final Message message) {
       this.message = message;
    }
 
-   final ServerMessage message;
+   final Message message;
 
    private long scheduledDeliveryTime;
 
    private int deliveryCount;
 
-   public ServerMessage getMessage() {
+   public Message getMessage() {
       return message;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9a83da/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
index b9449bc..698978b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
@@ -44,7 +44,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAck
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeliveryCountUpdateEncoding;
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DuplicateIDEncoding;
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.HeuristicCompletionEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessageEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessagePersister;
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountPendingImpl;
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecord;
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecordInc;
@@ -53,8 +53,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLa
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding;
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.ScheduledDeliveryEncoding;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
 import org.apache.activemq.artemis.utils.Base64;
 import org.apache.activemq.artemis.utils.XidCodecSupport;
 
@@ -64,6 +63,7 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE;
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE_PROTOCOL;
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_REF;
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.HEURISTIC_COMPLETION;
@@ -445,16 +445,15 @@ public final class DescribeJournal {
 
             LargeServerMessage largeMessage = new LargeServerMessageImpl(storageManager);
 
-            LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
-
-            messageEncoding.decode(buffer);
+            LargeMessagePersister.getInstance().decode(buffer, largeMessage);
 
             return new MessageDescribe(largeMessage);
          }
          case ADD_MESSAGE: {
-            ServerMessage message = new ServerMessageImpl(rec, 50);
-
-            message.decode(buffer);
+            return "ADD-MESSAGE is not supported any longer, use export/import";
+         }
+         case ADD_MESSAGE_PROTOCOL: {
+            Message message = MessagePersister.getInstance().decode(buffer, null);
 
             return new MessageDescribe(message);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9a83da/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
index cd1d526..348ac9b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
@@ -85,4 +85,7 @@ public final class JournalRecordIds {
    public static final byte PAGE_CURSOR_PENDING_COUNTER = 43;
 
    public static final byte ADDRESS_BINDING_RECORD = 44;
+
+   public static final byte ADD_MESSAGE_PROTOCOL = 45;
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9a83da/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 51fd6cc..c31de52 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -49,12 +49,11 @@ import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.impl.JournalFile;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
-import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessageEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessagePersister;
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage;
 import org.apache.activemq.artemis.core.replication.ReplicatedJournal;
@@ -63,7 +62,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.jboss.logging.Logger;
@@ -157,8 +155,6 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
 
       largeMessagesFactory = new NIOSequentialFileFactory(config.getLargeMessagesLocation(), false, criticalErrorListener, 1);
 
-      perfBlastPages = config.getJournalPerfBlastPages();
-
       if (config.getPageMaxConcurrentIO() != 1) {
          pageMaxConcurrentIO = new Semaphore(config.getPageMaxConcurrentIO());
       } else {
@@ -287,13 +283,11 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
     * @param buff
     * @return
     * @throws Exception
-    */ protected LargeServerMessage parseLargeMessage(final Map<Long, ServerMessage> messages,
+    */ protected LargeServerMessage parseLargeMessage(final Map<Long, Message> messages,
                                                       final ActiveMQBuffer buff) throws Exception {
       LargeServerMessage largeMessage = createLargeMessage();
 
-      LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
-
-      messageEncoding.decode(buff);
+      LargeMessagePersister.getInstance().decode(buff, largeMessage);
 
       if (largeMessage.containsProperty(Message.HDR_ORIG_MESSAGE_ID)) {
          // for compatibility: couple with old behaviour, copying the old file to avoid message loss
@@ -451,7 +445,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
    }
 
    @Override
-   public LargeServerMessage createLargeMessage(final long id, final MessageInternal message) throws Exception {
+   public LargeServerMessage createLargeMessage(final long id, final Message message) throws Exception {
       readLock();
       try {
          if (isReplicated()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9a83da/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java
index 8953291..33be342 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java
@@ -21,21 +21,21 @@ import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE;
 
 public class LargeMessageTXFailureCallback implements TransactionFailureCallback {
 
    private AbstractJournalStorageManager journalStorageManager;
-   private final Map<Long, ServerMessage> messages;
+   private final Map<Long, Message> messages;
 
    public LargeMessageTXFailureCallback(AbstractJournalStorageManager journalStorageManager,
-                                        final Map<Long, ServerMessage> messages) {
+                                        final Map<Long, Message> messages) {
       super();
       this.journalStorageManager = journalStorageManager;
       this.messages = messages;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9a83da/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 90b1fdd..1b5c24e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -25,17 +25,15 @@ import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.io.SequentialFile;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
-import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.artemis.utils.TypedProperties;
 import org.jboss.logging.Logger;
 
-public final class LargeServerMessageImpl extends ServerMessageImpl implements LargeServerMessage {
+public final class LargeServerMessageImpl extends CoreMessage implements LargeServerMessage {
 
    // Constants -----------------------------------------------------
    private static final Logger logger = Logger.getLogger(LargeServerMessageImpl.class);
@@ -43,30 +41,28 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L
    // Attributes ----------------------------------------------------
 
    private final JournalStorageManager storageManager;
-
+   private final AtomicInteger delayDeletionCount = new AtomicInteger(0);
    private long pendingRecordID = -1;
-
    private boolean paged;
-
    // We should only use the NIO implementation on the Journal
    private SequentialFile file;
-
    // set when a copyFrom is called
    // The actual copy is done when finishCopy is called
    private SequentialFile pendingCopy;
-
    private long bodySize = -1;
 
-   private final AtomicInteger delayDeletionCount = new AtomicInteger(0);
-
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
+   // We cache this
+   private volatile int memoryEstimate = -1;
 
    public LargeServerMessageImpl(final JournalStorageManager storageManager) {
       this.storageManager = storageManager;
    }
 
+   // Public --------------------------------------------------------
+
    /**
     * Copy constructor
     *
@@ -85,14 +81,18 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L
       setMessageID(newID);
    }
 
-   // Public --------------------------------------------------------
+   private static String toDate(long timestamp) {
+      if (timestamp == 0) {
+         return "0";
+      } else {
+         return new java.util.Date(timestamp).toString();
+      }
+
+   }
 
-   /**
-    * @param pendingRecordID
-    */
    @Override
-   public void setPendingRecordID(long pendingRecordID) {
-      this.pendingRecordID = pendingRecordID;
+   public boolean isServerMessage() {
+      return true;
    }
 
    @Override
@@ -100,6 +100,14 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L
       return this.pendingRecordID;
    }
 
+   /**
+    * @param pendingRecordID
+    */
+   @Override
+   public void setPendingRecordID(long pendingRecordID) {
+      this.pendingRecordID = pendingRecordID;
+   }
+
    @Override
    public void setPaged() {
       paged = true;
@@ -118,39 +126,19 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L
       bodySize += bytes.length;
    }
 
-   public void encodeBody(final ActiveMQBuffer bufferOut, final BodyEncoder context, final int size) {
-      try {
-         // This could maybe be optimized (maybe reading directly into bufferOut)
-         ByteBuffer bufferRead = ByteBuffer.allocate(size);
-
-         int bytesRead = context.encode(bufferRead);
-
-         bufferRead.flip();
-
-         if (bytesRead > 0) {
-            bufferOut.writeBytes(bufferRead.array(), 0, bytesRead);
-         }
-
-      } catch (Exception e) {
-         throw new RuntimeException(e.getMessage(), e);
-      }
-   }
-
    @Override
    public synchronized int getEncodeSize() {
       return getHeadersAndPropertiesEncodeSize();
    }
 
-   @Override
    public void encode(final ActiveMQBuffer buffer1) {
-      super.encodeHeadersAndProperties(buffer1);
+      super.encodeHeadersAndProperties(buffer1.byteBuf());
    }
 
-   @Override
    public void decode(final ActiveMQBuffer buffer1) {
       file = null;
 
-      super.decodeHeadersAndProperties(buffer1);
+      super.decodeHeadersAndProperties(buffer1.byteBuf());
    }
 
    @Override
@@ -175,7 +163,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L
    }
 
    @Override
-   public BodyEncoder getBodyEncoder() throws ActiveMQException {
+   public LargeBodyEncoder getBodyEncoder() throws ActiveMQException {
       validateFile();
       return new DecodingContext();
    }
@@ -220,9 +208,6 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L
       storageManager.deleteLargeMessageFile(this);
    }
 
-   // We cache this
-   private volatile int memoryEstimate = -1;
-
    @Override
    public synchronized int getMemoryEstimate() {
       if (memoryEstimate == -1) {
@@ -248,28 +233,29 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L
    }
 
    @Override
-   public void setOriginalHeaders(final ServerMessage other,
-                                  final MessageReference originalReference,
-                                  final boolean expiry) {
-      super.setOriginalHeaders(other, originalReference, expiry);
-
-      LargeServerMessageImpl otherLM = (LargeServerMessageImpl) other;
-      this.paged = otherLM.paged;
-      if (this.paged) {
-         this.removeProperty(Message.HDR_ORIG_MESSAGE_ID);
+   public void referenceOriginalMessage(final Message original, String originalQueue) {
+
+      super.referenceOriginalMessage(original, originalQueue);
+
+      if (original instanceof LargeServerMessageImpl) {
+         LargeServerMessageImpl otherLM = (LargeServerMessageImpl) original;
+         this.paged = otherLM.paged;
+         if (this.paged) {
+            this.removeProperty(Message.HDR_ORIG_MESSAGE_ID);
+         }
       }
    }
 
    @Override
-   public ServerMessage copy() {
+   public Message copy() {
       SequentialFile newfile = storageManager.createFileForLargeMessage(messageID, durable);
 
-      ServerMessage newMessage = new LargeServerMessageImpl(this, properties, newfile, messageID);
+      Message newMessage = new LargeServerMessageImpl(this, properties, newfile, messageID);
       return newMessage;
    }
 
    @Override
-   public ServerMessage copy(final long newID) {
+   public Message copy(final long newID) {
       try {
          LargeServerMessage newMessage = storageManager.createLargeMessage(newID, this);
 
@@ -337,19 +323,6 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L
          ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this);
    }
 
-   private static String toDate(long timestamp) {
-      if (timestamp == 0) {
-         return "0";
-      } else {
-         return new java.util.Date(timestamp).toString();
-      }
-
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
    @Override
    protected void finalize() throws Throwable {
       releaseResources();
@@ -400,7 +373,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L
 
    // Inner classes -------------------------------------------------
 
-   class DecodingContext implements BodyEncoder {
+   class DecodingContext implements LargeBodyEncoder {
 
       private SequentialFile cFile;
 
@@ -454,7 +427,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L
       }
 
       /* (non-Javadoc)
-       * @see org.apache.activemq.artemis.core.message.BodyEncoder#getLargeBodySize()
+       * @see org.apache.activemq.artemis.core.message.LargeBodyEncoder#getLargeBodySize()
        */
       @Override
       public long getLargeBodySize() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9a83da/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessagePersister.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessagePersister.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessagePersister.java
new file mode 100644
index 0000000..cb578e1
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessagePersister.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.persistence.impl.journal;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+public class LargeServerMessagePersister implements Persister<LargeServerMessage> {
+
+   /**
+    *  for future usage...
+    *  when we have refactored large message properly
+    *  this could be used to differentiate other protocols large message persisters
+    */
+   byte PERSISTER_ID = 11;
+
+   public static LargeServerMessagePersister theInstance = new LargeServerMessagePersister();
+
+   public static LargeServerMessagePersister getInstance() {
+      return theInstance;
+   }
+
+   protected LargeServerMessagePersister() {
+   }
+
+   @Override
+   public int getEncodeSize(LargeServerMessage record) {
+      return DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG + SimpleString.sizeofNullableString(record.getAddressSimpleString())
+             + record.getPersistSize();
+   }
+
+   /** Sub classes must add the first short as the protocol-id */
+   @Override
+   public void encode(ActiveMQBuffer buffer, LargeServerMessage record) {
+      buffer.writeByte(PERSISTER_ID);
+      buffer.writeLong(record.getMessageID());
+      buffer.writeNullableSimpleString(record.getAddressSimpleString());
+      record.persist(buffer);
+   }
+
+
+   @Override
+   public LargeServerMessage decode(ActiveMQBuffer buffer, LargeServerMessage record) {
+      // the caller must consume the first byte already, as that will be used to decide what persister (protocol) to use
+      buffer.readByte(); // for future usage, not used now
+      long id = buffer.readLong();
+      SimpleString address = buffer.readNullableSimpleString();
+      record.reloadPersistence(buffer);
+      record.setMessageID(id);
+      record.setAddress(address);
+      return record;
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9a83da/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessageEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessageEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessageEncoding.java
deleted file mode 100644
index cdb5702..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessageEncoding.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
-import org.apache.activemq.artemis.core.server.LargeServerMessage;
-
-public class LargeMessageEncoding implements EncodingSupport {
-
-   public final LargeServerMessage message;
-
-   public LargeMessageEncoding(final LargeServerMessage message) {
-      this.message = message;
-   }
-
-   /* (non-Javadoc)
-    * @see org.apache.activemq.artemis.core.journal.EncodingSupport#decode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer)
-    */
-   @Override
-   public void decode(final ActiveMQBuffer buffer) {
-      message.decodeHeadersAndProperties(buffer);
-   }
-
-   /* (non-Javadoc)
-    * @see org.apache.activemq.artemis.core.journal.EncodingSupport#encode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer)
-    */
-   @Override
-   public void encode(final ActiveMQBuffer buffer) {
-      message.encode(buffer);
-   }
-
-   /* (non-Javadoc)
-    * @see org.apache.activemq.artemis.core.journal.EncodingSupport#getEncodeSize()
-    */
-   @Override
-   public int getEncodeSize() {
-      return message.getEncodeSize();
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9a83da/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java
new file mode 100644
index 0000000..b715f97
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
+
+public class LargeMessagePersister implements Persister<LargeServerMessage> {
+
+   private static final LargeMessagePersister theInstance = new LargeMessagePersister();
+
+
+   public static LargeMessagePersister getInstance() {
+      return theInstance;
+   }
+
+   protected LargeMessagePersister() {
+   }
+
+   /* (non-Javadoc)
+    * @see org.apache.activemq.artemis.core.journal.EncodingSupport#decode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer)
+    */
+   @Override
+   public LargeServerMessage decode(final ActiveMQBuffer buffer, LargeServerMessage message) {
+      ((CoreMessage)message).decodeHeadersAndProperties(buffer.byteBuf());
+      return message;
+   }
+
+   /* (non-Javadoc)
+    * @see org.apache.activemq.artemis.core.journal.EncodingSupport#encode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer)
+    */
+   @Override
+   public void encode(final ActiveMQBuffer buffer, LargeServerMessage message) {
+      ((CoreMessage)message).encodeHeadersAndProperties(buffer.byteBuf());
+   }
+
+   /* (non-Javadoc)
+    * @see org.apache.activemq.artemis.core.journal.EncodingSupport#getEncodeSize()
+    */
+   @Override
+   public int getEncodeSize(LargeServerMessage message) {
+      return message.getEncodeSize();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9a83da/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
index 5b325b6..edd37b7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
@@ -17,12 +17,12 @@
 package org.apache.activemq.artemis.core.persistence.impl.nullpm;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 
-class NullStorageLargeServerMessage extends ServerMessageImpl implements LargeServerMessage {
+class NullStorageLargeServerMessage extends CoreMessage implements LargeServerMessage {
 
    NullStorageLargeServerMessage() {
       super();
@@ -39,7 +39,7 @@ class NullStorageLargeServerMessage extends ServerMessageImpl implements LargeSe
    @Override
    public synchronized void addBytes(final byte[] bytes) {
       if (buffer == null) {
-         buffer = ActiveMQBuffers.dynamicBuffer(bytes.length);
+         buffer = ActiveMQBuffers.dynamicBuffer(bytes.length).byteBuf();
       }
 
       // expand the buffer
@@ -67,6 +67,12 @@ class NullStorageLargeServerMessage extends ServerMessageImpl implements LargeSe
    }
 
    @Override
+   public boolean isServerMessage() {
+      return true;
+   }
+
+
+   @Override
    public synchronized int getEncodeSize() {
       return getHeadersAndPropertiesEncodeSize();
    }
@@ -77,7 +83,7 @@ class NullStorageLargeServerMessage extends ServerMessageImpl implements LargeSe
    }
 
    @Override
-   public ServerMessage copy() {
+   public Message copy() {
       // This is a simple copy, used only to avoid changing original properties
       return new NullStorageLargeServerMessage(this);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9a83da/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index 2154879..2c297d9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.IOCallback;
@@ -32,7 +33,6 @@ import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.PagingManager;
@@ -53,7 +53,6 @@ import org.apache.activemq.artemis.core.replication.ReplicationManager;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.RouteContextList;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@@ -214,11 +213,11 @@ public class NullStorageManager implements StorageManager {
    }
 
    @Override
-   public void storeMessage(final ServerMessage message) throws Exception {
+   public void storeMessage(final Message message) throws Exception {
    }
 
    @Override
-   public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception {
+   public void storeMessageTransactional(final long txID, final Message message) throws Exception {
    }
 
    @Override
@@ -274,7 +273,7 @@ public class NullStorageManager implements StorageManager {
    }
 
    @Override
-   public LargeServerMessage createLargeMessage(final long id, final MessageInternal message) {
+   public LargeServerMessage createLargeMessage(final long id, final Message message) {
       NullStorageLargeServerMessage largeMessage = new NullStorageLargeServerMessage();
 
       largeMessage.copyHeadersAndProperties(message);
@@ -464,10 +463,6 @@ public class NullStorageManager implements StorageManager {
    }
 
    @Override
-   public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int depage) throws Exception {
-   }
-
-   @Override
    public long storePageCounter(final long txID, final long queueID, final long value) throws Exception {
       return 0;
    }
@@ -543,7 +538,7 @@ public class NullStorageManager implements StorageManager {
 
    @Override
    public boolean addToPage(PagingStore store,
-                            ServerMessage msg,
+                            Message msg,
                             Transaction tx,
                             RouteContextList listCtx) throws Exception {
       /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9a83da/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java
index 4c6763d..f1e83d2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java
@@ -16,11 +16,12 @@
  */
 package org.apache.activemq.artemis.core.postoffice;
 
+
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.server.Bindable;
 import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.group.UnproposalListener;
 
 public interface Binding extends UnproposalListener {
@@ -39,7 +40,7 @@ public interface Binding extends UnproposalListener {
 
    Filter getFilter();
 
-   boolean isHighAcceptPriority(ServerMessage message);
+   boolean isHighAcceptPriority(Message message);
 
    boolean isExclusive();
 
@@ -47,9 +48,9 @@ public interface Binding extends UnproposalListener {
 
    int getDistance();
 
-   void route(ServerMessage message, RoutingContext context) throws Exception;
+   void route(Message message, RoutingContext context) throws Exception;
 
-   void routeWithAck(ServerMessage message, RoutingContext context) throws Exception;
+   void routeWithAck(Message message, RoutingContext context) throws Exception;
 
    void close() throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9a83da/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
index b79f1da..1d335ad 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
@@ -18,9 +18,9 @@ package org.apache.activemq.artemis.core.postoffice;
 
 import java.util.Collection;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.group.UnproposalListener;
 
@@ -34,7 +34,7 @@ public interface Bindings extends UnproposalListener {
 
    void setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType);
 
-   boolean redistribute(ServerMessage message, Queue originatingQueue, RoutingContext context) throws Exception;
+   boolean redistribute(Message message, Queue originatingQueue, RoutingContext context) throws Exception;
 
-   void route(ServerMessage message, RoutingContext context) throws Exception;
+   void route(Message message, RoutingContext context) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9a83da/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index 7b8ce18..f682777 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@@ -28,7 +29,6 @@ import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 
@@ -96,33 +96,33 @@ public interface PostOffice extends ActiveMQComponent {
 
    SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception;
 
-   RoutingStatus route(ServerMessage message, boolean direct) throws Exception;
+   RoutingStatus route(Message message, boolean direct) throws Exception;
 
-   RoutingStatus route(ServerMessage message,
+   RoutingStatus route(Message message,
                        Transaction tx,
                        boolean direct) throws Exception;
 
-   RoutingStatus route(ServerMessage message,
+   RoutingStatus route(Message message,
                        Transaction tx,
                        boolean direct,
                        boolean rejectDuplicates) throws Exception;
 
-   RoutingStatus route(ServerMessage message,
+   RoutingStatus route(Message message,
                        RoutingContext context,
                        boolean direct) throws Exception;
 
-   RoutingStatus route(ServerMessage message,
+   RoutingStatus route(Message message,
                        RoutingContext context,
                        boolean direct,
                        boolean rejectDuplicates) throws Exception;
 
-   MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws Exception;
+   MessageReference reroute(Message message, Queue queue, Transaction tx) throws Exception;
 
-   Pair<RoutingContext, ServerMessage> redistribute(ServerMessage message,
+   Pair<RoutingContext, Message> redistribute(Message message,
                                                     final Queue originatingQueue,
                                                     Transaction tx) throws Exception;
 
-   void processRoute(final ServerMessage message, final RoutingContext context, final boolean direct) throws Exception;
+   void processRoute(final Message message, final RoutingContext context, final boolean direct) throws Exception;
 
    DuplicateIDCache getDuplicateIDCache(SimpleString address);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9a83da/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index 6be0311..2de97e2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -30,14 +30,12 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
 import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.group.GroupingHandler;
@@ -152,7 +150,7 @@ public final class BindingsImpl implements Bindings {
    }
 
    @Override
-   public boolean redistribute(final ServerMessage message,
+   public boolean redistribute(final Message message,
                                final Queue originatingQueue,
                                final RoutingContext context) throws Exception {
       if (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) {
@@ -230,18 +228,18 @@ public final class BindingsImpl implements Bindings {
    }
 
    @Override
-   public void route(final ServerMessage message, final RoutingContext context) throws Exception {
+   public void route(final Message message, final RoutingContext context) throws Exception {
       route(message, context, true);
    }
 
-   private void route(final ServerMessage message,
+   private void route(final Message message,
                       final RoutingContext context,
                       final boolean groupRouting) throws Exception {
       /* This is a special treatment for scaled-down messages involving SnF queues.
        * See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property
        */
-      if (message.containsProperty(MessageImpl.HDR_SCALEDOWN_TO_IDS)) {
-         byte[] ids = (byte[]) message.removeProperty(MessageImpl.HDR_SCALEDOWN_TO_IDS);
+      if (message.containsProperty(Message.HDR_SCALEDOWN_TO_IDS)) {
+         byte[] ids = (byte[]) message.removeProperty(Message.HDR_SCALEDOWN_TO_IDS);
 
          if (ids != null) {
             ByteBuffer buffer = ByteBuffer.wrap(ids);
@@ -251,7 +249,7 @@ public final class BindingsImpl implements Bindings {
                   if (entry.getValue() instanceof RemoteQueueBinding) {
                      RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue();
                      if (remoteQueueBinding.getRemoteQueueID() == id) {
-                        message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array());
+                        message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array());
                      }
                   }
                }
@@ -272,7 +270,7 @@ public final class BindingsImpl implements Bindings {
 
       if (!routed) {
          // Remove the ids now, in order to avoid double check
-         byte[] ids = (byte[]) message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS);
+         byte[] ids = (byte[]) message.removeProperty(Message.HDR_ROUTE_TO_IDS);
 
          // Fetch the groupId now, in order to avoid double checking
          SimpleString groupId = message.getSimpleStringProperty(Message.HDR_GROUP_ID);
@@ -319,7 +317,7 @@ public final class BindingsImpl implements Bindings {
     * these two servers. This will eventually send more messages to one server than the other
     * (depending if you are using multi-thread), and not lose messages.
     */
-   private Binding getNextBinding(final ServerMessage message,
+   private Binding getNextBinding(final Message message,
                                   final SimpleString routingName,
                                   final List<Binding> bindings) {
       Integer ipos = routingNamePositions.get(routingName);
@@ -407,7 +405,7 @@ public final class BindingsImpl implements Bindings {
       return theBinding;
    }
 
-   private void routeUsingStrictOrdering(final ServerMessage message,
+   private void routeUsingStrictOrdering(final Message message,
                                          final RoutingContext context,
                                          final GroupingHandler groupingGroupingHandler,
                                          final SimpleString groupId,
@@ -473,7 +471,7 @@ public final class BindingsImpl implements Bindings {
       return null;
    }
 
-   private void routeAndCheckNull(ServerMessage message,
+   private void routeAndCheckNull(Message message,
                                   RoutingContext context,
                                   Response resp,
                                   Binding theBinding,
@@ -552,10 +550,10 @@ public final class BindingsImpl implements Bindings {
       return writer.toString();
    }
 
-   private void routeFromCluster(final ServerMessage message,
+   private void routeFromCluster(final Message message,
                                  final RoutingContext context,
                                  final byte[] ids) throws Exception {
-      byte[] idsToAck = (byte[]) message.removeProperty(MessageImpl.HDR_ROUTE_TO_ACK_IDS);
+      byte[] idsToAck = (byte[]) message.removeProperty(Message.HDR_ROUTE_TO_ACK_IDS);
 
       List<Long> idsToAckList = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9a83da/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java
index 04f432d..8f4ab48 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.postoffice.impl;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.postoffice.Binding;
@@ -23,7 +24,6 @@ import org.apache.activemq.artemis.core.postoffice.BindingType;
 import org.apache.activemq.artemis.core.server.Bindable;
 import org.apache.activemq.artemis.core.server.Divert;
 import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 
 public class DivertBinding implements Binding {
 
@@ -98,12 +98,12 @@ public class DivertBinding implements Binding {
    }
 
    @Override
-   public boolean isHighAcceptPriority(final ServerMessage message) {
+   public boolean isHighAcceptPriority(final Message message) {
       return true;
    }
 
    @Override
-   public void route(final ServerMessage message, final RoutingContext context) throws Exception {
+   public void route(final Message message, final RoutingContext context) throws Exception {
       divert.route(message, context);
    }
 
@@ -150,7 +150,7 @@ public class DivertBinding implements Binding {
    }
 
    @Override
-   public void routeWithAck(ServerMessage message, RoutingContext context) {
+   public void routeWithAck(Message message, RoutingContext context) {
       //noop
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9a83da/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
index af49c4d..176d614 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.postoffice.impl;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.postoffice.BindingType;
@@ -24,7 +25,6 @@ import org.apache.activemq.artemis.core.server.Bindable;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 
 public class LocalQueueBinding implements QueueBinding {
 
@@ -104,7 +104,7 @@ public class LocalQueueBinding implements QueueBinding {
    }
 
    @Override
-   public boolean isHighAcceptPriority(final ServerMessage message) {
+   public boolean isHighAcceptPriority(final Message message) {
       // It's a high accept priority if the queue has at least one matching consumer
 
       return queue.hasMatchingConsumer(message);
@@ -116,14 +116,14 @@ public class LocalQueueBinding implements QueueBinding {
    }
 
    @Override
-   public void route(final ServerMessage message, final RoutingContext context) throws Exception {
+   public void route(final Message message, final RoutingContext context) throws Exception {
       if (isMatchRoutingType(context)) {
          queue.route(message, context);
       }
    }
 
    @Override
-   public void routeWithAck(ServerMessage message, RoutingContext context) throws Exception {
+   public void routeWithAck(Message message, RoutingContext context) throws Exception {
       if (isMatchRoutingType(context)) {
          queue.routeWithAck(message, context);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9a83da/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 2f6ae3d..9737dc0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
@@ -45,7 +46,7 @@ import org.apache.activemq.artemis.api.core.management.NotificationType;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.io.IOCallback;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -69,12 +70,9 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.core.server.RouteContextList;
 import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.group.GroupingHandler;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.server.management.NotificationListener;
@@ -665,20 +663,20 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
    }
 
    @Override
-   public RoutingStatus route(final ServerMessage message,
+   public RoutingStatus route(final Message message,
                               final boolean direct) throws Exception {
       return route(message, (Transaction) null, direct);
    }
 
    @Override
-   public RoutingStatus route(final ServerMessage message,
+   public RoutingStatus route(final Message message,
                               final Transaction tx,
                               final boolean direct) throws Exception {
       return route(message, new RoutingContextImpl(tx), direct);
    }
 
    @Override
-   public RoutingStatus route(final ServerMessage message,
+   public RoutingStatus route(final Message message,
                               final Transaction tx,
                               final boolean direct,
                               final boolean rejectDuplicates) throws Exception {
@@ -686,14 +684,14 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
    }
 
    @Override
-   public RoutingStatus route(final ServerMessage message,
+   public RoutingStatus route(final Message message,
                               final RoutingContext context,
                               final boolean direct) throws Exception {
       return route(message, context, direct, true);
    }
 
    @Override
-   public RoutingStatus route(final ServerMessage message,
+   public RoutingStatus route(final Message message,
                               final RoutingContext context,
                               final boolean direct,
                               boolean rejectDuplicates) throws Exception {
@@ -708,7 +706,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
       AtomicBoolean startedTX = new AtomicBoolean(false);
 
-      final SimpleString address = message.getAddress();
+      final SimpleString address = message.getAddressSimpleString();
 
       applyExpiryDelay(message, address);
 
@@ -716,13 +714,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          return RoutingStatus.DUPLICATED_ID;
       }
 
-      if (message.hasInternalProperties()) {
-         // We need to perform some cleanup on internal properties,
-         // but we don't do it every time, otherwise it wouldn't be optimal
-         cleanupInternalPropertiesBeforeRouting(message);
-      }
+      message.cleanupInternalProperties();
 
-      Bindings bindings = addressManager.getBindingsForRoutingAddress(context.getAddress() == null ? message.getAddress() : context.getAddress());
+      Bindings bindings = addressManager.getBindingsForRoutingAddress(context.getAddress() == null ? message.getAddressSimpleString() : context.getAddress());
 
       // TODO auto-create queues here?
       // first check for the auto-queue creation thing
@@ -768,7 +762,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                result = RoutingStatus.NO_BINDINGS;
                ActiveMQServerLogger.LOGGER.noDLA(address);
             } else {
-               message.setOriginalHeaders(message, null, false);
+               message.referenceOriginalMessage(message, null);
 
                message.setAddress(dlaAddress);
 
@@ -806,7 +800,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
    }
 
    // HORNETQ-1029
-   private void applyExpiryDelay(ServerMessage message, SimpleString address) {
+   private void applyExpiryDelay(Message message, SimpleString address) {
       long expirationOverride = addressSettingsRepository.getMatch(address.toString()).getExpiryDelay();
 
       // A -1 <expiry-delay> means don't do anything
@@ -819,12 +813,13 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
    }
 
    @Override
-   public MessageReference reroute(final ServerMessage message,
+   public MessageReference reroute(final Message message,
                                    final Queue queue,
                                    final Transaction tx) throws Exception {
+
       setPagingStore(message);
 
-      MessageReference reference = message.createReference(queue);
+      MessageReference reference = MessageReference.Factory.createReference(message, queue);
 
       if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
          Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
@@ -852,15 +847,15 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
     * The redistribution can't process the route right away as we may be dealing with a large message which will need to be processed on a different thread
     */
    @Override
-   public Pair<RoutingContext, ServerMessage> redistribute(final ServerMessage message,
+   public Pair<RoutingContext, Message> redistribute(final Message message,
                                                            final Queue originatingQueue,
                                                            final Transaction tx) throws Exception {
       // We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message
       // arrived the target node
       // as described on https://issues.jboss.org/browse/JBPAPP-6130
-      ServerMessage copyRedistribute = message.copy(storageManager.generateID());
+      Message copyRedistribute = message.copy(storageManager.generateID());
 
-      Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddress());
+      Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddressSimpleString());
 
       if (bindings != null) {
          RoutingContext context = new RoutingContextImpl(tx);
@@ -937,7 +932,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       synchronized (notificationLock) {
          // First send a reset message
 
-         ServerMessage message = new ServerMessageImpl(storageManager.generateID(), 50);
+         Message message = new CoreMessage(storageManager.generateID(), 50);
 
          message.setAddress(queueName);
          message.putBooleanProperty(PostOfficeImpl.HDR_RESET_QUEUE_DATA, true);
@@ -987,7 +982,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                }
             }
          }
-         ServerMessage completeMessage = new ServerMessageImpl(storageManager.generateID(), 50);
+         Message completeMessage = new CoreMessage(storageManager.generateID(), 50);
 
          completeMessage.setAddress(queueName);
          completeMessage.putBooleanProperty(PostOfficeImpl.HDR_RESET_QUEUE_DATA_COMPLETE, true);
@@ -1006,37 +1001,14 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
    // Private -----------------------------------------------------------------
 
-   /**
-    * @param message
-    */
-   protected void cleanupInternalPropertiesBeforeRouting(final ServerMessage message) {
-      LinkedList<SimpleString> valuesToRemove = null;
-
-      for (SimpleString name : message.getPropertyNames()) {
-         // We use properties to establish routing context on clustering.
-         // However if the client resends the message after receiving, it needs to be removed
-         if ((name.startsWith(MessageImpl.HDR_ROUTE_TO_IDS) && !name.equals(MessageImpl.HDR_ROUTE_TO_IDS)) || (name.startsWith(MessageImpl.HDR_ROUTE_TO_ACK_IDS) && !name.equals(MessageImpl.HDR_ROUTE_TO_ACK_IDS))) {
-            if (valuesToRemove == null) {
-               valuesToRemove = new LinkedList<>();
-            }
-            valuesToRemove.add(name);
-         }
-      }
-
-      if (valuesToRemove != null) {
-         for (SimpleString removal : valuesToRemove) {
-            message.removeProperty(removal);
-         }
-      }
-   }
 
-   private void setPagingStore(final ServerMessage message) throws Exception {
-      PagingStore store = pagingManager.getPageStore(message.getAddress());
+   private void setPagingStore(final Message message) throws Exception {
+      PagingStore store = pagingManager.getPageStore(message.getAddressSimpleString());
 
-      message.setPagingStore(store);
+      message.setContext(store);
    }
 
-   private void routeQueueInfo(final ServerMessage message,
+   private void routeQueueInfo(final Message message,
                                final Queue queue,
                                final boolean applyFilters) throws Exception {
       if (!applyFilters || queue.getFilter() == null || queue.getFilter().match(message)) {
@@ -1074,13 +1046,15 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
    }
 
    @Override
-   public void processRoute(final ServerMessage message,
+   public void processRoute(final Message message,
                             final RoutingContext context,
                             final boolean direct) throws Exception {
       final List<MessageReference> refs = new ArrayList<>();
 
       Transaction tx = context.getTransaction();
 
+      Long deliveryTime = message.getScheduledDeliveryTime();
+
       for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
          PagingStore store = pagingManager.getPageStore(entry.getKey());
 
@@ -1095,14 +1069,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          }
 
          for (Queue queue : entry.getValue().getNonDurableQueues()) {
-            MessageReference reference = message.createReference(queue);
-
-            refs.add(reference);
-            if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
-               Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
+            MessageReference reference = MessageReference.Factory.createReference(message, queue);
 
-               reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+            if (deliveryTime != null) {
+               reference.setScheduledDeliveryTime(deliveryTime);
             }
+            refs.add(reference);
 
             message.incrementRefCount();
          }
@@ -1112,22 +1084,20 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          while (iter.hasNext()) {
             Queue queue = iter.next();
 
-            MessageReference reference = message.createReference(queue);
+            MessageReference reference = MessageReference.Factory.createReference(message, queue);
 
-            if (context.isAlreadyAcked(message.getAddress(), queue)) {
+            if (context.isAlreadyAcked(message.getAddressSimpleString(), queue)) {
                reference.setAlreadyAcked();
                if (tx != null) {
                   queue.acknowledge(tx, reference);
                }
             }
 
-            refs.add(reference);
-
-            if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
-               Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
 
-               reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+            if (deliveryTime != null) {
+               reference.setScheduledDeliveryTime(deliveryTime);
             }
+            refs.add(reference);
 
             if (message.isDurable()) {
                int durableRefCount = message.incrementDurableRefCount();
@@ -1152,7 +1122,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                   storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext());
                }
 
-               if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
+               if (message.containsDeliveryAnnotationProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
                   if (tx != null) {
                      storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
                   } else {
@@ -1189,7 +1159,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
     * @param message
     * @throws Exception
     */
-   private void confirmLargeMessageSend(Transaction tx, final ServerMessage message) throws Exception {
+   private void confirmLargeMessageSend(Transaction tx, final Message message) throws Exception {
       LargeServerMessage largeServerMessage = (LargeServerMessage) message;
       if (largeServerMessage.getPendingRecordID() >= 0) {
          if (tx == null) {
@@ -1245,13 +1215,13 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       }
    }
 
-   private boolean checkDuplicateID(final ServerMessage message,
+   private boolean checkDuplicateID(final Message message,
                                     final RoutingContext context,
                                     boolean rejectDuplicates,
                                     AtomicBoolean startedTX) throws Exception {
       // Check the DuplicateCache for the Bridge first
 
-      Object bridgeDup = message.getObjectProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID);
+      Object bridgeDup = message.removeDeliveryAnnoationProperty(Message.HDR_BRIDGE_DUPLICATE_ID);
       if (bridgeDup != null) {
          // if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one
          byte[] bridgeDupBytes = (byte[]) bridgeDup;
@@ -1269,8 +1239,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
             message.decrementRefCount();
             return false;
          }
-
-         message.removeProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID);
       } else {
          // if used BridgeDuplicate, it's not going to use the regular duplicate
          // since this will would break redistribution (re-setting the duplicateId)
@@ -1281,7 +1249,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          boolean isDuplicate = false;
 
          if (duplicateIDBytes != null) {
-            cache = getDuplicateIDCache(message.getAddress());
+            cache = getDuplicateIDCache(message.getAddressSimpleString());
 
             isDuplicate = cache.contains(duplicateIDBytes);
 
@@ -1338,8 +1306,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       }
    }
 
-   private ServerMessage createQueueInfoMessage(final NotificationType type, final SimpleString queueName) {
-      ServerMessage message = new ServerMessageImpl(storageManager.generateID(), 50);
+   private Message createQueueInfoMessage(final NotificationType type, final SimpleString queueName) {
+      Message message = new CoreMessage().initBuffer(50).setMessageID(storageManager.generateID());
 
       message.setAddress(queueName);
 
@@ -1433,7 +1401,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          // Reverse the ref counts, and paging sizes
 
          for (MessageReference ref : refs) {
-            ServerMessage message = ref.getMessage();
+            Message message = ref.getMessage();
 
             if (message.isDurable() && ref.getQueue().isDurable()) {
                message.decrementDurableRefCount();


Mime
View raw message