activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [10/22] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding.
Date Mon, 06 Mar 2017 11:53:57 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index d09f62f..92cae64 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -25,9 +25,12 @@ import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
 import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManager;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
@@ -81,9 +84,8 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.jboss.logging.Logger;
@@ -137,13 +139,23 @@ public class ServerSessionPacketHandler implements ChannelHandler {
 
    private volatile CoreRemotingConnection remotingConnection;
 
+   private final CoreProtocolManager manager;
+
+   // The current currentLargeMessage being processed
+   private volatile LargeServerMessage currentLargeMessage;
+
    private final boolean direct;
 
-   public ServerSessionPacketHandler(final ServerSession session,
+   public ServerSessionPacketHandler(final CoreProtocolManager manager,
+                                     final ServerSession session,
                                      final StorageManager storageManager,
                                      final Channel channel) {
+      this.manager = manager;
+
       this.session = session;
 
+      session.addCloseable((boolean failed) -> clearLargeMessage());
+
       this.storageManager = storageManager;
 
       this.channel = channel;
@@ -159,6 +171,16 @@ public class ServerSessionPacketHandler implements ChannelHandler {
       }
    }
 
+   private void clearLargeMessage() {
+      if (currentLargeMessage != null) {
+         try {
+            currentLargeMessage.deleteFile();
+         } catch (Throwable error) {
+            ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
+         }
+      }
+   }
+
    public ServerSession getSession() {
       return session;
    }
@@ -469,7 +491,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                case SESS_SEND: {
                   SessionSendMessage message = (SessionSendMessage) packet;
                   requiresResponse = message.isRequiresResponse();
-                  session.send((ServerMessage) message.getMessage(), direct);
+                  session.send(message.getMessage(), direct);
                   if (requiresResponse) {
                      response = new NullResponseMessage();
                   }
@@ -477,13 +499,13 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                }
                case SESS_SEND_LARGE: {
                   SessionSendLargeMessage message = (SessionSendLargeMessage) packet;
-                  session.sendLarge(message.getLargeMessage());
+                  sendLarge(message.getLargeMessage());
                   break;
                }
                case SESS_SEND_CONTINUATION: {
                   SessionSendContinuationMessage message = (SessionSendContinuationMessage) packet;
                   requiresResponse = message.isRequiresResponse();
-                  session.sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues());
+                  sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues());
                   if (requiresResponse) {
                      response = new NullResponseMessage();
                   }
@@ -681,4 +703,53 @@ public class ServerSessionPacketHandler implements ChannelHandler {
 
       return serverLastReceivedCommandID;
    }
+
+   // Large Message is part of the core protocol, we have these functions here as part of Packet handler
+   private void sendLarge(final Message message) throws Exception {
+      // need to create the LargeMessage before continue
+      long id = storageManager.generateID();
+
+      LargeServerMessage largeMsg = storageManager.createLargeMessage(id, message);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("sendLarge::" + largeMsg);
+      }
+
+      if (currentLargeMessage != null) {
+         ActiveMQServerLogger.LOGGER.replacingIncompleteLargeMessage(currentLargeMessage.getMessageID());
+      }
+
+      currentLargeMessage = largeMsg;
+   }
+
+
+
+   private void sendContinuations(final int packetSize,
+                                 final long messageBodySize,
+                                 final byte[] body,
+                                 final boolean continues) throws Exception {
+      if (currentLargeMessage == null) {
+         throw ActiveMQMessageBundle.BUNDLE.largeMessageNotInitialised();
+      }
+
+      // Immediately release the credits for the continuations- these don't contribute to the in-memory size
+      // of the message
+
+      currentLargeMessage.addBytes(body);
+
+      if (!continues) {
+         currentLargeMessage.releaseResources();
+
+         if (messageBodySize >= 0) {
+            currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
+         }
+
+
+         session.doSend(session.getCurrentTransaction(), currentLargeMessage, null, false, false);
+
+         currentLargeMessage = null;
+      }
+   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
index e6595a5..919d84e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -168,7 +168,7 @@ public class ActiveMQPacketHandler implements ChannelHandler {
 
          ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext, routingTypeMap);
 
-         ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel);
+         ServerSessionPacketHandler handler = new ServerSessionPacketHandler(protocolManager, session, server.getStorageManager(), channel);
          channel.setHandler(handler);
 
          // TODO - where is this removed?

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
index ffaf2cb..cc81fbe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
@@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@@ -53,9 +54,7 @@ import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQFrameDecoder2;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
-import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -111,16 +110,6 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
       return false;
    }
 
