activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [15/36] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding.
Date Fri, 03 Mar 2017 01:05:08 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 548b62c..b997d80 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -17,10 +17,12 @@
 
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
+
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 
@@ -43,11 +45,11 @@ public class MQTTSessionCallback implements SessionCallback {
 
    @Override
    public int sendMessage(MessageReference reference,
-                          ServerMessage message,
+                          Message message,
                           ServerConsumer consumer,
                           int deliveryCount) {
       try {
-         session.getMqttPublishManager().sendMessage(message, consumer, deliveryCount);
+         session.getMqttPublishManager().sendMessage((CoreMessage)message, consumer, deliveryCount);
       } catch (Exception e) {
          log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage());
       }
@@ -70,7 +72,7 @@ public class MQTTSessionCallback implements SessionCallback {
 
    @Override
    public int sendLargeMessage(MessageReference reference,
-                               ServerMessage message,
+                               Message message,
                                ServerConsumer consumer,
                                long bodySize,
                                int deliveryCount) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index 7bc6b84..6891497 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -28,8 +28,7 @@ import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 
 /**
  * A Utility Class for creating Server Side objects and converting MQTT concepts to/from Artemis.
@@ -93,13 +92,13 @@ public class MQTTUtil {
       return MQTT_RETAIN_ADDRESS_PREFIX + MQTT_WILDCARD.convert(filter, wildcardConfiguration);
    }
 
-   private static ServerMessage createServerMessage(MQTTSession session,
+   private static Message createServerMessage(MQTTSession session,
                                                     SimpleString address,
                                                     boolean retain,
                                                     int qos) {
       long id = session.getServer().getStorageManager().generateID();
 
-      ServerMessageImpl message = new ServerMessageImpl(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE);
+      CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE);
       message.setAddress(address);
       message.putBooleanProperty(new SimpleString(MQTT_MESSAGE_RETAIN_KEY), retain);
       message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos);
@@ -107,21 +106,21 @@ public class MQTTUtil {
       return message;
    }
 
-   public static ServerMessage createServerMessageFromByteBuf(MQTTSession session,
+   public static Message createServerMessageFromByteBuf(MQTTSession session,
                                                               String topic,
                                                               boolean retain,
                                                               int qos,
                                                               ByteBuf payload) {
       String coreAddress = convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration());
-      ServerMessage message = createServerMessage(session, new SimpleString(coreAddress), retain, qos);
+      Message message = createServerMessage(session, new SimpleString(coreAddress), retain, qos);
 
       // FIXME does this involve a copy?
       message.getBodyBuffer().writeBytes(new ChannelBufferWrapper(payload), payload.readableBytes());
       return message;
    }
 
-   public static ServerMessage createPubRelMessage(MQTTSession session, SimpleString address, int messageId) {
-      ServerMessage message = createServerMessage(session, address, false, 1);
+   public static Message createPubRelMessage(MQTTSession session, SimpleString address, int messageId) {
+      Message message = createServerMessage(session, address, false, 1);
       message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_ID_KEY), messageId);
       message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_TYPE_KEY), MqttMessageType.PUBREL.value());
       return message;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 9b27b81..550a63a 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -36,11 +36,10 @@ import java.util.zip.InflaterOutputStream;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
 import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
 import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
 import org.apache.activemq.artemis.utils.DataConstants;
@@ -102,16 +101,16 @@ public class OpenWireMessageConverter implements MessageConverter {
    }
 
    @Override
-   public Object outbound(ServerMessage message, int deliveryCount) {
+   public Object outbound(org.apache.activemq.artemis.api.core.Message message, int deliveryCount) {
       // TODO: implement this
       return null;
    }
 
    @Override
-   public ServerMessage inbound(Object message) throws Exception {
+   public org.apache.activemq.artemis.api.core.Message inbound(Object message) throws Exception {
 
       Message messageSend = (Message) message;
-      ServerMessageImpl coreMessage = new ServerMessageImpl(-1, messageSend.getSize());
+      CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize());
 
       String type = messageSend.getType();
       if (type != null) {
@@ -157,7 +156,7 @@ public class OpenWireMessageConverter implements MessageConverter {
                mdataIn.close();
                TypedProperties props = new TypedProperties();
                loadMapIntoProperties(props, map);
-               props.encode(body);
+               props.encode(body.byteBuf());
                break;
             case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
                if (messageCompressed) {
@@ -415,8 +414,9 @@ public class OpenWireMessageConverter implements MessageConverter {
    }
 
    public static MessageDispatch createMessageDispatch(MessageReference reference,
-                                                       ServerMessage message,
+                                                       org.apache.activemq.artemis.api.core.Message message,
                                                        AMQConsumer consumer) throws IOException, JMSException {
+      // TODO-now: use new Encode here
       ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer.getMarshaller(), consumer.getOpenwireDestination());
 
       //we can use core message id for sequenceId
@@ -433,7 +433,7 @@ public class OpenWireMessageConverter implements MessageConverter {
    }
 
    private static ActiveMQMessage toAMQMessage(MessageReference reference,
-                                               ServerMessage coreMessage,
+                                               org.apache.activemq.artemis.api.core.Message coreMessage,
                                                WireFormat marshaller,
                                                ActiveMQDestination actualDestination) throws IOException {
       ActiveMQMessage amqMsg = null;
@@ -476,7 +476,7 @@ public class OpenWireMessageConverter implements MessageConverter {
       }
       amqMsg.setBrokerInTime(brokerInTime);
 
-      ActiveMQBuffer buffer = coreMessage.getBodyBufferDuplicate();
+      ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
       Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
       boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
       amqMsg.setCompressed(isCompressed);
@@ -503,7 +503,7 @@ public class OpenWireMessageConverter implements MessageConverter {
                TypedProperties mapData = new TypedProperties();
                //it could be a null map
                if (buffer.readableBytes() > 0) {
-                  mapData.decode(buffer);
+                  mapData.decode(buffer.byteBuf());
                   Map<String, Object> map = mapData.getMap();
                   ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
                   OutputStream os = out;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index f471a2a..6f83c2d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
@@ -35,7 +36,6 @@ import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -208,7 +208,7 @@ public class AMQConsumer {
 
    }
 
-   public int handleDeliver(MessageReference reference, ServerMessage message, int deliveryCount) {
+   public int handleDeliver(MessageReference reference, Message message, int deliveryCount) {
       MessageDispatch dispatch;
       try {
          if (messagePullHandler != null && !messagePullHandler.checkForcedConsumer(message)) {
@@ -394,7 +394,7 @@ public class AMQConsumer {
          }
       }
 
-      public boolean checkForcedConsumer(ServerMessage message) {
+      public boolean checkForcedConsumer(Message message) {
          if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
             if (next >= 0) {
                if (timeout <= 0) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 79004ae..1b7ed43 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -36,7 +36,6 @@ import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
 import org.apache.activemq.artemis.reader.MessageUtil;
@@ -231,7 +230,7 @@ public class AMQSession implements SessionCallback {
 
    @Override
    public int sendMessage(MessageReference reference,
-                          ServerMessage message,
+                          org.apache.activemq.artemis.api.core.Message message,
                           ServerConsumer consumer,
                           int deliveryCount) {
       AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
@@ -240,7 +239,7 @@ public class AMQSession implements SessionCallback {
 
    @Override
    public int sendLargeMessage(MessageReference reference,
-                               ServerMessage message,
+                               org.apache.activemq.artemis.api.core.Message message,
                                ServerConsumer consumerID,
                                long bodySize,
                                int deliveryCount) {
@@ -296,7 +295,7 @@ public class AMQSession implements SessionCallback {
          actualDestinations = new ActiveMQDestination[]{destination};
       }
 
-      ServerMessage originalCoreMsg = getConverter().inbound(messageSend);
+      org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend);
 
       if (connection.isNoLocal()) {
          //Note: advisory messages are dealt with in
@@ -324,7 +323,7 @@ public class AMQSession implements SessionCallback {
       for (int i = 0; i < actualDestinations.length; i++) {
          ActiveMQDestination dest = actualDestinations[i];
          SimpleString address = new SimpleString(dest.getPhysicalName());
-         ServerMessage coreMsg = originalCoreMsg.copy();
+         org.apache.activemq.artemis.api.core.Message coreMsg = originalCoreMsg.copy();
          coreMsg.setAddress(address);
 
          if (actualDestinations[i].isQueue()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
index 5355c63..2686907 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
@@ -18,8 +18,9 @@ package org.apache.activemq.artemis.core.protocol.openwire.util;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+
 import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -53,8 +54,8 @@ public class OpenWireUtil {
     * set on publish/send so a divert or wildcard may mean thats its different to the destination subscribed to by the
     * consumer
     */
-   public static ActiveMQDestination toAMQAddress(ServerMessage message, ActiveMQDestination actualDestination) {
-      String address = message.getAddress().toString();
+   public static ActiveMQDestination toAMQAddress(Message message, ActiveMQDestination actualDestination) {
+      String address = message.getAddress();
       String strippedAddress = address;//.replace(JMS_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TEMP_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TOPIC_ADDRESS_PREFIX, "").replace(JMS_TEMP_TOPIC_ADDRESS_PREFIX, "");
       if (actualDestination.isQueue()) {
          return new ActiveMQQueue(strippedAddress);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
index 861c524..d377abd 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.artemis.core.protocol.stomp;
 
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.jboss.logging.Messages;
 import org.jboss.logging.annotations.Cause;
 import org.jboss.logging.annotations.Message;
@@ -71,7 +70,7 @@ public interface ActiveMQStompProtocolMessageBundle {
    ActiveMQStompException invalidConnection();
 
    @Message(id = 339011, value = "Error sending message {0}", format = Message.Format.MESSAGE_FORMAT)
-   ActiveMQStompException errorSendMessage(ServerMessageImpl message, @Cause Exception e);
+   ActiveMQStompException errorSendMessage(org.apache.activemq.artemis.api.core.Message message, @Cause Exception e);
 
    @Message(id = 339012, value = "Error beginning a transaction {0}", format = Message.Format.MESSAGE_FORMAT)
    ActiveMQStompException errorBeginTx(String txID, @Cause Exception e);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index c004a0e..c64c1ea 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -30,18 +30,18 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10;
 import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12;
 import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
@@ -569,7 +569,7 @@ public final class StompConnection implements RemotingConnection {
       return valid;
    }
 
-   public ServerMessageImpl createServerMessage() {
+   public CoreMessage createServerMessage() {
       return manager.createServerMessage();
    }
 
@@ -598,7 +598,7 @@ public final class StompConnection implements RemotingConnection {
       }
    }
 
-   protected void sendServerMessage(ServerMessageImpl message, String txID) throws ActiveMQStompException {
+   protected void sendServerMessage(Message message, String txID) throws ActiveMQStompException {
       StompSession stompSession = getSession(txID);
 
       if (stompSession.isNoLocal()) {
@@ -611,7 +611,7 @@ public final class StompConnection implements RemotingConnection {
          if (minLargeMessageSize == -1 || (message.getBodyBuffer().writerIndex() < minLargeMessageSize)) {
             stompSession.sendInternal(message, false);
          } else {
-            stompSession.sendInternalLarge(message, false);
+            stompSession.sendInternalLarge((CoreMessage)message, false);
          }
       } catch (Exception e) {
          throw BUNDLE.errorSendMessage(message, e).setHandler(frameHandler);
@@ -726,7 +726,7 @@ public final class StompConnection implements RemotingConnection {
       return SERVER_NAME;
    }
 
-   public StompFrame createStompMessage(ServerMessage serverMessage,
+   public StompFrame createStompMessage(Message serverMessage,
                                         StompSubscription subscription,
                                         int deliveryCount) throws Exception {
       return frameHandler.createMessageFrame(serverMessage, subscription, deliveryCount);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 54339a4..2be0be4 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -33,12 +33,12 @@ import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.ServerSession;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
@@ -345,8 +345,8 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
       return validated;
    }
 
-   public ServerMessageImpl createServerMessage() {
-      return new ServerMessageImpl(server.getStorageManager().generateID(), 512);
+   public CoreMessage createServerMessage() {
+      return new CoreMessage(server.getStorageManager().generateID(), 512);
    }
 
    public void commitTransaction(StompConnection connection, String txID) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 1e103e9..d2d42b7 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -28,20 +28,18 @@ import java.util.zip.Inflater;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -127,11 +125,13 @@ public class StompSession implements SessionCallback {
 
    @Override
    public int sendMessage(MessageReference ref,
-                          ServerMessage serverMessage,
+                          Message serverMessage,
                           final ServerConsumer consumer,
                           int deliveryCount) {
+
+      //TODO-now: fix encoders
       LargeServerMessageImpl largeMessage = null;
-      ServerMessage newServerMessage = serverMessage;
+      Message newServerMessage = serverMessage;
       try {
          StompSubscription subscription = subscriptions.get(consumer.getID());
          StompFrame frame = null;
@@ -139,20 +139,23 @@ public class StompSession implements SessionCallback {
             newServerMessage = serverMessage.copy();
 
             largeMessage = (LargeServerMessageImpl) serverMessage;
-            BodyEncoder encoder = largeMessage.getBodyEncoder();
+            LargeBodyEncoder encoder = largeMessage.getBodyEncoder();
             encoder.open();
             int bodySize = (int) encoder.getLargeBodySize();
 
+            // TODO-now: Convert large message body into the stomp message
             //large message doesn't have a body.
-            ((ServerMessageImpl) newServerMessage).createBody(bodySize);
-            encoder.encode(newServerMessage.getBodyBuffer(), bodySize);
-            encoder.close();
+            // ((Message) newServerMessage).createBody(bodySize);
+//            encoder.encode(((ServerMessage)newServerMessage).getBodyBuffer(), bodySize);
+//            encoder.close();
+
+            throw new RuntimeException("Large message body won't work with stomp now");
          }
 
          if (serverMessage.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) {
             //decompress
             ActiveMQBuffer qbuff = newServerMessage.getBodyBuffer();
-            int bytesToRead = qbuff.writerIndex() - MessageImpl.BODY_OFFSET;
+            int bytesToRead = qbuff.writerIndex() - CoreMessage.BODY_OFFSET;
             Inflater inflater = new Inflater();
             inflater.setInput(ByteUtil.getActiveArray(qbuff.readBytes(bytesToRead).toByteBuffer()));
 
@@ -219,7 +222,7 @@ public class StompSession implements SessionCallback {
 
    @Override
    public int sendLargeMessage(MessageReference ref,
-                               ServerMessage msg,
+                               Message msg,
                                ServerConsumer consumer,
                                long bodySize,
                                int deliveryCount) {
@@ -370,11 +373,11 @@ public class StompSession implements SessionCallback {
       this.noLocal = noLocal;
    }
 
-   public void sendInternal(ServerMessageImpl message, boolean direct) throws Exception {
+   public void sendInternal(Message message, boolean direct) throws Exception {
       session.send(message, direct);
    }
 
-   public void sendInternalLarge(ServerMessageImpl message, boolean direct) throws Exception {
+   public void sendInternalLarge(CoreMessage message, boolean direct) throws Exception {
       int headerSize = message.getHeadersAndPropertiesEncodeSize();
       if (headerSize >= connection.getMinLargeMessageSize()) {
          throw BUNDLE.headerTooBig();
@@ -384,7 +387,7 @@ public class StompSession implements SessionCallback {
       long id = storageManager.generateID();
       LargeServerMessage largeMessage = storageManager.createLargeMessage(id, message);
 
-      byte[] bytes = new byte[message.getBodyBuffer().writerIndex() - MessageImpl.BODY_OFFSET];
+      byte[] bytes = new byte[message.getBodyBuffer().writerIndex() - CoreMessage.BODY_OFFSET];
       message.getBodyBuffer().readBytes(bytes);
 
       largeMessage.addBytes(bytes);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
index affab84..7db9d82 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
@@ -24,8 +24,6 @@ import java.util.Set;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.reader.MessageUtil;
 
 public class StompUtils {
@@ -37,7 +35,7 @@ public class StompUtils {
 
    // Static --------------------------------------------------------
 
-   public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, ServerMessageImpl msg) throws Exception {
+   public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, Message msg) throws Exception {
       Map<String, String> headers = new HashMap<>(frame.getHeadersMap());
 
       String priority = headers.remove(Stomp.Headers.Send.PRIORITY);
@@ -79,7 +77,7 @@ public class StompUtils {
       }
    }
 
-   public static void copyStandardHeadersFromMessageToFrame(MessageInternal message,
+   public static void copyStandardHeadersFromMessageToFrame(Message message,
                                                             StompFrame command,
                                                             int deliveryCount) throws Exception {
       command.addHeader(Stomp.Headers.Message.MESSAGE_ID, String.valueOf(message.getMessageID()));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index f91ba82..8d13613 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -21,15 +21,13 @@ import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp.Headers;
 import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10;
 import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11;
 import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 
@@ -180,7 +178,7 @@ public abstract class VersionedStompFrameHandler {
 
          long timestamp = System.currentTimeMillis();
 
-         ServerMessageImpl message = connection.createServerMessage();
+         CoreMessage message = connection.createServerMessage();
          if (routingType != null) {
             message.putByteProperty(Message.HDR_ROUTING_TYPE, routingType.getType());
          }
@@ -289,7 +287,7 @@ public abstract class VersionedStompFrameHandler {
       return response;
    }
 
-   public StompFrame createMessageFrame(ServerMessage serverMessage,
+   public StompFrame createMessageFrame(Message serverMessage,
                                         StompSubscription subscription,
                                         int deliveryCount) throws Exception {
       StompFrame frame = createStompFrame(Stomp.Responses.MESSAGE);
@@ -298,11 +296,12 @@ public abstract class VersionedStompFrameHandler {
          frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
       }
 
-      ActiveMQBuffer buffer = serverMessage.getBodyBufferDuplicate();
+      // TODO-now fix encoders
+      ActiveMQBuffer buffer = serverMessage.getReadOnlyBodyBuffer();
 
-      int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex() : serverMessage.getEndOfBodyPosition();
+      int bodyPos = ((CoreMessage)serverMessage).getEndOfBodyPosition() == -1 ? buffer.writerIndex() : ((CoreMessage)serverMessage).getEndOfBodyPosition();
 
-      buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
+      buffer.readerIndex(CoreMessage.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
 
       int size = bodyPos - buffer.readerIndex();
 
@@ -321,7 +320,7 @@ public abstract class VersionedStompFrameHandler {
       }
       frame.setByteBody(data);
 
-      StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
+      StompUtils.copyStandardHeadersFromMessageToFrame((serverMessage), frame, deliveryCount);
 
       return frame;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
index 6b211d2..b14605d 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp.v12;
 
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
 import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
@@ -27,7 +28,6 @@ import org.apache.activemq.artemis.core.protocol.stomp.StompSubscription;
 import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11;
 import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameV11;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 
 import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
@@ -48,7 +48,7 @@ public class StompFrameHandlerV12 extends StompFrameHandlerV11 {
    }
 
    @Override
-   public StompFrame createMessageFrame(ServerMessage serverMessage,
+   public StompFrame createMessageFrame(Message serverMessage,
                                         StompSubscription subscription,
                                         int deliveryCount) throws Exception {
       StompFrame frame = super.createMessageFrame(serverMessage, subscription, deliveryCount);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 7881470..30d6668 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -750,10 +750,6 @@ public interface Configuration {
 
    Configuration setLogJournalWriteRate(boolean rate);
 
-   int getJournalPerfBlastPages();
-
-   Configuration setJournalPerfBlastPages(int pages);
-
    long getServerDumpInterval();
 
    Configuration setServerDumpInterval(long interval);
@@ -766,10 +762,6 @@ public interface Configuration {
 
    Configuration setMemoryMeasureInterval(long memoryMeasureInterval);
 
-   boolean isRunSyncSpeedTest();
-
-   Configuration setRunSyncSpeedTest(boolean run);
-
    // Paging Properties --------------------------------------------------------------------
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index f4eda91..329f654 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -193,10 +193,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
 
    protected boolean logJournalWriteRate = ActiveMQDefaultConfiguration.isDefaultJournalLogWriteRate();
 
-   protected int journalPerfBlastPages = ActiveMQDefaultConfiguration.getDefaultJournalPerfBlastPages();
-
-   protected boolean runSyncSpeedTest = ActiveMQDefaultConfiguration.isDefaultRunSyncSpeedTest();
-
    private WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
 
    private boolean messageCounterEnabled = ActiveMQDefaultConfiguration.isDefaultMessageCounterEnabled();
@@ -854,28 +850,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
    }
 
    @Override
-   public int getJournalPerfBlastPages() {
-      return journalPerfBlastPages;
-   }
-
-   @Override
-   public ConfigurationImpl setJournalPerfBlastPages(final int journalPerfBlastPages) {
-      this.journalPerfBlastPages = journalPerfBlastPages;
-      return this;
-   }
-
-   @Override
-   public boolean isRunSyncSpeedTest() {
-      return runSyncSpeedTest;
-   }
-
-   @Override
-   public ConfigurationImpl setRunSyncSpeedTest(final boolean run) {
-      runSyncSpeedTest = run;
-      return this;
-   }
-
-   @Override
    public boolean isCreateBindingsDir() {
       return createBindingsDir;
    }
@@ -1556,7 +1530,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
       result = prime * result + journalMaxIO_AIO;
       result = prime * result + journalMaxIO_NIO;
       result = prime * result + journalMinFiles;
-      result = prime * result + journalPerfBlastPages;
       result = prime * result + (journalSyncNonTransactional ? 1231 : 1237);
       result = prime * result + (journalSyncTransactional ? 1231 : 1237);
       result = prime * result + ((journalType == null) ? 0 : journalType.hashCode());
@@ -1580,7 +1553,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
       result = prime * result + (persistIDCache ? 1231 : 1237);
       result = prime * result + (persistenceEnabled ? 1231 : 1237);
       result = prime * result + ((queueConfigurations == null) ? 0 : queueConfigurations.hashCode());
-      result = prime * result + (runSyncSpeedTest ? 1231 : 1237);
       result = prime * result + scheduledThreadPoolMaxSize;
       result = prime * result + (securityEnabled ? 1231 : 1237);
       result = prime * result + (populateValidatedUser ? 1231 : 1237);
@@ -1723,8 +1695,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
          return false;
       if (journalMinFiles != other.journalMinFiles)
          return false;
-      if (journalPerfBlastPages != other.journalPerfBlastPages)
-         return false;
       if (journalSyncNonTransactional != other.journalSyncNonTransactional)
          return false;
       if (journalSyncTransactional != other.journalSyncTransactional)
@@ -1793,8 +1763,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
             return false;
       } else if (!queueConfigurations.equals(other.queueConfigurations))
          return false;
-      if (runSyncSpeedTest != other.runSyncSpeedTest)
-         return false;
       if (scheduledThreadPoolMaxSize != other.scheduledThreadPoolMaxSize)
          return false;
       if (securityEnabled != other.securityEnabled)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index cea0598..4055b5c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -548,10 +548,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
       config.setLogJournalWriteRate(getBoolean(e, "log-journal-write-rate", ActiveMQDefaultConfiguration.isDefaultJournalLogWriteRate()));
 
-      config.setJournalPerfBlastPages(getInteger(e, "perf-blast-pages", ActiveMQDefaultConfiguration.getDefaultJournalPerfBlastPages(), Validators.MINUS_ONE_OR_GT_ZERO));
-
-      config.setRunSyncSpeedTest(getBoolean(e, "run-sync-speed-test", config.isRunSyncSpeedTest()));
-
       if (e.hasAttribute("wild-card-routing-enabled")) {
          config.setWildcardRoutingEnabled(getBoolean(e, "wild-card-routing-enabled", config.isWildcardRoutingEnabled()));
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java
index 41d5e54..3737e19 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java
@@ -16,8 +16,8 @@
  */
 package org.apache.activemq.artemis.core.filter;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 
 public interface Filter {
 
@@ -31,7 +31,7 @@ public interface Filter {
     */
    String GENERIC_IGNORED_FILTER = "__AMQX=-1";
 
-   boolean match(ServerMessage message);
+   boolean match(Message message);
 
    SimpleString getFilterString();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
index 0a459c9..9d321c7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
@@ -18,11 +18,11 @@ package org.apache.activemq.artemis.core.filter.impl;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.FilterConstants;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.selector.filter.BooleanExpression;
 import org.apache.activemq.artemis.selector.filter.FilterException;
 import org.apache.activemq.artemis.selector.filter.Filterable;
@@ -103,7 +103,7 @@ public class FilterImpl implements Filter {
    }
 
    @Override
-   public synchronized boolean match(final ServerMessage message) {
+   public synchronized boolean match(final Message message) {
       try {
          boolean result = booleanExpression.matches(new FilterableServerMessage(message));
          return result;
@@ -148,7 +148,7 @@ public class FilterImpl implements Filter {
 
    // Private --------------------------------------------------------------------------
 
-   private static Object getHeaderFieldValue(final ServerMessage msg, final SimpleString fieldName) {
+   private static Object getHeaderFieldValue(final Message msg, final SimpleString fieldName) {
       if (FilterConstants.ACTIVEMQ_USERID.equals(fieldName)) {
          if (msg.getUserID() == null) {
             // Proton stores JMSMessageID as NATIVE_MESSAGE_ID that is an arbitrary string
@@ -178,9 +178,9 @@ public class FilterImpl implements Filter {
 
    private static class FilterableServerMessage implements Filterable {
 
-      private final ServerMessage message;
+      private final Message message;
 
-      private FilterableServerMessage(ServerMessage message) {
+      private FilterableServerMessage(Message message) {
          this.message = message;
       }
 
@@ -191,7 +191,7 @@ public class FilterImpl implements Filter {
             result = getHeaderFieldValue(message, new SimpleString(id));
          }
          if (result == null) {
-            result = message.getObjectProperty(new SimpleString(id));
+            result = message.getObjectProperty(id);
          }
          if (result != null) {
             if (result.getClass() == SimpleString.class) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
index 09dd702..31e056c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
@@ -25,10 +25,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.AddressControl;
 import org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -40,9 +42,7 @@ import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.security.Role;
 import org.apache.activemq.artemis.core.security.SecurityAuth;
 import org.apache.activemq.artemis.core.security.SecurityStore;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -282,7 +282,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
             return null;
          }
       });
-      ServerMessageImpl message = new ServerMessageImpl(storageManager.generateID(), 50);
+      CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
       for (String header : headers.keySet()) {
          message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 4b84909..5ecea64 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -39,7 +39,7 @@ import org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
 import org.apache.activemq.artemis.core.management.impl.openmbean.OpenTypeSupport;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
 import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -53,8 +53,6 @@ import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -609,7 +607,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
       try {
          Filter singleMessageFilter = new Filter() {
             @Override
-            public boolean match(ServerMessage message) {
+            public boolean match(Message message) {
                return message.getMessageID() == messageID;
             }
 
@@ -738,7 +736,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
             return null;
          }
       });
-      ServerMessageImpl message = new ServerMessageImpl(storageManager.generateID(), 50);
+      CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
       for (String header : headers.keySet()) {
          message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
       }
@@ -755,7 +753,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
       message.setAddress(queue.getAddress());
       ByteBuffer buffer = ByteBuffer.allocate(8);
       buffer.putLong(queue.getID());
-      message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
+      message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array());
       postOffice.route(message, true);
       return "" + message.getMessageID();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
index ec6848b..9f36b7f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
@@ -35,7 +35,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+
 
 public final class OpenTypeSupport {
 
@@ -128,6 +128,7 @@ public final class OpenTypeSupport {
 
       public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
          Map<String, Object> rc = new HashMap<>();
+         // TODO-now: fix this
          Message m = ref.getMessage();
          rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID());
          if (m.getUserID() != null) {
@@ -143,6 +144,11 @@ public final class OpenTypeSupport {
          rc.put(CompositeDataConstants.PRIORITY, m.getPriority());
          rc.put(CompositeDataConstants.REDELIVERED, ref.getDeliveryCount() > 1);
 
+         ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer();
+         byte[] bytes = new byte[bodyCopy.readableBytes()];
+         bodyCopy.readBytes(bytes);
+         rc.put(CompositeDataConstants.BODY, bytes);
+
          Map<String, Object> propertyMap = m.toPropertyMap();
 
          rc.put(CompositeDataConstants.PROPERTIES, "" + propertyMap);
@@ -264,8 +270,8 @@ public final class OpenTypeSupport {
       @Override
       public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
          Map<String, Object> rc = super.getFields(ref);
-         ServerMessage m = ref.getMessage();
-         ActiveMQBuffer bodyCopy = m.getBodyBufferDuplicate();
+         Message m = ref.getMessage();
+         ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer();
          byte[] bytes = new byte[bodyCopy.readableBytes()];
          bodyCopy.readBytes(bytes);
          rc.put(CompositeDataConstants.BODY, bytes);
@@ -285,8 +291,8 @@ public final class OpenTypeSupport {
       @Override
       public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
          Map<String, Object> rc = super.getFields(ref);
-         ServerMessage m = ref.getMessage();
-         SimpleString text = m.getBodyBuffer().copy().readNullableSimpleString();
+         Message m = ref.getMessage();
+         SimpleString text = m.getReadOnlyBodyBuffer().readNullableSimpleString();
          rc.put(CompositeDataConstants.TEXT_BODY, text != null ? text.toString() : "");
          return rc;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
index 9b1e243..b3d8adb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
@@ -16,9 +16,10 @@
  */
 package org.apache.activemq.artemis.core.paging;
 
+
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 
 /**
  * A Paged message.
@@ -28,7 +29,7 @@ import org.apache.activemq.artemis.core.server.ServerMessage;
  */
 public interface PagedMessage extends EncodingSupport {
 
-   ServerMessage getMessage();
+   Message getMessage();
 
    /**
     * The queues that were routed during paging

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index 5ead1a2..a7de713 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -20,13 +20,15 @@ import java.io.File;
 import java.util.Collection;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RefCountMessageListener;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
 import org.apache.activemq.artemis.core.paging.impl.Page;
 import org.apache.activemq.artemis.core.replication.ReplicationManager;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.RouteContextList;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -41,7 +43,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
  *
  * @see PagingManager
  */
-public interface PagingStore extends ActiveMQComponent {
+public interface PagingStore extends ActiveMQComponent, RefCountMessageListener {
 
    SimpleString getAddress();
 
@@ -90,7 +92,7 @@ public interface PagingStore extends ActiveMQComponent {
     * needs to be sent to the journal
     * @throws NullPointerException if {@code readLock} is null
     */
-   boolean page(ServerMessage message, Transaction tx, RouteContextList listCtx, ReadLock readLock) throws Exception;
+   boolean page(Message message, Transaction tx, RouteContextList listCtx, ReadLock readLock) throws Exception;
 
    Page createPage(final int page) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 768b43f..823eef4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -20,11 +20,11 @@ import java.lang.ref.WeakReference;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.Message;
+
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.jboss.logging.Logger;
@@ -41,7 +41,7 @@ public class PagedReferenceImpl implements PagedReference {
 
    private int persistedCount;
 
-   private int messageEstimate;
+   private int messageEstimate = -1;
 
    private Long consumerId;
 
@@ -64,7 +64,7 @@ public class PagedReferenceImpl implements PagedReference {
    }
 
    @Override
-   public ServerMessage getMessage() {
+   public Message getMessage() {
       return getPagedMessage().getMessage();
    }
 
@@ -93,12 +93,6 @@ public class PagedReferenceImpl implements PagedReference {
                              final PagedMessage message,
                              final PageSubscription subscription) {
       this.position = position;
-
-      if (message == null) {
-         this.messageEstimate = -1;
-      } else {
-         this.messageEstimate = message.getMessage().getMemoryEstimate();
-      }
       this.message = new WeakReference<>(message);
       this.subscription = subscription;
    }
@@ -120,7 +114,7 @@ public class PagedReferenceImpl implements PagedReference {
 
    @Override
    public int getMessageMemoryEstimate() {
-      if (messageEstimate < 0) {
+      if (messageEstimate <= 0) {
          try {
             messageEstimate = getMessage().getMemoryEstimate();
          } catch (Throwable e) {
@@ -139,7 +133,7 @@ public class PagedReferenceImpl implements PagedReference {
    public long getScheduledDeliveryTime() {
       if (deliveryTime == null) {
          try {
-            ServerMessage msg = getMessage();
+            Message msg = getMessage();
             if (msg.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
                deliveryTime = getMessage().getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
             } else {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index c40d20d..ab10eb4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
@@ -50,7 +51,6 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
 import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
@@ -772,7 +772,7 @@ final class PageSubscriptionImpl implements PageSubscription {
 
    // Protected -----------------------------------------------------
 
-   private boolean match(final ServerMessage message) {
+   private boolean match(final Message message) {
       if (filter == null) {
          return true;
       } else {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index 4993d0c..aabec54 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -132,7 +132,7 @@ public final class Page implements Comparable<Page> {
                   int messageSize = fileBuffer.readInt();
                   int oldPos = fileBuffer.readerIndex();
                   if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity() && fileBuffer.getByte(oldPos + messageSize) == Page.END_BYTE) {
-                     PagedMessage msg = new PagedMessageImpl();
+                     PagedMessage msg = new PagedMessageImpl(storageManager);
                      msg.decode(fileBuffer);
                      byte b = fileBuffer.readByte();
                      if (b != Page.END_BYTE) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
index e40d107..d50dd2e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
@@ -19,12 +19,12 @@ package org.apache.activemq.artemis.core.paging.impl;
 import java.util.Arrays;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessagePersister;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
 import org.apache.activemq.artemis.utils.DataConstants;
 
 /**
@@ -38,39 +38,37 @@ public class PagedMessageImpl implements PagedMessage {
     */
    private byte[] largeMessageLazyData;
 
-   private ServerMessage message;
+   private Message message;
 
    private long[] queueIDs;
 
    private long transactionID = 0;
 
-   public PagedMessageImpl(final ServerMessage message, final long[] queueIDs, final long transactionID) {
+   private volatile StorageManager storageManager;
+
+   public PagedMessageImpl(final Message message, final long[] queueIDs, final long transactionID) {
       this(message, queueIDs);
       this.transactionID = transactionID;
    }
 
-   public PagedMessageImpl(final ServerMessage message, final long[] queueIDs) {
+   public PagedMessageImpl(final Message message, final long[] queueIDs) {
       this.queueIDs = queueIDs;
       this.message = message;
    }
 
-   public PagedMessageImpl() {
+   public PagedMessageImpl(StorageManager storageManager) {
+      this.storageManager = storageManager;
    }
 
    @Override
-   public ServerMessage getMessage() {
+   public Message getMessage() {
       return message;
    }
 
    @Override
    public void initMessage(StorageManager storage) {
       if (largeMessageLazyData != null) {
-         LargeServerMessage lgMessage = storage.createLargeMessage();
-         ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(largeMessageLazyData);
-         lgMessage.decodeHeadersAndProperties(buffer);
-         lgMessage.incrementDelayDeletionCount();
-         lgMessage.setPaged();
-         message = lgMessage;
+         // TODO-now: use the largeMessagePersister
          largeMessageLazyData = null;
       }
    }
@@ -96,15 +94,15 @@ public class PagedMessageImpl implements PagedMessage {
       if (isLargeMessage) {
          int largeMessageHeaderSize = buffer.readInt();
 
-         largeMessageLazyData = new byte[largeMessageHeaderSize];
-
-         buffer.readBytes(largeMessageLazyData);
+         if (storageManager == null) {
+            largeMessageLazyData = new byte[largeMessageHeaderSize];
+            buffer.readBytes(largeMessageLazyData);
+         } else {
+            this.message = storageManager.createLargeMessage();
+            LargeMessagePersister.getInstance().decode(buffer, (LargeServerMessage) message);
+         }
       } else {
-         buffer.readInt(); // This value is only used on LargeMessages for now
-
-         message = new ServerMessageImpl(-1, 50);
-
-         message.decode(buffer);
+         this.message = MessagePersister.getInstance().decode(buffer, null);
       }
 
       int queueIDsSize = buffer.readInt();
@@ -120,11 +118,16 @@ public class PagedMessageImpl implements PagedMessage {
    public void encode(final ActiveMQBuffer buffer) {
       buffer.writeLong(transactionID);
 
-      buffer.writeBoolean(message instanceof LargeServerMessage);
+      boolean isLargeMessage = isLargeMessage();
 
-      buffer.writeInt(message.getEncodeSize());
+      buffer.writeBoolean(isLargeMessage);
 
-      message.encode(buffer);
+      if (isLargeMessage) {
+         buffer.writeInt(LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage)message));
+         LargeMessagePersister.getInstance().encode(buffer, (LargeServerMessage) message);
+      } else {
+         message.getPersister().encode(buffer, message);
+      }
 
       buffer.writeInt(queueIDs.length);
 
@@ -133,10 +136,19 @@ public class PagedMessageImpl implements PagedMessage {
       }
    }
 
+   private boolean isLargeMessage() {
+      return message.isLargeMessage();
+   }
+
    @Override
    public int getEncodeSize() {
-      return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + message.getEncodeSize() +
-         DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG;
+      if (isLargeMessage()) {
+         return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage)message) +
+            DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG;
+      } else {
+         return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + message.getPersister().getEncodeSize(message) +
+            DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG;
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 4e57c85..e39fe40 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
@@ -54,7 +55,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.RouteContextList;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+
+import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -699,7 +701,6 @@ public class PagingStoreImpl implements PagingStore {
 
    @Override
    public void addSize(final int size) {
-
       boolean globalFull = pagingManager.addSize(size).isGlobalFull();
       long newSize = sizeInBytes.addAndGet(size);
 
@@ -747,7 +748,7 @@ public class PagingStoreImpl implements PagingStore {
    }
 
    @Override
-   public boolean page(ServerMessage message,
+   public boolean page(Message message,
                        final Transaction tx,
                        RouteContextList listCtx,
                        final ReadLock managerLock) throws Exception {
@@ -806,11 +807,12 @@ public class PagingStoreImpl implements PagingStore {
                return false;
             }
 
-            if (!message.isDurable()) {
-               // The address should never be transient when paging (even for non-persistent messages when paging)
-               // This will force everything to be persisted
-               message.forceAddress(address);
-            }
+            message.setAddress(address);
+//            if (!message.isDurable()) {
+//               // The address should never be transient when paging (even for non-persistent messages when paging)
+//               // This will force everything to be persisted
+//               message.forceAddress(address);
+//            }
 
             final long transactionID = tx == null ? -1 : tx.getID();
             PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID);
@@ -920,6 +922,40 @@ public class PagingStoreImpl implements PagingStore {
 
    }
 
+   @Override
+   public void durableDown(Message message, int durableCount) {
+   }
+
+   @Override
+   public void durableUp(Message message, int durableCount) {
+   }
+
+   @Override
+   public void nonDurableUp(Message message, int count) {
+      if (count == 1) {
+         this.addSize(message.getMemoryEstimate() + MessageReferenceImpl.getMemoryEstimate());
+      } else {
+         this.addSize(MessageReferenceImpl.getMemoryEstimate());
+      }
+   }
+
+   @Override
+   public void nonDurableDown(Message message, int count) {
+      if (count < 0) {
+         // this could happen on paged messages since they are not routed and incrementRefCount is never called
+         return;
+      }
+
+      if (count == 0) {
+         this.addSize(-message.getMemoryEstimate() - MessageReferenceImpl.getMemoryEstimate());
+
+      } else {
+         this.addSize(-MessageReferenceImpl.getMemoryEstimate());
+      }
+
+
+   }
+
    private void installPageTransaction(final Transaction tx, final RouteContextList listCtx) throws Exception {
       FinishPageMessageOperation pgOper = (FinishPageMessageOperation) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
       if (pgOper == null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index b45775c..e27ed30 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -23,13 +23,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.PagingManager;
@@ -45,7 +45,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.RouteContextList;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@@ -172,7 +171,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
     */
    void confirmPendingLargeMessage(long recordID) throws Exception;
 
-   void storeMessage(ServerMessage message) throws Exception;
+   void storeMessage(Message message) throws Exception;
 
    void storeReference(long queueID, long messageID, boolean last) throws Exception;
 
@@ -190,7 +189,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
 
    void deleteDuplicateID(long recordID) throws Exception;
 
-   void storeMessageTransactional(long txID, ServerMessage message) throws Exception;
+   void storeMessageTransactional(long txID, Message message) throws Exception;
 
    void storeReferenceTransactional(long txID, long queueID, long messageID) throws Exception;
 
@@ -225,7 +224,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
     * @return a large message object
     * @throws Exception
     */
-   LargeServerMessage createLargeMessage(long id, MessageInternal message) throws Exception;
+   LargeServerMessage createLargeMessage(long id, Message message) throws Exception;
 
    enum LargeMessageExtension {
       DURABLE(".msg"), TEMPORARY(".tmp"), SYNC(".sync");
@@ -265,11 +264,6 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
 
    void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception;
 
-   /**
-    * FIXME Unused
-    */
-   void updatePageTransaction(PageTransactionInfo pageTransaction, int depage) throws Exception;
-
    void deletePageTransactional(long recordID) throws Exception;
 
    JournalLoadInformation loadMessageJournal(final PostOffice postOffice,
@@ -383,7 +377,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
     * needs to be sent to the journal
     * @throws Exception
     */
-   boolean addToPage(PagingStore store, ServerMessage msg, Transaction tx, RouteContextList listCtx) throws Exception;
+   boolean addToPage(PagingStore store, Message msg, Transaction tx, RouteContextList listCtx) throws Exception;
 
    /**
     * Stops the replication of data from the live to the backup.


Mime
View raw message