activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [04/17] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding.
Date Sun, 05 Mar 2017 16:50:05 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index aa1ebf3..2b5ecaf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -51,6 +51,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
 import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
 import org.apache.activemq.artemis.core.config.BridgeConfiguration;
@@ -127,7 +128,6 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueConfig;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.ServiceComponent;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index 619036d..5b0d406 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -25,7 +25,6 @@ import org.apache.activemq.artemis.core.server.Divert;
 import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.cluster.Transformer;
 import org.jboss.logging.Logger;
 
@@ -83,7 +82,7 @@ public class DivertImpl implements Divert {
    }
 
    @Override
-   public void route(final ServerMessage message, final RoutingContext context) throws Exception {
+   public void route(final Message message, final RoutingContext context) throws Exception {
       // We must make a copy of the message, otherwise things like returning credits to the page won't work
       // properly on ack, since the original address will be overwritten
 
@@ -91,7 +90,7 @@ public class DivertImpl implements Divert {
          logger.trace("Diverting message " + message + " into " + this);
       }
 
-      ServerMessage copy = null;
+      Message copy = null;
 
       // Shouldn't copy if it's not routed anywhere else
       if (!forwardAddress.equals(context.getAddress())) {
@@ -99,7 +98,7 @@ public class DivertImpl implements Divert {
          copy = message.copy(id);
 
          // This will set the original MessageId, and the original address
-         copy.setOriginalHeaders(message, null, false);
+         copy.referenceOriginalMessage(message, null);
 
          copy.setAddress(forwardAddress);
 
@@ -130,7 +129,7 @@ public class DivertImpl implements Divert {
    }
 
    @Override
-   public void routeWithAck(ServerMessage message, RoutingContext context) throws Exception {
+   public void routeWithAck(Message message, RoutingContext context) throws Exception {
       route(message, context);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java
index 40cef50..4adb1b2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java
@@ -20,6 +20,7 @@ import javax.transaction.xa.Xid;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.journal.Journal;
@@ -29,7 +30,6 @@ import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
 import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
 import org.apache.activemq.artemis.core.persistence.impl.journal.AddMessageRecord;
 import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.transaction.ResourceManager;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 
@@ -43,7 +43,7 @@ public interface JournalLoader {
 
    void handleAddMessage(Map<Long, Map<Long, AddMessageRecord>> queueMap) throws Exception;
 
-   void handleNoMessageReferences(Map<Long, ServerMessage> messages);
+   void handleNoMessageReferences(Map<Long, Message> messages);
 
    void handleGroupingBindings(List<GroupingInfo> groupingInfos);
 
@@ -53,7 +53,7 @@ public interface JournalLoader {
                  ResourceManager resourceManager,
                  Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception;
 
-   void handlePreparedSendMessage(ServerMessage message, Transaction tx, long queueID) throws Exception;
+   void handlePreparedSendMessage(Message message, Transaction tx, long queueID) throws Exception;
 
    void handlePreparedAcknowledge(long messageID,
                                   List<MessageReference> referencesToAck,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index eb467ae..d059d2c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -31,7 +31,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -74,7 +73,7 @@ public class LastValueQueue extends QueueImpl {
          return;
       }
 
-      SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+      SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME.toString());
 
       if (prop != null) {
          HolderReference hr = map.get(prop);
@@ -98,7 +97,7 @@ public class LastValueQueue extends QueueImpl {
 
    @Override
    public synchronized void addHead(final MessageReference ref, boolean scheduling) {
-      SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+      SimpleString prop = ref.getMessage().getDeliveryAnnotationPropertyString(Message.HDR_LAST_VALUE_NAME);
 
       if (prop != null) {
          HolderReference hr = map.get(prop);
@@ -148,7 +147,7 @@ public class LastValueQueue extends QueueImpl {
    @Override
    protected void refRemoved(MessageReference ref) {
       synchronized (this) {
-         SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+         SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME.toString());
 
          if (prop != null) {
             map.remove(prop);
@@ -223,7 +222,7 @@ public class LastValueQueue extends QueueImpl {
       }
 
       @Override
-      public ServerMessage getMessage() {
+      public Message getMessage() {
          return ref.getMessage();
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index 6d9030e..bffb1ad 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -18,11 +18,10 @@ package org.apache.activemq.artemis.core.server.impl;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.utils.MemorySize;
 
 /**
  * Implementation of a MessageReference
@@ -35,7 +34,7 @@ public class MessageReferenceImpl implements MessageReference {
 
    private volatile long scheduledDeliveryTime;
 
-   private final ServerMessage message;
+   private final Message message;
 
    private final Queue queue;
 
@@ -47,20 +46,7 @@ public class MessageReferenceImpl implements MessageReference {
 
    // Static --------------------------------------------------------
 
-   private static final int memoryOffset;
-
-   static {
-      // This is an estimate of how much memory a ServerMessageImpl takes up, exclusing body and properties
-      // Note, it is only an estimate, it's not possible to be entirely sure with Java
-      // This figure is calculated using the test utilities in org.apache.activemq.tests.unit.util.sizeof
-      // The value is somewhat higher on 64 bit architectures, probably due to different alignment
-
-      if (MemorySize.is64bitArch()) {
-         memoryOffset = 48;
-      } else {
-         memoryOffset = 32;
-      }
-   }
+   private static final int memoryOffset = 64;
 
    // Constructors --------------------------------------------------
 
@@ -80,7 +66,7 @@ public class MessageReferenceImpl implements MessageReference {
       this.queue = queue;
    }
 
-   protected MessageReferenceImpl(final ServerMessage message, final Queue queue) {
+   public MessageReferenceImpl(final Message message, final Queue queue) {
       this.message = message;
 
       this.queue = queue;
@@ -155,7 +141,7 @@ public class MessageReferenceImpl implements MessageReference {
    }
 
    @Override
-   public ServerMessage getMessage() {
+   public Message getMessage() {
       return message;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index 005a994..717e2e2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -57,7 +57,6 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueConfig;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.group.GroupingHandler;
 import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
@@ -233,8 +232,8 @@ public class PostOfficeJournalLoader implements JournalLoader {
    }
 
    @Override
-   public void handleNoMessageReferences(Map<Long, ServerMessage> messages) {
-      for (ServerMessage msg : messages.values()) {
+   public void handleNoMessageReferences(Map<Long, Message> messages) {
+      for (Message msg : messages.values()) {
          if (msg.getRefCount() == 0) {
             ActiveMQServerLogger.LOGGER.journalUnreferencedMessage(msg.getMessageID());
             try {
@@ -284,7 +283,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
    }
 
    @Override
-   public void handlePreparedSendMessage(ServerMessage message, Transaction tx, long queueID) throws Exception {
+   public void handlePreparedSendMessage(Message message, Transaction tx, long queueID) throws Exception {
       Queue queue = queues.get(queueID);
 
       if (queue == null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 2943f15..bdbc57b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -49,7 +49,6 @@ import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.io.IOCallback;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
 import org.apache.activemq.artemis.core.persistence.QueueStatus;
@@ -71,7 +70,6 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
 import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
@@ -440,12 +438,12 @@ public class QueueImpl implements Queue {
    }
 
    @Override
-   public void route(final ServerMessage message, final RoutingContext context) throws Exception {
+   public void route(final Message message, final RoutingContext context) throws Exception {
       context.addQueue(address, this);
    }
 
    @Override
-   public void routeWithAck(ServerMessage message, RoutingContext context) {
+   public void routeWithAck(Message message, RoutingContext context) {
       context.addQueueWithAck(address, this);
    }
 
@@ -922,7 +920,7 @@ public class QueueImpl implements Queue {
    }
 
    @Override
-   public boolean hasMatchingConsumer(final ServerMessage message) {
+   public boolean hasMatchingConsumer(final Message message) {
       for (ConsumerHolder holder : consumerList) {
          Consumer consumer = holder.consumer;
 
@@ -1055,7 +1053,7 @@ public class QueueImpl implements Queue {
          pageSubscription.ack((PagedReference) ref);
          postAcknowledge(ref);
       } else {
-         ServerMessage message = ref.getMessage();
+         Message message = ref.getMessage();
 
          boolean durableRef = message.isDurable() && durable;
 
@@ -1087,7 +1085,7 @@ public class QueueImpl implements Queue {
 
          getRefsOperation(tx).addAck(ref);
       } else {
-         ServerMessage message = ref.getMessage();
+         Message message = ref.getMessage();
 
          boolean durableRef = message.isDurable() && durable;
 
@@ -1111,7 +1109,7 @@ public class QueueImpl implements Queue {
 
    @Override
    public void reacknowledge(final Transaction tx, final MessageReference ref) throws Exception {
-      ServerMessage message = ref.getMessage();
+      Message message = ref.getMessage();
 
       if (message.isDurable() && durable) {
          tx.setContainsPersistent();
@@ -1216,11 +1214,11 @@ public class QueueImpl implements Queue {
       return expiryAddress;
    }
 
-   private SimpleString extractAddress(ServerMessage message) {
-      if (message.containsProperty(Message.HDR_ORIG_MESSAGE_ID)) {
-         return message.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS);
+   private SimpleString extractAddress(Message message) {
+      if (message.containsProperty(Message.HDR_ORIG_MESSAGE_ID.toString())) {
+         return message.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString());
       } else {
-         return message.getAddress();
+         return message.getAddressSimpleString();
       }
    }
 
@@ -1244,7 +1242,9 @@ public class QueueImpl implements Queue {
       List<MessageReference> scheduledMessages = scheduledDeliveryHandler.cancel(null);
       if (scheduledMessages != null && scheduledMessages.size() > 0) {
          for (MessageReference ref : scheduledMessages) {
-            ref.getMessage().putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, ref.getScheduledDeliveryTime());
+
+            // TODO-now remove this, use something on Reference
+//            ref.getMessage().putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, ref.getScheduledDeliveryTime());
             ref.setScheduledDeliveryTime(0);
          }
          this.addHead(scheduledMessages, true);
@@ -2274,7 +2274,7 @@ public class QueueImpl implements Queue {
    public boolean checkRedelivery(final MessageReference reference,
                                   final long timeBase,
                                   final boolean ignoreRedeliveryDelay) throws Exception {
-      ServerMessage message = reference.getMessage();
+      Message message = reference.getMessage();
 
       if (internalQueue) {
          if (logger.isTraceEnabled()) {
@@ -2337,7 +2337,7 @@ public class QueueImpl implements Queue {
                      final boolean expiry,
                      final boolean rejectDuplicate,
                      final long... queueIDs) throws Exception {
-      ServerMessage copyMessage = makeCopy(ref, expiry);
+      Message copyMessage = makeCopy(ref, expiry);
 
       copyMessage.setAddress(toAddress);
 
@@ -2346,7 +2346,7 @@ public class QueueImpl implements Queue {
          for (long id : queueIDs) {
             buffer.putLong(id);
          }
-         copyMessage.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
+         copyMessage.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), buffer.array());
       }
 
       postOffice.route(copyMessage, tx, false, rejectDuplicate);
@@ -2358,16 +2358,17 @@ public class QueueImpl implements Queue {
    private void moveBetweenSnFQueues(final SimpleString queueSuffix,
                                      final Transaction tx,
                                      final MessageReference ref) throws Exception {
-      ServerMessage copyMessage = makeCopy(ref, false, false);
+      Message copyMessage = makeCopy(ref, false, false);
 
       byte[] oldRouteToIDs = null;
       String targetNodeID;
       Binding targetBinding;
 
+      // TODO-now: this needs to go away
       // remove the old route
       for (SimpleString propName : copyMessage.getPropertyNames()) {
-         if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS)) {
-            oldRouteToIDs = (byte[]) copyMessage.removeProperty(propName);
+         if (propName.startsWith(Message.HDR_ROUTE_TO_IDS)) {
+            oldRouteToIDs = (byte[]) copyMessage.removeProperty(propName.toString());
             final String hashcodeToString = oldRouteToIDs.toString(); // don't use Arrays.toString(..) here
             logger.debug("Removed property from message: " + propName + " = " + hashcodeToString + " (" + ByteBuffer.wrap(oldRouteToIDs).getLong() + ")");
 
@@ -2420,7 +2421,7 @@ public class QueueImpl implements Queue {
    }
 
    private Pair<String, Binding> locateTargetBinding(SimpleString queueSuffix,
-                                                     ServerMessage copyMessage,
+                                                     Message copyMessage,
                                                      long oldQueueID) {
       String targetNodeID = null;
       Binding targetBinding = null;
@@ -2440,7 +2441,7 @@ public class QueueImpl implements Queue {
                // parse the queue name of the remote queue binding to determine the node ID
                String temp = remoteQueueBinding.getQueue().getName().toString();
                targetNodeID = temp.substring(temp.lastIndexOf(".") + 1);
-               logger.debug("Message formerly destined for " + oldQueueName + " with ID: " + oldQueueID + " on address " + copyMessage.getAddress() + " on node " + targetNodeID);
+               logger.debug("Message formerly destined for " + oldQueueName + " with ID: " + oldQueueID + " on address " + copyMessage.getAddressSimpleString() + " on node " + targetNodeID);
 
                // now that we have the name of the queue we need to look through all the bindings again to find the new remote queue binding
                for (Map.Entry<SimpleString, Binding> entry2 : postOffice.getAllBindings().entrySet()) {
@@ -2468,14 +2469,14 @@ public class QueueImpl implements Queue {
       return new Pair<>(targetNodeID, targetBinding);
    }
 
-   private ServerMessage makeCopy(final MessageReference ref, final boolean expiry) throws Exception {
+   private Message makeCopy(final MessageReference ref, final boolean expiry) throws Exception {
       return makeCopy(ref, expiry, true);
    }
 
-   private ServerMessage makeCopy(final MessageReference ref,
+   private Message makeCopy(final MessageReference ref,
                                   final boolean expiry,
                                   final boolean copyOriginalHeaders) throws Exception {
-      ServerMessage message = ref.getMessage();
+      Message message = ref.getMessage();
       /*
        We copy the message and send that to the dla/expiry queue - this is
        because otherwise we may end up with a ref with the same message id in the
@@ -2487,7 +2488,15 @@ public class QueueImpl implements Queue {
 
       long newID = storageManager.generateID();
 
-      ServerMessage copy = message.makeCopyForExpiryOrDLA(newID, ref, expiry, copyOriginalHeaders);
+      Message copy = message.copy(newID);
+
+      if (copyOriginalHeaders) {
+         copy.referenceOriginalMessage(message, ref != null ? ref.getQueue().getName().toString() : null);
+      }
+
+      if (expiry) {
+         copy.putLongProperty(Message.HDR_ACTUAL_EXPIRY_TIME.toString(), System.currentTimeMillis());
+      }
 
       return copy;
    }
@@ -2549,7 +2558,7 @@ public class QueueImpl implements Queue {
          tx = new TransactionImpl(storageManager);
       }
 
-      ServerMessage copyMessage = makeCopy(ref, reason == AckReason.EXPIRED);
+      Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED);
 
       copyMessage.setAddress(address);
 
@@ -2719,7 +2728,7 @@ public class QueueImpl implements Queue {
          return;
       }
 
-      ServerMessage message;
+      Message message;
 
       try {
          message = ref.getMessage();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index 8e3a94b..0f3da07 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -22,12 +22,12 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
@@ -122,7 +122,7 @@ public class RefsOperation extends TransactionOperationAbstract {
          try {
             Transaction ackedTX = new TransactionImpl(storageManager);
             for (MessageReference ref : ackedRefs) {
-               ServerMessage message = ref.getMessage();
+               Message message = ref.getMessage();
                if (message.isDurable()) {
                   int durableRefCount = message.incrementDurableRefCount();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
index a5f96b1..4590c0b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
@@ -39,7 +39,6 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
@@ -54,7 +53,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
 import org.apache.activemq.artemis.core.server.cluster.ClusterController;
 import org.apache.activemq.artemis.core.transaction.ResourceManager;
@@ -193,7 +191,7 @@ public class ScaleDownHandler {
                      buffer.putLong(queueID);
                   }
 
-                  message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
+                  message.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), buffer.array());
 
                   if (logger.isDebugEnabled()) {
                      if (messageReference.isPaged()) {
@@ -264,11 +262,11 @@ public class ScaleDownHandler {
                byte[] oldRouteToIDs = null;
 
                List<SimpleString> propertiesToRemove = new ArrayList<>();
-               message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS);
+               message.removeProperty(Message.HDR_ROUTE_TO_IDS.toString());
                for (SimpleString propName : message.getPropertyNames()) {
-                  if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS)) {
+                  if (propName.startsWith(Message.HDR_ROUTE_TO_IDS)) {
                      if (propName.toString().endsWith(propertyEnd)) {
-                        oldRouteToIDs = message.getBytesProperty(propName);
+                        oldRouteToIDs = message.getBytesProperty(propName.toString());
                      }
                      propertiesToRemove.add(propName);
                   }
@@ -277,16 +275,17 @@ public class ScaleDownHandler {
                // TODO: what if oldRouteToIDs == null ??
 
                for (SimpleString propertyToRemove : propertiesToRemove) {
-                  message.removeProperty(propertyToRemove);
+                  message.removeProperty(propertyToRemove.toString());
                }
 
                if (queueOnTarget) {
-                  message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, oldRouteToIDs);
+                  message.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), oldRouteToIDs);
                } else {
-                  message.putBytesProperty(MessageImpl.HDR_SCALEDOWN_TO_IDS, oldRouteToIDs);
+                  message.putBytesProperty(Message.HDR_SCALEDOWN_TO_IDS.toString(), oldRouteToIDs);
                }
 
                logger.debug("Scaling down message " + message + " from " + address + " to " + message.getAddress() + " on node " + targetNodeId);
+
                producer.send(message.getAddress(), message);
 
                messageCount++;
@@ -322,13 +321,13 @@ public class ScaleDownHandler {
          List<TransactionOperation> allOperations = transaction.getAllOperations();
 
          // Get the information of the Prepared TXs so it could replay the TXs
-         Map<ServerMessage, Pair<List<Long>, List<Long>>> queuesToSendTo = new HashMap<>();
+         Map<Message, Pair<List<Long>, List<Long>>> queuesToSendTo = new HashMap<>();
          for (TransactionOperation operation : allOperations) {
             if (operation instanceof PostOfficeImpl.AddOperation) {
                PostOfficeImpl.AddOperation addOperation = (PostOfficeImpl.AddOperation) operation;
                List<MessageReference> refs = addOperation.getRelatedMessageReferences();
                for (MessageReference ref : refs) {
-                  ServerMessage message = ref.getMessage();
+                  Message message = ref.getMessage();
                   Queue queue = ref.getQueue();
                   long queueID;
                   String queueName = queue.getName().toString();
@@ -336,7 +335,7 @@ public class ScaleDownHandler {
                   if (queueIDs.containsKey(queueName)) {
                      queueID = queueIDs.get(queueName);
                   } else {
-                     queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddress());
+                     queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString());
                      queueIDs.put(queueName, queueID);  // store it so we don't have to look it up every time
                   }
                   Pair<List<Long>, List<Long>> queueIds = queuesToSendTo.get(message);
@@ -350,7 +349,7 @@ public class ScaleDownHandler {
                RefsOperation refsOperation = (RefsOperation) operation;
                List<MessageReference> refs = refsOperation.getReferencesToAcknowledge();
                for (MessageReference ref : refs) {
-                  ServerMessage message = ref.getMessage();
+                  Message message = ref.getMessage();
                   Queue queue = ref.getQueue();
                   long queueID;
                   String queueName = queue.getName().toString();
@@ -358,7 +357,7 @@ public class ScaleDownHandler {
                   if (queueIDs.containsKey(queueName)) {
                      queueID = queueIDs.get(queueName);
                   } else {
-                     queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddress());
+                     queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString());
                      queueIDs.put(queueName, queueID);  // store it so we don't have to look it up every time
                   }
                   Pair<List<Long>, List<Long>> queueIds = queuesToSendTo.get(message);
@@ -373,23 +372,23 @@ public class ScaleDownHandler {
          }
 
          ClientProducer producer = session.createProducer();
-         for (Map.Entry<ServerMessage, Pair<List<Long>, List<Long>>> entry : queuesToSendTo.entrySet()) {
+         for (Map.Entry<Message, Pair<List<Long>, List<Long>>> entry : queuesToSendTo.entrySet()) {
             List<Long> ids = entry.getValue().getA();
             ByteBuffer buffer = ByteBuffer.allocate(ids.size() * 8);
             for (Long id : ids) {
                buffer.putLong(id);
             }
-            ServerMessage message = entry.getKey();
-            message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
+            Message message = entry.getKey();
+            message.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), buffer.array());
             ids = entry.getValue().getB();
             if (ids.size() > 0) {
                buffer = ByteBuffer.allocate(ids.size() * 8);
                for (Long id : ids) {
                   buffer.putLong(id);
                }
-               message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_ACK_IDS, buffer.array());
+               message.putBytesProperty(Message.HDR_ROUTE_TO_ACK_IDS.toString(), buffer.array());
             }
-            producer.send(message.getAddress(), message);
+            producer.send(message.getAddressSimpleString().toString(), message);
          }
          session.end(xid, XAResource.TMSUCCESS);
          session.prepare(xid);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index bcc6df1..710a22b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -31,12 +31,14 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
 import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
@@ -48,7 +50,6 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
@@ -205,7 +206,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
       this.creationTime = System.currentTimeMillis();
 
-
       if (browseOnly) {
          browserDeliverer = new BrowserDeliverer(messageQueue.browserIterator());
       } else {
@@ -341,7 +341,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
             }
             return HandleStatus.BUSY;
          }
-         final ServerMessage message = ref.getMessage();
+         final Message message = ref.getMessage();
 
          if (filter != null && !filter.match(message)) {
             if (logger.isTraceEnabled()) {
@@ -400,7 +400,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    @Override
    public void proceedDeliver(MessageReference reference) throws Exception {
       try {
-         ServerMessage message = reference.getMessage();
+         Message message = reference.getMessage();
 
          if (message.isLargeMessage() && supportLargeMessage) {
             if (largeMessageDeliverer == null) {
@@ -507,17 +507,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
     * there are no other messages to be delivered.
     */
    @Override
-   public void forceDelivery(final long sequence) {
-      forceDelivery(sequence, new Runnable() {
-         @Override
-         public void run() {
-            ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateID(), 50);
+   public void forceDelivery(final long sequence)  {
+      forceDelivery(sequence, () -> {
+         Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50);
 
-            forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
-            forcedDeliveryMessage.setAddress(messageQueue.getName());
+         forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
+         forcedDeliveryMessage.setAddress(messageQueue.getName());
+
+         callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0);
 
-            callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0);
-         }
       });
    }
 
@@ -1018,7 +1016,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
     * @param ref
     * @param message
     */
-   private void deliverStandardMessage(final MessageReference ref, final ServerMessage message) {
+   private void deliverStandardMessage(final MessageReference ref, final Message message) throws ActiveMQException {
       int packetSize = callback.sendMessage(ref, message, ServerConsumerImpl.this, ref.getDeliveryCount());
 
       if (availableCredits != null) {
@@ -1070,7 +1068,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
        */
       private long positionPendingLargeMessage;
 
-      private BodyEncoder context;
+      private LargeBodyEncoder context;
 
       private LargeMessageDeliverer(final LargeServerMessage message, final MessageReference ref) throws Exception {
          largeMessage = message;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java
deleted file mode 100644
index 39e77ca..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.server.impl;
-
-import java.io.InputStream;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
-import org.apache.activemq.artemis.core.paging.PagingStore;
-import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.utils.DataConstants;
-import org.apache.activemq.artemis.utils.MemorySize;
-import org.apache.activemq.artemis.utils.TypedProperties;
-
-public class ServerMessageImpl extends MessageImpl implements ServerMessage {
-
-   private final AtomicInteger durableRefCount = new AtomicInteger();
-
-   private final AtomicInteger refCount = new AtomicInteger();
-
-   private PagingStore pagingStore;
-
-   private static final int memoryOffset;
-
-   private boolean persisted = false;
-
-   static {
-      // This is an estimate of how much memory a ServerMessageImpl takes up, exclusing body and properties
-      // Note, it is only an estimate, it's not possible to be entirely sure with Java
-      // This figure is calculated using the test utilities in org.apache.activemq.tests.unit.util.sizeof
-      // The value is somewhat higher on 64 bit architectures, probably due to different alignment
-
-      if (MemorySize.is64bitArch()) {
-         memoryOffset = 352;
-      } else {
-         memoryOffset = 232;
-      }
-   }
-
-   /*
-    * Constructor for when reading from network
-    */
-   public ServerMessageImpl() {
-   }
-
-   /*
-    * Construct a MessageImpl from storage, or notification, or before routing
-    */
-   public ServerMessageImpl(final long messageID, final int initialMessageBufferSize) {
-      super(initialMessageBufferSize);
-
-      this.messageID = messageID;
-   }
-
-   /*
-    * Copy constructor
-    */
-   protected ServerMessageImpl(final ServerMessageImpl other) {
-      super(other);
-   }
-
-   /*
-    * Copy constructor
-    */
-   protected ServerMessageImpl(final ServerMessageImpl other, TypedProperties properties) {
-      super(other, properties);
-   }
-
-   @Override
-   public boolean isServerMessage() {
-      return true;
-   }
-
-   @Override
-   public ServerMessageImpl setMessageID(final long id) {
-      messageID = id;
-      return this;
-   }
-
-   @Override
-   public MessageReference createReference(final Queue queue) {
-      MessageReference ref = new MessageReferenceImpl(this, queue);
-
-      return ref;
-   }
-
-   @Override
-   public boolean hasInternalProperties() {
-      return properties.hasInternalProperties();
-   }
-
-   @Override
-   public int incrementRefCount() throws Exception {
-      int count = refCount.incrementAndGet();
-
-      if (pagingStore != null) {
-         if (count == 1) {
-            pagingStore.addSize(getMemoryEstimate() + MessageReferenceImpl.getMemoryEstimate());
-         } else {
-            pagingStore.addSize(MessageReferenceImpl.getMemoryEstimate());
-         }
-      }
-
-      return count;
-   }
-
-   @Override
-   public int decrementRefCount() throws Exception {
-      int count = refCount.decrementAndGet();
-
-      if (count < 0) {
-         // this could happen on paged messages since they are not routed and incrementRefCount is never called
-         return count;
-      }
-
-      if (pagingStore != null) {
-         if (count == 0) {
-            pagingStore.addSize(-getMemoryEstimate() - MessageReferenceImpl.getMemoryEstimate());
-
-            if (buffer != null) {
-               // release the buffer now
-               buffer.byteBuf().release();
-            }
-         } else {
-            pagingStore.addSize(-MessageReferenceImpl.getMemoryEstimate());
-         }
-      }
-
-      return count;
-   }
-
-   @Override
-   public int incrementDurableRefCount() {
-      return durableRefCount.incrementAndGet();
-   }
-
-   @Override
-   public int decrementDurableRefCount() {
-      return durableRefCount.decrementAndGet();
-   }
-
-   @Override
-   public int getRefCount() {
-      return refCount.get();
-   }
-
-   @Override
-   public boolean isLargeMessage() {
-      return false;
-   }
-
-   private volatile int memoryEstimate = -1;
-
-   @Override
-   public int getMemoryEstimate() {
-      if (memoryEstimate == -1) {
-         memoryEstimate = ServerMessageImpl.memoryOffset + buffer.capacity() + properties.getMemoryOffset();
-      }
-
-      return memoryEstimate;
-   }
-
-   @Override
-   public ServerMessage copy(final long newID) {
-      ServerMessage m = new ServerMessageImpl(this);
-
-      m.setMessageID(newID);
-
-      return m;
-   }
-
-   @Override
-   public ServerMessage copy() {
-      // This is a simple copy, used only to avoid changing original properties
-      return new ServerMessageImpl(this);
-   }
-
-   public ServerMessage makeCopyForExpiryOrDLA(final long newID,
-                                               MessageReference originalReference,
-                                               final boolean expiry) throws Exception {
-      return makeCopyForExpiryOrDLA(newID, originalReference, expiry, true);
-   }
-
-   @Override
-   public ServerMessage makeCopyForExpiryOrDLA(final long newID,
-                                               MessageReference originalReference,
-                                               final boolean expiry,
-                                               final boolean copyOriginalHeaders) throws Exception {
-      /*
-       We copy the message and send that to the dla/expiry queue - this is
-       because otherwise we may end up with a ref with the same message id in the
-       queue more than once which would barf - this might happen if the same message had been
-       expire from multiple subscriptions of a topic for example
-       We set headers that hold the original message address, expiry time
-       and original message id
-      */
-
-      ServerMessage copy = copy(newID);
-
-      if (copyOriginalHeaders) {
-         copy.setOriginalHeaders(this, originalReference, expiry);
-      }
-
-      return copy;
-   }
-
-   @Override
-   public void setOriginalHeaders(final ServerMessage other,
-                                  final MessageReference originalReference,
-                                  final boolean expiry) {
-      SimpleString originalQueue = other.getSimpleStringProperty(Message.HDR_ORIGINAL_QUEUE);
-
-      if (originalQueue != null) {
-         putStringProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
-      } else if (originalReference != null) {
-         putStringProperty(Message.HDR_ORIGINAL_QUEUE, originalReference.getQueue().getName());
-      }
-
-      if (other.containsProperty(Message.HDR_ORIG_MESSAGE_ID)) {
-         putStringProperty(Message.HDR_ORIGINAL_ADDRESS, other.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS));
-
-         putLongProperty(Message.HDR_ORIG_MESSAGE_ID, other.getLongProperty(Message.HDR_ORIG_MESSAGE_ID));
-      } else {
-         putStringProperty(Message.HDR_ORIGINAL_ADDRESS, other.getAddress());
-
-         putLongProperty(Message.HDR_ORIG_MESSAGE_ID, other.getMessageID());
-      }
-
-      // reset expiry
-      setExpiration(0);
-
-      if (expiry) {
-         long actualExpiryTime = System.currentTimeMillis();
-
-         putLongProperty(Message.HDR_ACTUAL_EXPIRY_TIME, actualExpiryTime);
-      }
-
-      bufferValid = false;
-   }
-
-   @Override
-   public void setPagingStore(final PagingStore pagingStore) {
-      this.pagingStore = pagingStore;
-
-      // On the server side, we reset the address to point to the instance of address in the paging store
-      // Otherwise each message would have its own copy of the address String which would take up more memory
-      address = pagingStore.getAddress();
-   }
-
-   @Override
-   public synchronized void forceAddress(final SimpleString address) {
-      this.address = address;
-      bufferValid = false;
-   }
-
-   @Override
-   public PagingStore getPagingStore() {
-      return pagingStore;
-   }
-
-   @Override
-   public boolean storeIsPaging() {
-      if (pagingStore != null) {
-         return pagingStore.isPaging();
-      } else {
-         return false;
-      }
-   }
-
-   @Override
-   public String toString() {
-      try {
-         return "ServerMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + ", bodySize=" + this.getBodyBufferDuplicate().capacity() +
-            ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) +
-            ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this);
-      } catch (Throwable e) {
-         return "ServerMessage[messageID=" + messageID + "]";
-      }
-   }
-
-   private static String toDate(long timestamp) {
-      if (timestamp == 0) {
-         return "0";
-      } else {
-         return new java.util.Date(timestamp).toString();
-      }
-
-   }
-
-   @Override
-   public InputStream getBodyInputStream() {
-      return null;
-   }
-
-   // Encoding stuff
-
-   @Override
-   public void encodeMessageIDToBuffer() {
-      // We first set the message id - this needs to be set on the buffer since this buffer will be re-used
-
-      buffer.setLong(buffer.getInt(MessageImpl.BUFFER_HEADER_SPACE) + DataConstants.SIZE_INT, messageID);
-   }
-
-   @Override
-   public byte[] getDuplicateIDBytes() {
-      Object duplicateID = getDuplicateProperty();
-
-      if (duplicateID == null) {
-         return null;
-      } else {
-         if (duplicateID instanceof SimpleString) {
-            return ((SimpleString) duplicateID).getData();
-         } else {
-            return (byte[]) duplicateID;
-         }
-      }
-   }
-
-   @Override
-   public Object getDuplicateProperty() {
-      return getObjectProperty(Message.HDR_DUPLICATE_DETECTION_ID);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 52ecda1..1cd19bb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -31,6 +31,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.artemis.Closeable;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
@@ -41,12 +42,10 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
-import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
 import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
 import org.apache.activemq.artemis.core.io.IOCallback;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
@@ -66,14 +65,13 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
-import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.TempQueueObserver;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
@@ -155,9 +153,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    private final SimpleString managementAddress;
 
-   // The current currentLargeMessage being processed
-   private volatile LargeServerMessage currentLargeMessage;
-
    protected final RoutingContext routingContext = new RoutingContextImpl(null);
 
    protected final SessionCallback callback;
@@ -187,6 +182,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    private Map<SimpleString, RoutingType> prefixes;
 
+   private Set<Closeable> closeables;
+
    public ServerSessionImpl(final String name,
                             final String username,
                             final String password,
@@ -273,6 +270,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    }
 
    @Override
+   public void addCloseable(Closeable closeable) {
+      if (closeables == null) {
+         closeables = new HashSet<>();
+      }
+      this.closeables.add(closeable);
+   }
+
+   @Override
    public void disableSecurity() {
       this.securityEnabled = false;
    }
@@ -376,11 +381,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
       consumers.clear();
 
-      if (currentLargeMessage != null) {
-         try {
-            currentLargeMessage.deleteFile();
-         } catch (Throwable error) {
-            ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
+      if (closeables != null) {
+         for (Closeable closeable : closeables) {
+            closeable.close(failed);
          }
       }
 
@@ -1272,30 +1275,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    }
 
    @Override
-   public void sendLarge(final MessageInternal message) throws Exception {
-      // need to create the LargeMessage before continue
-      long id = storageManager.generateID();
-
-      LargeServerMessage largeMsg = storageManager.createLargeMessage(id, message);
-
-      if (logger.isTraceEnabled()) {
-         logger.trace("sendLarge::" + largeMsg);
-      }
-
-      if (currentLargeMessage != null) {
-         ActiveMQServerLogger.LOGGER.replacingIncompleteLargeMessage(currentLargeMessage.getMessageID());
-      }
-
-      currentLargeMessage = largeMsg;
-   }
-
-   @Override
-   public RoutingStatus send(final ServerMessage message, final boolean direct) throws Exception {
+   public RoutingStatus send(final Message message, final boolean direct) throws Exception {
       return send(message, direct, false);
    }
 
    @Override
-   public RoutingStatus send(final ServerMessage message,
+   public RoutingStatus send(final Message message,
                              final boolean direct,
                              boolean noAutoCreateQueue) throws Exception {
       return send(getCurrentTransaction(), message, direct, noAutoCreateQueue);
@@ -1303,7 +1288,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    @Override
    public RoutingStatus send(Transaction tx,
-                             final ServerMessage message,
+                             final Message message,
                              final boolean direct,
                              boolean noAutoCreateQueue) throws Exception {
 
@@ -1319,19 +1304,20 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       //case the id header already generated.
       if (!message.isLargeMessage()) {
          long id = storageManager.generateID();
-
+         // This will re-encode the message
          message.setMessageID(id);
-         message.encodeMessageIDToBuffer();
       }
 
       if (server.getConfiguration().isPopulateValidatedUser() && validatedUser != null) {
          message.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUser));
       }
 
-      SimpleString address = removePrefix(message.getAddress());
+      SimpleString originalAddress = message.getAddressSimpleString();
+
+      SimpleString address = removePrefix(message.getAddressSimpleString());
 
       // In case the prefix was removed, we also need to update the message
-      if (address != message.getAddress()) {
+      if (address != message.getAddressSimpleString()) {
          message.setAddress(address);
       }
 
@@ -1340,14 +1326,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       }
 
       if (address == null) {
-         if (message.isDurable()) {
-            // We need to force a re-encode when the message gets persisted or when it gets reloaded
-            // it will have no address
-            message.setAddress(defaultAddress);
-         } else {
-            // We don't want to force a re-encode when the message gets sent to the consumer
-            message.setAddressTransient(defaultAddress);
-         }
+         // We don't want to force a re-encode when the message gets sent to the consumer
+         message.setAddress(defaultAddress);
       }
 
       if (logger.isTraceEnabled()) {
@@ -1359,42 +1339,16 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          throw ActiveMQMessageBundle.BUNDLE.noAddress();
       }
 
-      if (message.getAddress().equals(managementAddress)) {
+      if (message.getAddressSimpleString().equals(managementAddress)) {
          // It's a management message
 
          handleManagementMessage(tx, message, direct);
       } else {
-         result = doSend(tx, message, direct, noAutoCreateQueue);
+         result = doSend(tx, message, originalAddress, direct, noAutoCreateQueue);
       }
       return result;
    }
 
-   @Override
-   public void sendContinuations(final int packetSize,
-                                 final long messageBodySize,
-                                 final byte[] body,
-                                 final boolean continues) throws Exception {
-      if (currentLargeMessage == null) {
-         throw ActiveMQMessageBundle.BUNDLE.largeMessageNotInitialised();
-      }
-
-      // Immediately release the credits for the continuations- these don't contribute to the in-memory size
-      // of the message
-
-      currentLargeMessage.addBytes(body);
-
-      if (!continues) {
-         currentLargeMessage.releaseResources();
-
-         if (messageBodySize >= 0) {
-            currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
-         }
-
-         doSend(tx, currentLargeMessage, false, false);
-
-         currentLargeMessage = null;
-      }
-   }
 
    @Override
    public void requestProducerCredits(SimpleString address, final int credits) throws Exception {
@@ -1566,10 +1520,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       connectionFailed(me, failedOver);
    }
 
-   public void clearLargeMessage() {
-      currentLargeMessage = null;
-   }
-
    private void installJMSHooks() {
    }
 
@@ -1588,10 +1538,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    }
 
    private RoutingStatus handleManagementMessage(final Transaction tx,
-                                                 final ServerMessage message,
+                                                 final Message message,
                                                  final boolean direct) throws Exception {
       try {
-         securityCheck(removePrefix(message.getAddress()), CheckType.MANAGE, this);
+         securityCheck(removePrefix(message.getAddressSimpleString()), CheckType.MANAGE, this);
       } catch (ActiveMQException e) {
          if (!autoCommitSends) {
             tx.markAsRollbackOnly(e);
@@ -1599,9 +1549,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          throw e;
       }
 
-      ServerMessage reply = managementService.handleMessage(message);
+      Message reply = managementService.handleMessage(message);
 
-      SimpleString replyTo = message.getSimpleStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME);
+      SimpleString replyTo = message.getReplyTo();
 
       if (replyTo != null) {
          // TODO: move this check somewhere else? this is a JMS-specific bit of logic in the core impl
@@ -1612,7 +1562,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          }
          reply.setAddress(replyTo);
 
-         doSend(tx, reply, direct, false);
+         doSend(tx, reply, null, direct, false);
       }
 
       return RoutingStatus.OK;
@@ -1669,21 +1619,25 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       theTx.rollback();
    }
 
+   @Override
    public RoutingStatus doSend(final Transaction tx,
-                               final ServerMessage msg,
+                               final Message msg,
+                               final SimpleString originalAddress,
                                final boolean direct,
                                final boolean noAutoCreateQueue) throws Exception {
       RoutingStatus result = RoutingStatus.OK;
 
-      /**
-       *  TODO Checking message properties on each message is expensive.  Instead we should update the API and Core Packets
-       *  to add the RoutingType information directly.
-       */
-      RoutingType routingType = null;
-      if (msg.containsProperty(Message.HDR_ROUTING_TYPE)) {
-         routingType = RoutingType.getType(msg.getByteProperty(Message.HDR_ROUTING_TYPE));
+      RoutingType routingType = msg.getRouteType();
+
+      if (originalAddress != null) {
+         if (originalAddress.toString().startsWith("anycast:")) {
+            routingType = RoutingType.ANYCAST;
+         } else if (originalAddress.toString().startsWith("multicast:")) {
+            routingType = RoutingType.MULTICAST;
+         }
       }
-      Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(msg.getAddress(), routingType);
+
+      Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(msg.getAddressSimpleString(), routingType);
 
       // Consumer
       // check the user has write access to this address.
@@ -1707,12 +1661,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
          result = postOffice.route(msg, routingContext, direct);
 
-         Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddress());
+         Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddressSimpleString());
 
          if (value == null) {
-            targetAddressInfos.put(msg.getAddress(), new Pair<>(msg.getUserID(), new AtomicLong(1)));
+            // TODO-now: userID
+//            targetAddressInfos.put(msg.getAddressSimpleString(), new Pair<>((UUID)msg.getUserID(), new AtomicLong(1)));
          } else {
-            value.setA(msg.getUserID());
+            // TODO-now: userID
+            value.setA((UUID)msg.getUserID());
             value.getB().incrementAndGet();
          }
       } finally {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
index 0222928..29a2e47 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
@@ -21,6 +21,9 @@ import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
@@ -41,8 +44,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Divert;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueFactory;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.cluster.Bridge;
 import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@@ -128,5 +129,5 @@ public interface ManagementService extends NotificationService, ActiveMQComponen
 
    Object[] getResources(Class<?> resourceType);
 
-   ServerMessage handleMessage(ServerMessage message) throws Exception;
+   ICoreMessage handleMessage(Message message) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index 55f2aea..f45aea7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -33,7 +33,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.JsonUtil;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.management.AcceptorControl;
@@ -56,6 +59,7 @@ import org.apache.activemq.artemis.core.management.impl.BroadcastGroupControlImp
 import org.apache.activemq.artemis.core.management.impl.ClusterConnectionControlImpl;
 import org.apache.activemq.artemis.core.management.impl.DivertControlImpl;
 import org.apache.activemq.artemis.core.management.impl.QueueControlImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
 import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager;
 import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
@@ -71,13 +75,10 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.Divert;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueFactory;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.cluster.Bridge;
 import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.server.management.NotificationListener;
@@ -365,9 +366,11 @@ public class ManagementServiceImpl implements ManagementService {
    }
 
    @Override
-   public ServerMessage handleMessage(final ServerMessage message) throws Exception {
+   public ICoreMessage handleMessage(Message message) throws Exception {
+      message = message.toCore();
       // a reply message is sent with the result stored in the message body.
-      ServerMessage reply = new ServerMessageImpl(storageManager.generateID(), 512);
+      CoreMessage reply = new CoreMessage(storageManager.generateID(), 512);
+      reply.setReplyTo(message.getReplyTo());
 
       String resourceName = message.getStringProperty(ManagementHelper.HDR_RESOURCE_NAME);
       if (logger.isDebugEnabled()) {
@@ -631,7 +634,7 @@ public class ManagementServiceImpl implements ManagementService {
 
                long messageID = storageManager.generateID();
 
-               ServerMessage notificationMessage = new ServerMessageImpl(messageID, 512);
+               Message notificationMessage = new CoreMessage(messageID, 512);
 
                // Notification messages are always durable so the user can choose whether to add a durable queue to
                // consume them in

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionDetail.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionDetail.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionDetail.java
index efe4cf9..0ee1b7d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionDetail.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionDetail.java
@@ -26,8 +26,8 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.JsonUtil;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import org.apache.activemq.artemis.utils.JsonLoader;
 
@@ -97,7 +97,7 @@ public abstract class TransactionDetail {
 
             msgJson.add(KEY_MSG_OP_TYPE, opType);
 
-            ServerMessage msg = ref.getMessage().copy();
+            Message msg = ref.getMessage().copy();
 
             msgJson.add(KEY_MSG_TYPE, decodeMessageType(msg));
             JsonUtil.addToObject(KEY_MSG_PROPERTIES, decodeMessageProperties(msg), msgJson);
@@ -108,7 +108,7 @@ public abstract class TransactionDetail {
       return detailJson.build();
    }
 
-   public abstract String decodeMessageType(ServerMessage msg);
+   public abstract String decodeMessageType(Message msg);
 
-   public abstract Map<String, Object> decodeMessageProperties(ServerMessage msg);
+   public abstract Map<String, Object> decodeMessageProperties(Message msg);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java
index 4730596..95036da 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java
@@ -19,8 +19,8 @@ package org.apache.activemq.artemis.core.transaction.impl;
 import javax.transaction.xa.Xid;
 import java.util.Map;
 
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionDetail;
 
@@ -31,8 +31,11 @@ public class CoreTransactionDetail extends TransactionDetail {
    }
 
    @Override
-   public String decodeMessageType(ServerMessage msg) {
-      int type = msg.getType();
+   public String decodeMessageType(Message msg) {
+      if (!(msg instanceof ICoreMessage)) {
+         return "N/A";
+      }
+      int type = ((ICoreMessage)msg).getType();
       switch (type) {
          case Message.DEFAULT_TYPE: // 0
             return "Default";
@@ -52,7 +55,7 @@ public class CoreTransactionDetail extends TransactionDetail {
    }
 
    @Override
-   public Map<String, Object> decodeMessageProperties(ServerMessage msg) {
+   public Map<String, Object> decodeMessageProperties(Message msg) {
       return msg.toMap();
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
index a342e13..a440e31 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
@@ -16,12 +16,12 @@
  */
 package org.apache.activemq.artemis.spi.core.protocol;
 
-import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
 
-// TODO: use this interface properly on OpenWire
-public interface MessageConverter {
+public interface MessageConverter<ProtocolMessage extends Message> {
 
-   ServerMessage inbound(Object messageInbound) throws Exception;
+   ICoreMessage toCore(ProtocolMessage pureMessage) throws Exception;
 
-   Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception;
+   ProtocolMessage fromCore(ICoreMessage coreMessage) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
new file mode 100644
index 0000000..14891f5
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.spi.core.protocol;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.jboss.logging.Logger;
+
+public class MessagePersister implements Persister<Message> {
+
+   private static final Logger logger = Logger.getLogger(MessagePersister.class);
+
+   private static final MessagePersister theInstance = new MessagePersister();
+
+   /** This will be used for reading messages */
+   private static Map<Byte, Persister<Message>> protocols = new ConcurrentHashMap<>();
+
+
+   public static void registerProtocol(ProtocolManagerFactory manager) {
+      Persister<Message> messagePersister = manager.getPersister();
+      if (messagePersister == null) {
+         logger.warn("Cannot find persister for " + manager);
+      } else {
+         registerPersister(manager.getStoreID(), manager.getPersister());
+      }
+   }
+
+   public static void clearPersisters() {
+      protocols.clear();
+   }
+
+   public static void registerPersister(byte recordType, Persister<Message> persister) {
+      protocols.put(recordType, persister);
+   }
+
+   public static MessagePersister getInstance() {
+      return theInstance;
+   }
+
+
+   protected MessagePersister() {
+   }
+
+   protected byte getID() {
+      return (byte)0;
+   }
+
+   @Override
+   public int getEncodeSize(Message record) {
+      return 0;
+   }
+
+
+   /** Sub classes must add the first short as the protocol-id */
+   @Override
+   public void encode(ActiveMQBuffer buffer, Message record) {
+      buffer.writeByte(getID());
+   }
+
+   @Override
+   public Message decode(ActiveMQBuffer buffer, Message record) {
+      byte protocol = buffer.readByte();
+      Persister<Message> persister = protocols.get(protocol);
+      if (persister == null) {
+         throw new NullPointerException("couldn't find factory for type=" + protocol);
+      }
+      return persister.decode(buffer, record);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
index 890fbfe..e29d74d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
@@ -22,12 +22,14 @@ import java.util.Map;
 import io.netty.channel.ChannelPipeline;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 
+/**
+ * Info: ProtocolManager is loaded by {@link org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl#loadProtocolManagerFactories(Iterable)} */
 public interface ProtocolManager<P extends BaseInterceptor> {
 
    ProtocolManagerFactory<P> getFactory();
@@ -51,14 +53,6 @@ public interface ProtocolManager<P extends BaseInterceptor> {
    boolean isProtocol(byte[] array);
 
    /**
-    * Gets the Message Converter towards ActiveMQ Artemis.
-    * Notice this being null means no need to convert
-    *
-    * @return
-    */
-   MessageConverter getConverter();
-
-   /**
     * If this protocols accepts connectoins without an initial handshake.
     * If true this protocol will be the failback case no other connections are made.
     * New designed protocols should always require a handshake. This is only useful for legacy protocols.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
index d3b1b2e..9574540 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
@@ -20,10 +20,25 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 
 public interface ProtocolManagerFactory<P extends BaseInterceptor> {
 
+   /** This is to be used to store the protocol-id on Messages.
+    *  Messages are stored on their bare format.
+    *  The protocol manager will be responsible to code or decode messages.
+    *  The caveat here is that the first short-sized bytes need to be this constant. */
+   default byte getStoreID() {
+      return (byte)0;
+   }
+
+   default Persister<Message> getPersister() {
+      return null;
+   }
+
+
    /**
     * When you create the ProtocolManager, you should filter out any interceptors that won't belong
     * to this Protocol.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index ee236c7..799e8b0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -16,10 +16,10 @@
  */
 package org.apache.activemq.artemis.spi.core.protocol;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 
 public interface SessionCallback {
@@ -55,10 +55,10 @@ public interface SessionCallback {
    //       and I wanted to avoid re-fetching paged data in case of GCs on this specific case.
    //
    //       Future developments may change this, but beware why I have chosen to keep the parameter separated here
-   int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumerID, int deliveryCount);
+   int sendMessage(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount);
 
    int sendLargeMessage(MessageReference reference,
-                        ServerMessage message,
+                        Message message,
                         ServerConsumer consumerID,
                         long bodySize,
                         int deliveryCount);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 0c33a35..6fdef44 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -682,22 +682,6 @@
             </xsd:annotation>
          </xsd:element>
 
-         <xsd:element name="perf-blast-pages" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0">
-            <xsd:annotation>
-               <xsd:documentation>
-                  XXX Only meant to be used by project developers
-               </xsd:documentation>
-            </xsd:annotation>
-         </xsd:element>
-
-         <xsd:element name="run-sync-speed-test" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
-            <xsd:annotation>
-               <xsd:documentation>
-                  XXX Only meant to be used by project developers
-               </xsd:documentation>
-            </xsd:annotation>
-         </xsd:element>
-
          <xsd:element name="server-dump-interval" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
             <xsd:annotation>
                <xsd:documentation>


Mime
View raw message