-   /**
-    * no need to implement this now
-    *
-    * @return
-    */
-   @Override
-   public MessageConverter getConverter() {
-      return null;
-   }
-
    @Override
    public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection) {
       final Configuration config = server.getConfiguration();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
index 7fed534..7560917 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
@@ -21,7 +21,10 @@ import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@@ -29,10 +32,21 @@ import org.apache.activemq.artemis.utils.uri.BeanSupport;
 
 public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor> {
 
+   public static final byte ID = 1;
    private static String[] SUPPORTED_PROTOCOLS = {ActiveMQClient.DEFAULT_CORE_PROTOCOL};
 
    private static final String MODULE_NAME = "artemis-server";
 
+   @Override
+   public byte getStoreID() {
+      return ID;
+   }
+
+   @Override
+   public Persister<Message> getPersister() {
+      return CoreMessagePersister.getInstance();
+   }
+
    /**
     * {@inheritDoc} *
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index a6c73eb..542d726 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.protocol.core.impl;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.core.Channel;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
@@ -28,7 +29,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -66,7 +66,7 @@ public final class CoreSessionCallback implements SessionCallback {
 
    @Override
    public int sendLargeMessage(MessageReference ref,
-                               ServerMessage message,
+                               Message message,
                                ServerConsumer consumer,
                                long bodySize,
                                int deliveryCount) {
@@ -92,8 +92,9 @@ public final class CoreSessionCallback implements SessionCallback {
    }
 
    @Override
-   public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
-      Packet packet = new SessionReceiveMessage(consumer.getID(), message, deliveryCount);
+   public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount)  {
+
+      Packet packet = new SessionReceiveMessage(consumer.getID(), message.toCore(), deliveryCount);
 
       int size = 0;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java
index 89d2863..8d22fab 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java
@@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 import java.util.Arrays;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE;
 
@@ -36,7 +36,9 @@ public final class ReplicationAddMessage extends PacketImpl {
 
    private byte journalRecordType;
 
-   private EncodingSupport encodingData;
+   private Persister persister;
+
+   private Object encodingData;
 
    private byte[] recordData;
 
@@ -48,12 +50,14 @@ public final class ReplicationAddMessage extends PacketImpl {
                                 final ADD_OPERATION_TYPE operation,
                                 final long id,
                                 final byte journalRecordType,
-                                final EncodingSupport encodingData) {
+                                final Persister persister,
+                                final Object encodingData) {
       this();
       this.journalID = journalID;
       this.operation = operation;
       this.id = id;
       this.journalRecordType = journalRecordType;
+      this.persister = persister;
       this.encodingData = encodingData;
    }
 
@@ -66,8 +70,8 @@ public final class ReplicationAddMessage extends PacketImpl {
       buffer.writeBoolean(operation.toBoolean());
       buffer.writeLong(id);
       buffer.writeByte(journalRecordType);
-      buffer.writeInt(encodingData.getEncodeSize());
-      encodingData.encode(buffer);
+      buffer.writeInt(persister.getEncodeSize(encodingData));
+      persister.encode(buffer, encodingData);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java
index 59475e0..a6fd02b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java
@@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 import java.util.Arrays;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE;
 
@@ -36,7 +36,9 @@ public class ReplicationAddTXMessage extends PacketImpl {
 
    private byte recordType;
 
-   private EncodingSupport encodingData;
+   private Persister persister;
+
+   private Object encodingData;
 
    private byte[] recordData;
 
@@ -51,7 +53,8 @@ public class ReplicationAddTXMessage extends PacketImpl {
                                   final long txId,
                                   final long id,
                                   final byte recordType,
-                                  final EncodingSupport encodingData) {
+                                  final Persister persister,
+                                  final Object encodingData) {
       this();
       this.journalID = journalID;
       this.operation = operation;
@@ -59,6 +62,7 @@ public class ReplicationAddTXMessage extends PacketImpl {
       this.id = id;
       this.recordType = recordType;
       this.encodingData = encodingData;
+      this.persister = persister;
    }
 
    // Public --------------------------------------------------------
@@ -70,8 +74,8 @@ public class ReplicationAddTXMessage extends PacketImpl {
       buffer.writeLong(txId);
       buffer.writeLong(id);
       buffer.writeByte(recordType);
-      buffer.writeInt(encodingData.getEncodeSize());
-      encodingData.encode(buffer);
+      buffer.writeInt(persister.getEncodeSize(encodingData));
+      persister.encode(buffer, encodingData);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
index 7307151..b88e0fe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
@@ -48,7 +48,7 @@ public class ReplicationPageWriteMessage extends PacketImpl {
    @Override
    public void decodeRest(final ActiveMQBuffer buffer) {
       pageNumber = buffer.readInt();
-      pagedMessage = new PagedMessageImpl();
+      pagedMessage = new PagedMessageImpl(null);
       pagedMessage.decode(buffer);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
index ea3107c..c5318e7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
@@ -17,12 +17,14 @@
 package org.apache.activemq.artemis.core.remoting.server;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.utils.ReusableLatch;
@@ -65,6 +67,8 @@ public interface RemotingService {
 
    boolean isStarted();
 
+   Map<String, ProtocolManagerFactory> getProtocolFactoryMap();
+
    /**
     * Allow acceptors to use this as their default security Principal if applicable.
     * <p>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index 50bc90d..3e15f3e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -59,6 +59,7 @@ import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
+import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -147,7 +148,9 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
       this.scheduledThreadPool = scheduledThreadPool;
 
       CoreProtocolManagerFactory coreProtocolManagerFactory = new CoreProtocolManagerFactory();
-      //i know there is only 1
+
+      MessagePersister.getInstance().registerProtocol(coreProtocolManagerFactory);
+
       this.flushExecutor = flushExecutor;
 
       ActiveMQServerLogger.LOGGER.addingProtocolSupport(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.getModuleName());
@@ -174,6 +177,11 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
    }
 
    @Override
+   public Map<String, ProtocolManagerFactory> getProtocolFactoryMap() {
+      return protocolMap;
+   }
+
+   @Override
    public synchronized void start() throws Exception {
       if (started) {
          return;
@@ -768,6 +776,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
     */
    private void loadProtocolManagerFactories(Iterable<ProtocolManagerFactory> protocolManagerFactoryCollection) {
       for (ProtocolManagerFactory next : protocolManagerFactoryCollection) {
+         MessagePersister.registerProtocol(next);
          String[] protocols = next.getProtocols();
          for (String protocol : protocols) {
             ActiveMQServerLogger.LOGGER.addingProtocolSupport(protocol, next.getModuleName());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
index d70316f..0731e8c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
@@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
 import org.apache.activemq.artemis.core.journal.LoaderCallback;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
@@ -88,13 +89,14 @@ public class ReplicatedJournal implements Journal {
    @Override
    public void appendAddRecord(final long id,
                                final byte recordType,
-                               final EncodingSupport record,
+                               Persister persister,
+                               final Object record,
                                final boolean sync) throws Exception {
       if (ReplicatedJournal.trace) {
          ReplicatedJournal.trace("Append record id = " + id + " recordType = " + recordType);
       }
-      replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, id, recordType, record);
-      localJournal.appendAddRecord(id, recordType, record, sync);
+      replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, id, recordType, persister, record);
+      localJournal.appendAddRecord(id, recordType, persister, record, sync);
    }
 
    /**
@@ -108,14 +110,15 @@ public class ReplicatedJournal implements Journal {
    @Override
    public void appendAddRecord(final long id,
                                final byte recordType,
-                               final EncodingSupport record,
+                               Persister persister,
+                               final Object record,
                                final boolean sync,
                                final IOCompletion completionCallback) throws Exception {
       if (ReplicatedJournal.trace) {
          ReplicatedJournal.trace("Append record id = " + id + " recordType = " + recordType);
       }
-      replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, id, recordType, record);
-      localJournal.appendAddRecord(id, recordType, record, sync, completionCallback);
+      replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, id, recordType, persister, record);
+      localJournal.appendAddRecord(id, recordType, persister, record, sync, completionCallback);
    }
 
    /**
@@ -146,12 +149,13 @@ public class ReplicatedJournal implements Journal {
    public void appendAddRecordTransactional(final long txID,
                                             final long id,
                                             final byte recordType,
-                                            final EncodingSupport record) throws Exception {
+                                            final Persister persister,
+                                            final Object record) throws Exception {
       if (ReplicatedJournal.trace) {
          ReplicatedJournal.trace("Append record TXid = " + id + " recordType = " + recordType);
       }
-      replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.ADD, txID, id, recordType, record);
-      localJournal.appendAddRecordTransactional(txID, id, recordType, record);
+      replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.ADD, txID, id, recordType, persister, record);
+      localJournal.appendAddRecordTransactional(txID, id, recordType, persister, record);
    }
 
    /**
@@ -354,26 +358,28 @@ public class ReplicatedJournal implements Journal {
    @Override
    public void appendUpdateRecord(final long id,
                                   final byte recordType,
-                                  final EncodingSupport record,
+                                  final Persister persister,
+                                  final Object record,
                                   final boolean sync) throws Exception {
       if (ReplicatedJournal.trace) {
          ReplicatedJournal.trace("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
       }
-      replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, recordType, record);
-      localJournal.appendUpdateRecord(id, recordType, record, sync);
+      replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record);
+      localJournal.appendUpdateRecord(id, recordType, persister, record, sync);
    }
 
    @Override
    public void appendUpdateRecord(final long id,
                                   final byte journalRecordType,
-                                  final EncodingSupport record,
+                                  final Persister persister,
+                                  final Object record,
                                   final boolean sync,
                                   final IOCompletion completionCallback) throws Exception {
       if (ReplicatedJournal.trace) {
          ReplicatedJournal.trace("AppendUpdateRecord id = " + id + " , recordType = " + journalRecordType);
       }
-      replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, record);
-      localJournal.appendUpdateRecord(id, journalRecordType, record, sync, completionCallback);
+      replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record);
+      localJournal.appendUpdateRecord(id, journalRecordType, persister, record, sync, completionCallback);
    }
 
    /**
@@ -404,12 +410,13 @@ public class ReplicatedJournal implements Journal {
    public void appendUpdateRecordTransactional(final long txID,
                                                final long id,
                                                final byte recordType,
-                                               final EncodingSupport record) throws Exception {
+                                               final Persister persister,
+                                               final Object record) throws Exception {
       if (ReplicatedJournal.trace) {
          ReplicatedJournal.trace("AppendUpdateRecord txid=" + txID + " id = " + id + " , recordType = " + recordType);
       }
-      replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.UPDATE, txID, id, recordType, record);
-      localJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+      replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.UPDATE, txID, id, recordType, persister, record);
+      localJournal.appendUpdateRecordTransactional(txID, id, recordType, persister, record);
    }
 
    /**
@@ -437,15 +444,6 @@ public class ReplicatedJournal implements Journal {
    }
 
    /**
-    * @param pages
-    * @see org.apache.activemq.artemis.core.journal.Journal#perfBlast(int)
-    */
-   @Override
-   public void perfBlast(final int pages) {
-      localJournal.perfBlast(pages);
-   }
-
-   /**
     * @throws Exception
     * @see org.apache.activemq.artemis.core.server.ActiveMQComponent#start()
     */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index 1a07adc..e82d38e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
@@ -76,7 +77,7 @@ import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERA
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+
 import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
@@ -651,8 +652,8 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
    private void handlePageWrite(final ReplicationPageWriteMessage packet) throws Exception {
       PagedMessage pgdMessage = packet.getPagedMessage();
       pgdMessage.initMessage(storageManager);
-      ServerMessage msg = pgdMessage.getMessage();
-      Page page = getPage(msg.getAddress(), packet.getPageNumber());
+      Message msg = pgdMessage.getMessage();
+      Page page = getPage(msg.getAddressSimpleString(), packet.getPageNumber());
       page.write(pgdMessage);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index d0468d1..dce5990 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -36,6 +36,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.journal.impl.JournalFile;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
@@ -147,9 +148,10 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
                                   final ADD_OPERATION_TYPE operation,
                                   final long id,
                                   final byte recordType,
-                                  final EncodingSupport record) throws Exception {
+                                  final Persister persister,
+                                  final Object record) throws Exception {
       if (enabled) {
-         sendReplicatePacket(new ReplicationAddMessage(journalID, operation, id, recordType, record));
+         sendReplicatePacket(new ReplicationAddMessage(journalID, operation, id, recordType, persister, record));
       }
    }
 
@@ -164,9 +166,10 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
                                             final long txID,
                                             final long id,
                                             final byte recordType,
-                                            final EncodingSupport record) throws Exception {
+                                            final Persister persister,
+                                            final Object record) throws Exception {
       if (enabled) {
-         sendReplicatePacket(new ReplicationAddTXMessage(journalID, operation, txID, id, recordType, record));
+         sendReplicatePacket(new ReplicationAddTXMessage(journalID, operation, txID, id, recordType, persister, record));
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 6ee844b..b79806a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -577,12 +577,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
    void ioErrorAddingReferences(Integer errorCode, String errorMessage);
 
    @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 222058, value = "Duplicate message detected through the bridge - message will not be routed. Message information:\n{0}", format = Message.Format.MESSAGE_FORMAT)
-   void duplicateMessageDetectedThruBridge(ServerMessage message);
-
-   @LogMessage(level = Logger.Level.WARN)
    @Message(id = 222059, value = "Duplicate message detected - message will not be routed. Message information:\n{0}", format = Message.Format.MESSAGE_FORMAT)
-   void duplicateMessageDetected(ServerMessage message);
+   void duplicateMessageDetected(org.apache.activemq.artemis.api.core.Message message);
 
    @LogMessage(level = Logger.Level.WARN)
    @Message(id = 222060, value = "Error while confirming large message completion on rollback for recordID={0}", format = Message.Format.MESSAGE_FORMAT)
@@ -783,7 +779,7 @@ public interface ActiveMQServerLogger extends BasicLogger {
    @LogMessage(level = Logger.Level.WARN)
    @Message(id = 222110, value = "no queue IDs defined!,  originalMessage  = {0}, copiedMessage = {1}, props={2}",
       format = Message.Format.MESSAGE_FORMAT)
-   void noQueueIdDefined(ServerMessage message, ServerMessage messageCopy, SimpleString idsHeaderName);
+   void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message, org.apache.activemq.artemis.api.core.Message messageCopy, SimpleString idsHeaderName);
 
    @LogMessage(level = Logger.Level.TRACE)
    @Message(id = 222111, value = "exception while invoking {0} on {1}",

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Bindable.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Bindable.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Bindable.java
index 0e38634..1ede0ea 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Bindable.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Bindable.java
@@ -16,9 +16,11 @@
  */
 package org.apache.activemq.artemis.core.server;
 
+import org.apache.activemq.artemis.api.core.Message;
+
 public interface Bindable {
 
-   void route(ServerMessage message, RoutingContext context) throws Exception;
+   void route(Message message, RoutingContext context) throws Exception;
 
-   void routeWithAck(ServerMessage message, RoutingContext context) throws Exception;
+   void routeWithAck(Message message, RoutingContext context) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
index 2a16ed2..6fcc802 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
@@ -17,10 +17,11 @@
 package org.apache.activemq.artemis.core.server;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage;
 
-public interface LargeServerMessage extends ServerMessage, ReplicatedLargeMessage {
+public interface LargeServerMessage extends ReplicatedLargeMessage, ICoreMessage {
 
    @Override
    void addBytes(byte[] bytes) throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index a1e6a20..799b0b0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -16,7 +16,10 @@
  */
 package org.apache.activemq.artemis.core.server;
 
+
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
+import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 
 /**
@@ -26,9 +29,14 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
  */
 public interface MessageReference {
 
+   final class Factory {
+      public static MessageReference createReference(Message encode, final Queue queue) {
+         return new MessageReferenceImpl(encode, queue);
+      }
+   }
    boolean isPaged();
 
-   ServerMessage getMessage();
+   Message getMessage();
 
    /**
     * We define this method aggregation here because on paging we need to hold the original estimate,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index ae377bb..d7b70a3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
@@ -197,7 +198,7 @@ public interface Queue extends Bindable {
 
    void cancelRedistributor() throws Exception;
 
-   boolean hasMatchingConsumer(ServerMessage message);
+   boolean hasMatchingConsumer(Message message);
 
    Collection<Consumer> getConsumers();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java
deleted file mode 100644
index 40dc50f..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.server;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
-import org.apache.activemq.artemis.core.paging.PagingStore;
-
-/**
- * A ServerMessage
- */
-public interface ServerMessage extends MessageInternal, EncodingSupport {
-
-   ServerMessage setMessageID(long id);
-
-   MessageReference createReference(Queue queue);
-
-   /**
-    * This will force encoding of the address, and will re-check the buffer
-    * This is to avoid setMessageTransient which set the address without changing the buffer
-    *
-    * @param address
-    */
-   void forceAddress(SimpleString address);
-
-   int incrementRefCount() throws Exception;
-
-   int decrementRefCount() throws Exception;
-
-   int incrementDurableRefCount();
-
-   int decrementDurableRefCount();
-
-   ServerMessage copy(long newID);
-
-   ServerMessage copy();
-
-   int getMemoryEstimate();
-
-   int getRefCount();
-
-   ServerMessage makeCopyForExpiryOrDLA(long newID,
-                                        MessageReference originalReference,
-                                        boolean expiry,
-                                        boolean copyOriginalHeaders) throws Exception;
-
-   void setOriginalHeaders(ServerMessage other, MessageReference originalReference, boolean expiry);
-
-   void setPagingStore(PagingStore store);
-
-   PagingStore getPagingStore();
-
-   // Is there any _AMQ_ property being used
-   boolean hasInternalProperties();
-
-   boolean storeIsPaging();
-
-   void encodeMessageIDToBuffer();
-
-   byte[] getDuplicateIDBytes();
-
-   Object getDuplicateProperty();
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index f4e2ec7..0ce0728 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -21,10 +21,11 @@ import javax.transaction.xa.Xid;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.activemq.artemis.Closeable;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
 import org.apache.activemq.artemis.core.security.SecurityAuth;
@@ -99,6 +100,8 @@ public interface ServerSession extends SecurityAuth {
 
    void stop();
 
+   void addCloseable(Closeable closeable);
+
    /**
     * To be used by protocol heads that needs to control the transaction outside the session context.
     */
@@ -178,18 +181,20 @@ public interface ServerSession extends SecurityAuth {
 
    void receiveConsumerCredits(long consumerID, int credits) throws Exception;
 
-   void sendContinuations(int packetSize, long totalBodySize, byte[] body, boolean continues) throws Exception;
-
    RoutingStatus send(Transaction tx,
-                      ServerMessage message,
+                      Message message,
                       boolean direct,
                       boolean noAutoCreateQueue) throws Exception;
 
-   RoutingStatus send(ServerMessage message, boolean direct, boolean noAutoCreateQueue) throws Exception;
+   RoutingStatus doSend(final Transaction tx,
+                        final Message msg,
+                        final SimpleString originalAddress,
+                        final boolean direct,
+                        final boolean noAutoCreateQueue) throws Exception;
 
-   RoutingStatus send(ServerMessage message, boolean direct) throws Exception;
+   RoutingStatus send(Message message, boolean direct, boolean noAutoCreateQueue) throws Exception;
 
-   void sendLarge(MessageInternal msg) throws Exception;
+   RoutingStatus send(Message message, boolean direct) throws Exception;
 
    void forceConsumerDelivery(long consumerID, long sequence) throws Exception;
 
@@ -249,7 +254,9 @@ public interface ServerSession extends SecurityAuth {
 
    SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
 
-   SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception;
+   SimpleString getMatchingQueue(SimpleString address,
+                                 SimpleString queueName,
+                                 RoutingType routingType) throws Exception;
 
    AddressInfo getAddress(SimpleString address);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java
index 1583f2c..48f4aa9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java
@@ -16,9 +16,9 @@
  */
 package org.apache.activemq.artemis.core.server.cluster;
 
-import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.api.core.Message;
 
 public interface Transformer {
 
-   ServerMessage transform(ServerMessage message);
+   Message transform(Message message);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index ee549c5..fe43532 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -46,14 +46,12 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal
 import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
 import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.HandleStatus;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.cluster.Bridge;
 import org.apache.activemq.artemis.core.server.cluster.Transformer;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
@@ -499,16 +497,16 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
    }
 
    /* Hook for processing message before forwarding */
-   protected ServerMessage beforeForward(final ServerMessage message) {
+   protected Message beforeForward(final Message message) {
       if (useDuplicateDetection) {
          // We keep our own DuplicateID for the Bridge, so bouncing back and forth will work fine
          byte[] bytes = getDuplicateBytes(nodeUUID, message.getMessageID());
 
-         message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
+         message.putBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, bytes);
       }
 
       if (transformer != null) {
-         final ServerMessage transformedMessage = transformer.transform(message);
+         final Message transformedMessage = transformer.transform(message);
          if (transformedMessage != message) {
             if (logger.isDebugEnabled()) {
                logger.debug("The transformer " + transformer +
@@ -556,7 +554,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
             refs.put(ref.getMessage().getMessageID(), ref);
          }
 
-         final ServerMessage message = beforeForward(ref.getMessage());
+         final Message message = beforeForward(ref.getMessage());
 
          final SimpleString dest;
 
@@ -564,7 +562,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
             dest = forwardingAddress;
          } else {
             // Preserve the original address
-            dest = message.getAddress();
+            dest = message.getAddressSimpleString();
          }
 
          pendingAcks.countUp();
@@ -686,7 +684,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
     * @param message
     * @return
     */
-   private HandleStatus deliverStandardMessage(SimpleString dest, final MessageReference ref, ServerMessage message) {
+   private HandleStatus deliverStandardMessage(SimpleString dest, final MessageReference ref, Message message) {
       // if we failover during send then there is a chance that the
       // that this will throw a disconnect, we need to remove the message
       // from the acks so it will get resent, duplicate detection will cope

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index f16d863..a870ea6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -36,12 +37,10 @@ import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
 import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.BindingType;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
@@ -113,7 +112,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
 
       this.discoveryLocator = discoveryLocator;
 
-      idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(name);
+      idsHeaderName = Message.HDR_ROUTE_TO_IDS.concat(name);
 
       this.clusterConnection = clusterConnection;
 
@@ -150,13 +149,13 @@ public class ClusterConnectionBridge extends BridgeImpl {
    }
 
    @Override
-   protected ServerMessage beforeForward(final ServerMessage message) {
+   protected Message beforeForward(final Message message) {
       // We make a copy of the message, then we strip out the unwanted routing id headers and leave
       // only
       // the one pertinent for the address node - this is important since different queues on different
       // nodes could have same queue ids
       // Note we must copy since same message may get routed to other nodes which require different headers
-      ServerMessage messageCopy = message.copy();
+      Message messageCopy = message.copy();
 
       if (logger.isTraceEnabled()) {
          logger.trace("Clustered bridge  copied message " + message + " as " + messageCopy + " before delivery");
@@ -175,12 +174,12 @@ public class ClusterConnectionBridge extends BridgeImpl {
       }
 
       for (SimpleString propName : propNames) {
-         if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS)) {
+         if (propName.startsWith(Message.HDR_ROUTE_TO_IDS)) {
             messageCopy.removeProperty(propName);
          }
       }
 
-      messageCopy.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
+      messageCopy.putBytesProperty(Message.HDR_ROUTE_TO_IDS, queueIds);
 
       messageCopy = super.beforeForward(messageCopy);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
index c585405..e9477a8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
@@ -32,7 +32,6 @@ import org.apache.activemq.artemis.core.server.HandleStatus;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.utils.ReusableLatch;
@@ -139,7 +138,7 @@ public class Redistributor implements Consumer {
 
       final Transaction tx = new TransactionImpl(storageManager);
 
-      final Pair<RoutingContext, ServerMessage> routingInfo = postOffice.redistribute(reference.getMessage(), queue, tx);
+      final Pair<RoutingContext, Message> routingInfo = postOffice.redistribute(reference.getMessage(), queue, tx);
 
       if (routingInfo == null) {
          return HandleStatus.BUSY;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
index 8f54b2a..02a7671 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
@@ -23,15 +23,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
 import org.apache.activemq.artemis.core.postoffice.BindingType;
 import org.apache.activemq.artemis.core.server.Bindable;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
 import org.jboss.logging.Logger;
 
@@ -88,7 +87,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
 
       queueFilter = FilterImpl.createFilter(filterString);
 
-      idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(bridgeName);
+      idsHeaderName = Message.HDR_ROUTE_TO_IDS.concat(bridgeName);
 
       this.distance = distance;
    }
@@ -149,7 +148,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
    }
 
    @Override
-   public synchronized boolean isHighAcceptPriority(final ServerMessage message) {
+   public synchronized boolean isHighAcceptPriority(final Message message) {
       if (consumerCount == 0) {
          return false;
       }
@@ -172,7 +171,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
    }
 
    @Override
-   public void route(final ServerMessage message, final RoutingContext context) {
+   public void route(final Message message, final RoutingContext context) {
       addRouteContextToMessage(message);
 
       List<Queue> durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress());
@@ -185,7 +184,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
    }
 
    @Override
-   public void routeWithAck(ServerMessage message, RoutingContext context) {
+   public void routeWithAck(Message message, RoutingContext context) {
       addRouteContextToMessage(message);
 
       List<Queue> durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress());
@@ -315,7 +314,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
     *
     * @param message
     */
-   private void addRouteContextToMessage(final ServerMessage message) {
+   private void addRouteContextToMessage(final Message message) {
       byte[] ids = message.getBytesProperty(idsHeaderName);
 
       if (ids == null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index aa1ebf3..2b5ecaf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -51,6 +51,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
 import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
 import org.apache.activemq.artemis.core.config.BridgeConfiguration;
@@ -127,7 +128,6 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueConfig;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.ServiceComponent;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index 619036d..5b0d406 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -25,7 +25,6 @@ import org.apache.activemq.artemis.core.server.Divert;
 import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.cluster.Transformer;
 import org.jboss.logging.Logger;
 
@@ -83,7 +82,7 @@ public class DivertImpl implements Divert {
    }
 
    @Override
-   public void route(final ServerMessage message, final RoutingContext context) throws Exception {
+   public void route(final Message message, final RoutingContext context) throws Exception {
       // We must make a copy of the message, otherwise things like returning credits to the page won't work
       // properly on ack, since the original address will be overwritten
 
@@ -91,7 +90,7 @@ public class DivertImpl implements Divert {
          logger.trace("Diverting message " + message + " into " + this);
       }
 
-      ServerMessage copy = null;
+      Message copy = null;
 
       // Shouldn't copy if it's not routed anywhere else
       if (!forwardAddress.equals(context.getAddress())) {
@@ -99,7 +98,7 @@ public class DivertImpl implements Divert {
          copy = message.copy(id);
 
          // This will set the original MessageId, and the original address
-         copy.setOriginalHeaders(message, null, false);
+         copy.referenceOriginalMessage(message, null);
 
          copy.setAddress(forwardAddress);
 
@@ -130,7 +129,7 @@ public class DivertImpl implements Divert {
    }
 
    @Override
-   public void routeWithAck(ServerMessage message, RoutingContext context) throws Exception {
+   public void routeWithAck(Message message, RoutingContext context) throws Exception {
       route(message, context);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java
index 40cef50..4adb1b2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java
@@ -20,6 +20,7 @@ import javax.transaction.xa.Xid;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.journal.Journal;
@@ -29,7 +30,6 @@ import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
 import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
 import org.apache.activemq.artemis.core.persistence.impl.journal.AddMessageRecord;
 import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.transaction.ResourceManager;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 
@@ -43,7 +43,7 @@ public interface JournalLoader {
 
    void handleAddMessage(Map<Long, Map<Long, AddMessageRecord>> queueMap) throws Exception;
 
-   void handleNoMessageReferences(Map<Long, ServerMessage> messages);
+   void handleNoMessageReferences(Map<Long, Message> messages);
 
    void handleGroupingBindings(List<GroupingInfo> groupingInfos);
 
@@ -53,7 +53,7 @@ public interface JournalLoader {
                  ResourceManager resourceManager,
                  Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception;
 
-   void handlePreparedSendMessage(ServerMessage message, Transaction tx, long queueID) throws Exception;
+   void handlePreparedSendMessage(Message message, Transaction tx, long queueID) throws Exception;
 
    void handlePreparedAcknowledge(long messageID,
                                   List<MessageReference> referencesToAck,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index eb467ae..d059d2c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -31,7 +31,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -74,7 +73,7 @@ public class LastValueQueue extends QueueImpl {
          return;
       }
 
-      SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+      SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME.toString());
 
       if (prop != null) {
          HolderReference hr = map.get(prop);
@@ -98,7 +97,7 @@ public class LastValueQueue extends QueueImpl {
 
    @Override
    public synchronized void addHead(final MessageReference ref, boolean scheduling) {
-      SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+      SimpleString prop = ref.getMessage().getDeliveryAnnotationPropertyString(Message.HDR_LAST_VALUE_NAME);
 
       if (prop != null) {
          HolderReference hr = map.get(prop);
@@ -148,7 +147,7 @@ public class LastValueQueue extends QueueImpl {
    @Override
    protected void refRemoved(MessageReference ref) {
       synchronized (this) {
-         SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+         SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME.toString());
 
          if (prop != null) {
             map.remove(prop);
@@ -223,7 +222,7 @@ public class LastValueQueue extends QueueImpl {
       }
 
       @Override
-      public ServerMessage getMessage() {
+      public Message getMessage() {
          return ref.getMessage();
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index 6d9030e..bffb1ad 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -18,11 +18,10 @@ package org.apache.activemq.artemis.core.server.impl;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.utils.MemorySize;
 
 /**
  * Implementation of a MessageReference
@@ -35,7 +34,7 @@ public class MessageReferenceImpl implements MessageReference {
 
    private volatile long scheduledDeliveryTime;
 
-   private final ServerMessage message;
+   private final Message message;
 
    private final Queue queue;
 
@@ -47,20 +46,7 @@ public class MessageReferenceImpl implements MessageReference {
 
    // Static --------------------------------------------------------
 
-   private static final int memoryOffset;
-
-   static {
-      // This is an estimate of how much memory a ServerMessageImpl takes up, exclusing body and properties
-      // Note, it is only an estimate, it's not possible to be entirely sure with Java
-      // This figure is calculated using the test utilities in org.apache.activemq.tests.unit.util.sizeof
-      // The value is somewhat higher on 64 bit architectures, probably due to different alignment
-
-      if (MemorySize.is64bitArch()) {
-         memoryOffset = 48;
-      } else {
-         memoryOffset = 32;
-      }
-   }
+   private static final int memoryOffset = 64;
 
    // Constructors --------------------------------------------------
 
@@ -80,7 +66,7 @@ public class MessageReferenceImpl implements MessageReference {
       this.queue = queue;
    }
 
-   protected MessageReferenceImpl(final ServerMessage message, final Queue queue) {
+   public MessageReferenceImpl(final Message message, final Queue queue) {
       this.message = message;
 
       this.queue = queue;
@@ -155,7 +141,7 @@ public class MessageReferenceImpl implements MessageReference {
    }
 
    @Override
-   public ServerMessage getMessage() {
+   public Message getMessage() {
       return message;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index 005a994..717e2e2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -57,7 +57,6 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueConfig;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.group.GroupingHandler;
 import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
@@ -233,8 +232,8 @@ public class PostOfficeJournalLoader implements JournalLoader {
    }
 
    @Override
-   public void handleNoMessageReferences(Map<Long, ServerMessage> messages) {
-      for (ServerMessage msg : messages.values()) {
+   public void handleNoMessageReferences(Map<Long, Message> messages) {
+      for (Message msg : messages.values()) {
          if (msg.getRefCount() == 0) {
             ActiveMQServerLogger.LOGGER.journalUnreferencedMessage(msg.getMessageID());
             try {
@@ -284,7 +283,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
    }
 
    @Override
-   public void handlePreparedSendMessage(ServerMessage message, Transaction tx, long queueID) throws Exception {
+   public void handlePreparedSendMessage(Message message, Transaction tx, long queueID) throws Exception {
       Queue queue = queues.get(queueID);
 
       if (queue == null) {


Mime
View raw message