activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [12/22] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding.
Date Mon, 06 Mar 2017 11:53:59 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
new file mode 100644
index 0000000..5793d58
--- /dev/null
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.openwire;
+
+import java.util.Set;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RefCountMessageListener;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.Persister;
+
+// TODO: Implement this
+public class OpenwireMessage implements Message {
+
+   @Override
+   public boolean containsProperty(SimpleString key) {
+      return false;
+   }
+
+   @Override
+   public void messageChanged() {
+
+   }
+
+   @Override
+   public RoutingType getRouteType() {
+      return null;
+   }
+
+   @Override
+   public SimpleString getReplyTo() {
+      return null;
+   }
+
+   @Override
+   public Message setReplyTo(SimpleString address) {
+      return null;
+   }
+
+   @Override
+   public boolean containsDeliveryAnnotationProperty(SimpleString property) {
+      return false;
+   }
+
+   @Override
+   public Object removeDeliveryAnnoationProperty(SimpleString key) {
+      return null;
+   }
+
+   @Override
+   public Object getDeliveryAnnotationProperty(SimpleString key) {
+      return null;
+   }
+
+   @Override
+   public Long getScheduledDeliveryTime() {
+      return null;
+   }
+
+   @Override
+   public RefCountMessageListener getContext() {
+      return null;
+   }
+
+   @Override
+   public Message setContext(RefCountMessageListener context) {
+      return null;
+   }
+
+   @Override
+   public Message setBuffer(ByteBuf buffer) {
+      return null;
+   }
+
+   @Override
+   public ByteBuf getBuffer() {
+      return null;
+   }
+
+   @Override
+   public Message copy() {
+      return null;
+   }
+
+   @Override
+   public Message copy(long newID) {
+      return null;
+   }
+
+   @Override
+   public long getMessageID() {
+      return 0;
+   }
+
+   @Override
+   public Message setMessageID(long id) {
+      return null;
+   }
+
+   @Override
+   public long getExpiration() {
+      return 0;
+   }
+
+   @Override
+   public Message setExpiration(long expiration) {
+      return null;
+   }
+
+   @Override
+   public Object getUserID() {
+      return null;
+   }
+
+   @Override
+   public Message setUserID(Object userID) {
+      return null;
+   }
+
+   @Override
+   public boolean isDurable() {
+      return false;
+   }
+
+   @Override
+   public Message setDurable(boolean durable) {
+      return null;
+   }
+
+   @Override
+   public Persister<Message> getPersister() {
+      return null;
+   }
+
+   @Override
+   public String getAddress() {
+      return null;
+   }
+
+   @Override
+   public Message setAddress(String address) {
+      return null;
+   }
+
+   @Override
+   public SimpleString getAddressSimpleString() {
+      return null;
+   }
+
+   @Override
+   public Message setAddress(SimpleString address) {
+      return null;
+   }
+
+   @Override
+   public long getTimestamp() {
+      return 0;
+   }
+
+   @Override
+   public Message setTimestamp(long timestamp) {
+      return null;
+   }
+
+   @Override
+   public byte getPriority() {
+      return 0;
+   }
+
+   @Override
+   public Message setPriority(byte priority) {
+      return null;
+   }
+
+   @Override
+   public void receiveBuffer(ByteBuf buffer) {
+
+   }
+
+   @Override
+   public void sendBuffer(ByteBuf buffer, int deliveryCount) {
+
+   }
+
+   @Override
+   public int getPersistSize() {
+      return 0;
+   }
+
+   @Override
+   public void persist(ActiveMQBuffer targetRecord) {
+
+   }
+
+   @Override
+   public void reloadPersistence(ActiveMQBuffer record) {
+
+   }
+
+   @Override
+   public Message putBooleanProperty(String key, boolean value) {
+      return null;
+   }
+
+   @Override
+   public Message putByteProperty(String key, byte value) {
+      return null;
+   }
+
+   @Override
+   public Message putBytesProperty(String key, byte[] value) {
+      return null;
+   }
+
+   @Override
+   public Message putShortProperty(String key, short value) {
+      return null;
+   }
+
+   @Override
+   public Message putCharProperty(String key, char value) {
+      return null;
+   }
+
+   @Override
+   public Message putIntProperty(String key, int value) {
+      return null;
+   }
+
+   @Override
+   public Message putLongProperty(String key, long value) {
+      return null;
+   }
+
+   @Override
+   public Message putFloatProperty(String key, float value) {
+      return null;
+   }
+
+   @Override
+   public Message putDoubleProperty(String key, double value) {
+      return null;
+   }
+
+   @Override
+   public Message putBooleanProperty(SimpleString key, boolean value) {
+      return null;
+   }
+
+   @Override
+   public Message putByteProperty(SimpleString key, byte value) {
+      return null;
+   }
+
+   @Override
+   public Message putBytesProperty(SimpleString key, byte[] value) {
+      return null;
+   }
+
+   @Override
+   public Message putShortProperty(SimpleString key, short value) {
+      return null;
+   }
+
+   @Override
+   public Message putCharProperty(SimpleString key, char value) {
+      return null;
+   }
+
+   @Override
+   public Message putIntProperty(SimpleString key, int value) {
+      return null;
+   }
+
+   @Override
+   public Message putLongProperty(SimpleString key, long value) {
+      return null;
+   }
+
+   @Override
+   public Message putFloatProperty(SimpleString key, float value) {
+      return null;
+   }
+
+   @Override
+   public Message putDoubleProperty(SimpleString key, double value) {
+      return null;
+   }
+
+   @Override
+   public Message putStringProperty(String key, String value) {
+      return null;
+   }
+
+   @Override
+   public Message putObjectProperty(String key, Object value) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Message putObjectProperty(SimpleString key, Object value) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Object removeProperty(String key) {
+      return null;
+   }
+
+   @Override
+   public boolean containsProperty(String key) {
+      return false;
+   }
+
+   @Override
+   public Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Byte getByteProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Integer getIntProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Long getLongProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Object getObjectProperty(String key) {
+      return null;
+   }
+
+   @Override
+   public Short getShortProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Float getFloatProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public String getStringProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException {
+      return new byte[0];
+   }
+
+   @Override
+   public Object removeProperty(SimpleString key) {
+      return null;
+   }
+
+   @Override
+   public Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Object getObjectProperty(SimpleString key) {
+      return null;
+   }
+
+   @Override
+   public Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public String getStringProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public SimpleString getSimpleStringProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public byte[] getBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return new byte[0];
+   }
+
+   @Override
+   public Message putStringProperty(SimpleString key, SimpleString value) {
+      return null;
+   }
+
+   @Override
+   public int getEncodeSize() {
+      return 0;
+   }
+
+   @Override
+   public Set<SimpleString> getPropertyNames() {
+      return null;
+   }
+
+   @Override
+   public int getRefCount() {
+      return 0;
+   }
+
+   @Override
+   public int incrementRefCount() throws Exception {
+      return 0;
+   }
+
+   @Override
+   public int decrementRefCount() throws Exception {
+      return 0;
+   }
+
+   @Override
+   public int incrementDurableRefCount() {
+      return 0;
+   }
+
+   @Override
+   public int decrementDurableRefCount() {
+      return 0;
+   }
+
+   @Override
+   public ICoreMessage toCore() {
+      return null;
+   }
+
+   @Override
+   public int getMemoryEstimate() {
+      return 0;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index f471a2a..3bdee8b 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -27,15 +27,16 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -208,7 +209,7 @@ public class AMQConsumer {
 
    }
 
-   public int handleDeliver(MessageReference reference, ServerMessage message, int deliveryCount) {
+   public int handleDeliver(MessageReference reference, ICoreMessage message, int deliveryCount) {
       MessageDispatch dispatch;
       try {
          if (messagePullHandler != null && !messagePullHandler.checkForcedConsumer(message)) {
@@ -394,7 +395,7 @@ public class AMQConsumer {
          }
       }
 
-      public boolean checkForcedConsumer(ServerMessage message) {
+      public boolean checkForcedConsumer(Message message) {
          if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
             if (next >= 0) {
                if (timeout <= 0) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 79004ae..b5d2c86 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
@@ -34,9 +35,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
 import org.apache.activemq.artemis.reader.MessageUtil;
@@ -231,16 +230,17 @@ public class AMQSession implements SessionCallback {
 
    @Override
    public int sendMessage(MessageReference reference,
-                          ServerMessage message,
+                          org.apache.activemq.artemis.api.core.Message message,
                           ServerConsumer consumer,
                           int deliveryCount) {
       AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
-      return theConsumer.handleDeliver(reference, message, deliveryCount);
+      // TODO: use encoders and proper conversions here
+      return theConsumer.handleDeliver(reference, message.toCore(), deliveryCount);
    }
 
    @Override
    public int sendLargeMessage(MessageReference reference,
-                               ServerMessage message,
+                               org.apache.activemq.artemis.api.core.Message message,
                                ServerConsumer consumerID,
                                long bodySize,
                                int deliveryCount) {
@@ -296,7 +296,7 @@ public class AMQSession implements SessionCallback {
          actualDestinations = new ActiveMQDestination[]{destination};
       }
 
-      ServerMessage originalCoreMsg = getConverter().inbound(messageSend);
+      org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend);
 
       if (connection.isNoLocal()) {
          //Note: advisory messages are dealt with in
@@ -324,7 +324,7 @@ public class AMQSession implements SessionCallback {
       for (int i = 0; i < actualDestinations.length; i++) {
          ActiveMQDestination dest = actualDestinations[i];
          SimpleString address = new SimpleString(dest.getPhysicalName());
-         ServerMessage coreMsg = originalCoreMsg.copy();
+         org.apache.activemq.artemis.api.core.Message coreMsg = originalCoreMsg.copy();
          coreMsg.setAddress(address);
 
          if (actualDestinations[i].isQueue()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
index 5355c63..c84776b 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
@@ -18,8 +18,9 @@ package org.apache.activemq.artemis.core.protocol.openwire.util;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+
 import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -53,9 +54,14 @@ public class OpenWireUtil {
     * set on publish/send so a divert or wildcard may mean thats its different to the destination subscribed to by the
     * consumer
     */
-   public static ActiveMQDestination toAMQAddress(ServerMessage message, ActiveMQDestination actualDestination) {
-      String address = message.getAddress().toString();
+   public static ActiveMQDestination toAMQAddress(Message message, ActiveMQDestination actualDestination) {
+      String address = message.getAddress();
       String strippedAddress = address;//.replace(JMS_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TEMP_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TOPIC_ADDRESS_PREFIX, "").replace(JMS_TEMP_TOPIC_ADDRESS_PREFIX, "");
+
+      if (address == null) {
+         return actualDestination;
+      }
+
       if (actualDestination.isQueue()) {
          return new ActiveMQQueue(strippedAddress);
       } else {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
index 861c524..d377abd 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.artemis.core.protocol.stomp;
 
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.jboss.logging.Messages;
 import org.jboss.logging.annotations.Cause;
 import org.jboss.logging.annotations.Message;
@@ -71,7 +70,7 @@ public interface ActiveMQStompProtocolMessageBundle {
    ActiveMQStompException invalidConnection();
 
    @Message(id = 339011, value = "Error sending message {0}", format = Message.Format.MESSAGE_FORMAT)
-   ActiveMQStompException errorSendMessage(ServerMessageImpl message, @Cause Exception e);
+   ActiveMQStompException errorSendMessage(org.apache.activemq.artemis.api.core.Message message, @Cause Exception e);
 
    @Message(id = 339012, value = "Error beginning a transaction {0}", format = Message.Format.MESSAGE_FORMAT)
    ActiveMQStompException errorBeginTx(String txID, @Cause Exception e);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index c004a0e..56067f1 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -30,18 +30,18 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10;
 import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12;
 import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
@@ -569,7 +569,7 @@ public final class StompConnection implements RemotingConnection {
       return valid;
    }
 
-   public ServerMessageImpl createServerMessage() {
+   public CoreMessage createServerMessage() {
       return manager.createServerMessage();
    }
 
@@ -598,7 +598,7 @@ public final class StompConnection implements RemotingConnection {
       }
    }
 
-   protected void sendServerMessage(ServerMessageImpl message, String txID) throws ActiveMQStompException {
+   protected void sendServerMessage(ICoreMessage message, String txID) throws ActiveMQStompException {
       StompSession stompSession = getSession(txID);
 
       if (stompSession.isNoLocal()) {
@@ -611,7 +611,7 @@ public final class StompConnection implements RemotingConnection {
          if (minLargeMessageSize == -1 || (message.getBodyBuffer().writerIndex() < minLargeMessageSize)) {
             stompSession.sendInternal(message, false);
          } else {
-            stompSession.sendInternalLarge(message, false);
+            stompSession.sendInternalLarge((CoreMessage)message, false);
          }
       } catch (Exception e) {
          throw BUNDLE.errorSendMessage(message, e).setHandler(frameHandler);
@@ -726,10 +726,11 @@ public final class StompConnection implements RemotingConnection {
       return SERVER_NAME;
    }
 
-   public StompFrame createStompMessage(ServerMessage serverMessage,
+   public StompFrame createStompMessage(ICoreMessage serverMessage,
+                                        ActiveMQBuffer bodyBuffer,
                                         StompSubscription subscription,
                                         int deliveryCount) throws Exception {
-      return frameHandler.createMessageFrame(serverMessage, subscription, deliveryCount);
+      return frameHandler.createMessageFrame(serverMessage, bodyBuffer, subscription, deliveryCount);
    }
 
    public void addStompEventListener(FrameEventListener listener) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 54339a4..39d2fe9 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -33,15 +33,14 @@ import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.ServerSession;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
-import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
@@ -109,13 +108,6 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
    }
 
    @Override
-   public MessageConverter getConverter() {
-      return null;
-   }
-
-   // ProtocolManager implementation --------------------------------
-
-   @Override
    public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection) {
       StompConnection conn = new StompConnection(acceptorUsed, connection, this, server.getScheduledPool(), server.getExecutorFactory());
 
@@ -345,8 +337,8 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
       return validated;
    }
 
-   public ServerMessageImpl createServerMessage() {
-      return new ServerMessageImpl(server.getStorageManager().generateID(), 512);
+   public CoreMessage createServerMessage() {
+      return new CoreMessage(server.getStorageManager().generateID(), 512);
    }
 
    public void commitTransaction(StompConnection connection, String txID) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 1e103e9..797a966 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -25,23 +25,24 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.zip.Inflater;
 
+import io.netty.buffer.UnpooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -127,32 +128,35 @@ public class StompSession implements SessionCallback {
 
    @Override
    public int sendMessage(MessageReference ref,
-                          ServerMessage serverMessage,
+                          Message serverMessage,
                           final ServerConsumer consumer,
                           int deliveryCount) {
+
+      ICoreMessage  coreMessage = serverMessage.toCore();
+
       LargeServerMessageImpl largeMessage = null;
-      ServerMessage newServerMessage = serverMessage;
+      ICoreMessage newServerMessage = serverMessage.toCore();
       try {
          StompSubscription subscription = subscriptions.get(consumer.getID());
-         StompFrame frame = null;
-         if (serverMessage.isLargeMessage()) {
-            newServerMessage = serverMessage.copy();
+         StompFrame frame;
+         ActiveMQBuffer buffer;
 
-            largeMessage = (LargeServerMessageImpl) serverMessage;
-            BodyEncoder encoder = largeMessage.getBodyEncoder();
+         if (coreMessage.isLargeMessage()) {
+            LargeBodyEncoder encoder = coreMessage.getBodyEncoder();
             encoder.open();
             int bodySize = (int) encoder.getLargeBodySize();
 
-            //large message doesn't have a body.
-            ((ServerMessageImpl) newServerMessage).createBody(bodySize);
-            encoder.encode(newServerMessage.getBodyBuffer(), bodySize);
+            buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
+
+            encoder.encode(buffer, bodySize);
             encoder.close();
+         } else {
+            buffer = coreMessage.getReadOnlyBodyBuffer();
          }
 
          if (serverMessage.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) {
-            //decompress
-            ActiveMQBuffer qbuff = newServerMessage.getBodyBuffer();
-            int bytesToRead = qbuff.writerIndex() - MessageImpl.BODY_OFFSET;
+            ActiveMQBuffer qbuff = buffer;
+            int bytesToRead = qbuff.readerIndex();
             Inflater inflater = new Inflater();
             inflater.setInput(ByteUtil.getActiveArray(qbuff.readBytes(bytesToRead).toByteBuffer()));
 
@@ -165,9 +169,10 @@ public class StompSession implements SessionCallback {
             qbuff.resetReaderIndex();
             qbuff.resetWriterIndex();
             qbuff.writeBytes(data);
+            buffer = qbuff;
          }
 
-         frame = connection.createStompMessage(newServerMessage, subscription, deliveryCount);
+         frame = connection.createStompMessage(newServerMessage, buffer, subscription, deliveryCount);
 
          int length = frame.getEncodedSize();
 
@@ -219,7 +224,7 @@ public class StompSession implements SessionCallback {
 
    @Override
    public int sendLargeMessage(MessageReference ref,
-                               ServerMessage msg,
+                               Message msg,
                                ServerConsumer consumer,
                                long bodySize,
                                int deliveryCount) {
@@ -370,11 +375,11 @@ public class StompSession implements SessionCallback {
       this.noLocal = noLocal;
    }
 
-   public void sendInternal(ServerMessageImpl message, boolean direct) throws Exception {
+   public void sendInternal(Message message, boolean direct) throws Exception {
       session.send(message, direct);
    }
 
-   public void sendInternalLarge(ServerMessageImpl message, boolean direct) throws Exception {
+   public void sendInternalLarge(CoreMessage message, boolean direct) throws Exception {
       int headerSize = message.getHeadersAndPropertiesEncodeSize();
       if (headerSize >= connection.getMinLargeMessageSize()) {
          throw BUNDLE.headerTooBig();
@@ -384,7 +389,7 @@ public class StompSession implements SessionCallback {
       long id = storageManager.generateID();
       LargeServerMessage largeMessage = storageManager.createLargeMessage(id, message);
 
-      byte[] bytes = new byte[message.getBodyBuffer().writerIndex() - MessageImpl.BODY_OFFSET];
+      byte[] bytes = new byte[message.getBodyBuffer().writerIndex() - CoreMessage.BODY_OFFSET];
       message.getBodyBuffer().readBytes(bytes);
 
       largeMessage.addBytes(bytes);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
index affab84..7db9d82 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
@@ -24,8 +24,6 @@ import java.util.Set;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.reader.MessageUtil;
 
 public class StompUtils {
@@ -37,7 +35,7 @@ public class StompUtils {
 
    // Static --------------------------------------------------------
 
-   public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, ServerMessageImpl msg) throws Exception {
+   public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, Message msg) throws Exception {
       Map<String, String> headers = new HashMap<>(frame.getHeadersMap());
 
       String priority = headers.remove(Stomp.Headers.Send.PRIORITY);
@@ -79,7 +77,7 @@ public class StompUtils {
       }
    }
 
-   public static void copyStandardHeadersFromMessageToFrame(MessageInternal message,
+   public static void copyStandardHeadersFromMessageToFrame(Message message,
                                                             StompFrame command,
                                                             int deliveryCount) throws Exception {
       command.addHeader(Stomp.Headers.Message.MESSAGE_ID, String.valueOf(message.getMessageID()));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index f91ba82..3f68c6f 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -20,17 +20,15 @@ import java.nio.charset.StandardCharsets;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp.Headers;
 import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10;
 import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11;
 import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
-import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 
 import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
@@ -180,7 +178,7 @@ public abstract class VersionedStompFrameHandler {
 
          long timestamp = System.currentTimeMillis();
 
-         ServerMessageImpl message = connection.createServerMessage();
+         CoreMessage message = connection.createServerMessage();
          if (routingType != null) {
             message.putByteProperty(Message.HDR_ROUTING_TYPE, routingType.getType());
          }
@@ -289,7 +287,8 @@ public abstract class VersionedStompFrameHandler {
       return response;
    }
 
-   public StompFrame createMessageFrame(ServerMessage serverMessage,
+   public StompFrame createMessageFrame(ICoreMessage serverMessage,
+                                        ActiveMQBuffer bodyBuffer,
                                         StompSubscription subscription,
                                         int deliveryCount) throws Exception {
       StompFrame frame = createStompFrame(Stomp.Responses.MESSAGE);
@@ -298,13 +297,11 @@ public abstract class VersionedStompFrameHandler {
          frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
       }
 
-      ActiveMQBuffer buffer = serverMessage.getBodyBufferDuplicate();
-
-      int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex() : serverMessage.getEndOfBodyPosition();
+      ActiveMQBuffer buffer = bodyBuffer != null ? bodyBuffer : serverMessage.getReadOnlyBodyBuffer();
 
-      buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
+      int bodyPos = (serverMessage).getEndOfBodyPosition() == -1 ? buffer.writerIndex() : (serverMessage).getEndOfBodyPosition();
 
-      int size = bodyPos - buffer.readerIndex();
+      int size = buffer.writerIndex();
 
       byte[] data = new byte[size];
 
@@ -321,7 +318,7 @@ public abstract class VersionedStompFrameHandler {
       }
       frame.setByteBody(data);
 
-      StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
+      StompUtils.copyStandardHeadersFromMessageToFrame((serverMessage), frame, deliveryCount);
 
       return frame;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
index 6b211d2..77a9225 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
@@ -18,6 +18,8 @@ package org.apache.activemq.artemis.core.protocol.stomp.v12;
 
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
 import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
@@ -27,7 +29,6 @@ import org.apache.activemq.artemis.core.protocol.stomp.StompSubscription;
 import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11;
 import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameV11;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 
 import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
@@ -48,10 +49,11 @@ public class StompFrameHandlerV12 extends StompFrameHandlerV11 {
    }
 
    @Override
-   public StompFrame createMessageFrame(ServerMessage serverMessage,
+   public StompFrame createMessageFrame(ICoreMessage serverMessage,
+                                        ActiveMQBuffer bodyBuffer,
                                         StompSubscription subscription,
                                         int deliveryCount) throws Exception {
-      StompFrame frame = super.createMessageFrame(serverMessage, subscription, deliveryCount);
+      StompFrame frame = super.createMessageFrame(serverMessage, bodyBuffer, subscription, deliveryCount);
 
       if (!subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) {
          frame.addHeader(Stomp.Headers.Message.ACK, String.valueOf(serverMessage.getMessageID()));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 7881470..30d6668 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -750,10 +750,6 @@ public interface Configuration {
 
    Configuration setLogJournalWriteRate(boolean rate);
 
-   int getJournalPerfBlastPages();
-
-   Configuration setJournalPerfBlastPages(int pages);
-
    long getServerDumpInterval();
 
    Configuration setServerDumpInterval(long interval);
@@ -766,10 +762,6 @@ public interface Configuration {
 
    Configuration setMemoryMeasureInterval(long memoryMeasureInterval);
 
-   boolean isRunSyncSpeedTest();
-
-   Configuration setRunSyncSpeedTest(boolean run);
-
    // Paging Properties --------------------------------------------------------------------
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index f4eda91..329f654 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -193,10 +193,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
 
    protected boolean logJournalWriteRate = ActiveMQDefaultConfiguration.isDefaultJournalLogWriteRate();
 
-   protected int journalPerfBlastPages = ActiveMQDefaultConfiguration.getDefaultJournalPerfBlastPages();
-
-   protected boolean runSyncSpeedTest = ActiveMQDefaultConfiguration.isDefaultRunSyncSpeedTest();
-
    private WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
 
    private boolean messageCounterEnabled = ActiveMQDefaultConfiguration.isDefaultMessageCounterEnabled();
@@ -854,28 +850,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
    }
 
    @Override
-   public int getJournalPerfBlastPages() {
-      return journalPerfBlastPages;
-   }
-
-   @Override
-   public ConfigurationImpl setJournalPerfBlastPages(final int journalPerfBlastPages) {
-      this.journalPerfBlastPages = journalPerfBlastPages;
-      return this;
-   }
-
-   @Override
-   public boolean isRunSyncSpeedTest() {
-      return runSyncSpeedTest;
-   }
-
-   @Override
-   public ConfigurationImpl setRunSyncSpeedTest(final boolean run) {
-      runSyncSpeedTest = run;
-      return this;
-   }
-
-   @Override
    public boolean isCreateBindingsDir() {
       return createBindingsDir;
    }
@@ -1556,7 +1530,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
       result = prime * result + journalMaxIO_AIO;
       result = prime * result + journalMaxIO_NIO;
       result = prime * result + journalMinFiles;
-      result = prime * result + journalPerfBlastPages;
       result = prime * result + (journalSyncNonTransactional ? 1231 : 1237);
       result = prime * result + (journalSyncTransactional ? 1231 : 1237);
       result = prime * result + ((journalType == null) ? 0 : journalType.hashCode());
@@ -1580,7 +1553,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
       result = prime * result + (persistIDCache ? 1231 : 1237);
       result = prime * result + (persistenceEnabled ? 1231 : 1237);
       result = prime * result + ((queueConfigurations == null) ? 0 : queueConfigurations.hashCode());
-      result = prime * result + (runSyncSpeedTest ? 1231 : 1237);
       result = prime * result + scheduledThreadPoolMaxSize;
       result = prime * result + (securityEnabled ? 1231 : 1237);
       result = prime * result + (populateValidatedUser ? 1231 : 1237);
@@ -1723,8 +1695,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
          return false;
       if (journalMinFiles != other.journalMinFiles)
          return false;
-      if (journalPerfBlastPages != other.journalPerfBlastPages)
-         return false;
       if (journalSyncNonTransactional != other.journalSyncNonTransactional)
          return false;
       if (journalSyncTransactional != other.journalSyncTransactional)
@@ -1793,8 +1763,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
             return false;
       } else if (!queueConfigurations.equals(other.queueConfigurations))
          return false;
-      if (runSyncSpeedTest != other.runSyncSpeedTest)
-         return false;
       if (scheduledThreadPoolMaxSize != other.scheduledThreadPoolMaxSize)
          return false;
       if (securityEnabled != other.securityEnabled)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index cea0598..4055b5c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -548,10 +548,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
       config.setLogJournalWriteRate(getBoolean(e, "log-journal-write-rate", ActiveMQDefaultConfiguration.isDefaultJournalLogWriteRate()));
 
-      config.setJournalPerfBlastPages(getInteger(e, "perf-blast-pages", ActiveMQDefaultConfiguration.getDefaultJournalPerfBlastPages(), Validators.MINUS_ONE_OR_GT_ZERO));
-
-      config.setRunSyncSpeedTest(getBoolean(e, "run-sync-speed-test", config.isRunSyncSpeedTest()));
-
       if (e.hasAttribute("wild-card-routing-enabled")) {
          config.setWildcardRoutingEnabled(getBoolean(e, "wild-card-routing-enabled", config.isWildcardRoutingEnabled()));
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java
index 41d5e54..3737e19 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java
@@ -16,8 +16,8 @@
  */
 package org.apache.activemq.artemis.core.filter;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 
 public interface Filter {
 
@@ -31,7 +31,7 @@ public interface Filter {
     */
    String GENERIC_IGNORED_FILTER = "__AMQX=-1";
 
-   boolean match(ServerMessage message);
+   boolean match(Message message);
 
    SimpleString getFilterString();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
index 0a459c9..33a1187 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
@@ -18,11 +18,11 @@ package org.apache.activemq.artemis.core.filter.impl;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.FilterConstants;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.selector.filter.BooleanExpression;
 import org.apache.activemq.artemis.selector.filter.FilterException;
 import org.apache.activemq.artemis.selector.filter.Filterable;
@@ -103,7 +103,7 @@ public class FilterImpl implements Filter {
    }
 
    @Override
-   public synchronized boolean match(final ServerMessage message) {
+   public synchronized boolean match(final Message message) {
       try {
          boolean result = booleanExpression.matches(new FilterableServerMessage(message));
          return result;
@@ -148,7 +148,7 @@ public class FilterImpl implements Filter {
 
    // Private --------------------------------------------------------------------------
 
-   private static Object getHeaderFieldValue(final ServerMessage msg, final SimpleString fieldName) {
+   private static Object getHeaderFieldValue(final Message msg, final SimpleString fieldName) {
       if (FilterConstants.ACTIVEMQ_USERID.equals(fieldName)) {
          if (msg.getUserID() == null) {
             // Proton stores JMSMessageID as NATIVE_MESSAGE_ID that is an arbitrary string
@@ -158,7 +158,12 @@ public class FilterImpl implements Filter {
             }
          }
          // It's the stringified (hex) representation of a user id that can be used in a selector expression
-         return new SimpleString("ID:" + msg.getUserID());
+         String userID = msg.getUserID().toString();
+         if (userID.startsWith("ID:")) {
+            return SimpleString.toSimpleString(userID);
+         } else {
+            return new SimpleString("ID:" + msg.getUserID());
+         }
       } else if (FilterConstants.ACTIVEMQ_PRIORITY.equals(fieldName)) {
          return Integer.valueOf(msg.getPriority());
       } else if (FilterConstants.ACTIVEMQ_TIMESTAMP.equals(fieldName)) {
@@ -178,9 +183,9 @@ public class FilterImpl implements Filter {
 
    private static class FilterableServerMessage implements Filterable {
 
-      private final ServerMessage message;
+      private final Message message;
 
-      private FilterableServerMessage(ServerMessage message) {
+      private FilterableServerMessage(Message message) {
          this.message = message;
       }
 
@@ -191,7 +196,7 @@ public class FilterImpl implements Filter {
             result = getHeaderFieldValue(message, new SimpleString(id));
          }
          if (result == null) {
-            result = message.getObjectProperty(new SimpleString(id));
+            result = message.getObjectProperty(id);
          }
          if (result != null) {
             if (result.getClass() == SimpleString.class) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
index 09dd702..31e056c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
@@ -25,10 +25,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.AddressControl;
 import org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -40,9 +42,7 @@ import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.security.Role;
 import org.apache.activemq.artemis.core.security.SecurityAuth;
 import org.apache.activemq.artemis.core.security.SecurityStore;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -282,7 +282,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
             return null;
          }
       });
-      ServerMessageImpl message = new ServerMessageImpl(storageManager.generateID(), 50);
+      CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
       for (String header : headers.keySet()) {
          message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 4b84909..5ecea64 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -39,7 +39,7 @@ import org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
 import org.apache.activemq.artemis.core.management.impl.openmbean.OpenTypeSupport;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
 import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -53,8 +53,6 @@ import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -609,7 +607,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
       try {
          Filter singleMessageFilter = new Filter() {
             @Override
-            public boolean match(ServerMessage message) {
+            public boolean match(Message message) {
                return message.getMessageID() == messageID;
             }
 
@@ -738,7 +736,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
             return null;
          }
       });
-      ServerMessageImpl message = new ServerMessageImpl(storageManager.generateID(), 50);
+      CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
       for (String header : headers.keySet()) {
          message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
       }
@@ -755,7 +753,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
       message.setAddress(queue.getAddress());
       ByteBuffer buffer = ByteBuffer.allocate(8);
       buffer.putLong(queue.getID());
-      message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
+      message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array());
       postOffice.route(message, true);
       return "" + message.getMessageID();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
index ec6848b..098c61c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
@@ -32,10 +32,10 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 
 public final class OpenTypeSupport {
 
@@ -48,8 +48,10 @@ public final class OpenTypeSupport {
    public static CompositeData convert(MessageReference ref) throws OpenDataException {
       CompositeType ct;
 
+      ICoreMessage message = ref.getMessage().toCore();
+
       Map<String, Object> fields;
-      byte type = ref.getMessage().getType();
+      byte type = message.getType();
 
       switch(type) {
          case Message.TEXT_TYPE:
@@ -128,7 +130,7 @@ public final class OpenTypeSupport {
 
       public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
          Map<String, Object> rc = new HashMap<>();
-         Message m = ref.getMessage();
+         ICoreMessage m = ref.getMessage().toCore();
          rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID());
          if (m.getUserID() != null) {
             rc.put(CompositeDataConstants.USER_ID, "ID:" + m.getUserID().toString());
@@ -143,6 +145,11 @@ public final class OpenTypeSupport {
          rc.put(CompositeDataConstants.PRIORITY, m.getPriority());
          rc.put(CompositeDataConstants.REDELIVERED, ref.getDeliveryCount() > 1);
 
+         ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer();
+         byte[] bytes = new byte[bodyCopy.readableBytes()];
+         bodyCopy.readBytes(bytes);
+         rc.put(CompositeDataConstants.BODY, bytes);
+
          Map<String, Object> propertyMap = m.toPropertyMap();
 
          rc.put(CompositeDataConstants.PROPERTIES, "" + propertyMap);
@@ -264,8 +271,8 @@ public final class OpenTypeSupport {
       @Override
       public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
          Map<String, Object> rc = super.getFields(ref);
-         ServerMessage m = ref.getMessage();
-         ActiveMQBuffer bodyCopy = m.getBodyBufferDuplicate();
+         ICoreMessage m = ref.getMessage().toCore();
+         ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer();
          byte[] bytes = new byte[bodyCopy.readableBytes()];
          bodyCopy.readBytes(bytes);
          rc.put(CompositeDataConstants.BODY, bytes);
@@ -285,8 +292,8 @@ public final class OpenTypeSupport {
       @Override
       public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
          Map<String, Object> rc = super.getFields(ref);
-         ServerMessage m = ref.getMessage();
-         SimpleString text = m.getBodyBuffer().copy().readNullableSimpleString();
+         ICoreMessage m = ref.getMessage().toCore();
+         SimpleString text = m.getReadOnlyBodyBuffer().readNullableSimpleString();
          rc.put(CompositeDataConstants.TEXT_BODY, text != null ? text.toString() : "");
          return rc;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
index 9b1e243..0124f09 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
@@ -16,9 +16,9 @@
  */
 package org.apache.activemq.artemis.core.paging;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 
 /**
  * A Paged message.
@@ -28,7 +28,7 @@ import org.apache.activemq.artemis.core.server.ServerMessage;
  */
 public interface PagedMessage extends EncodingSupport {
 
-   ServerMessage getMessage();
+   Message getMessage();
 
    /**
     * The queues that were routed during paging

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index 5ead1a2..2d4c646 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -20,13 +20,14 @@ import java.io.File;
 import java.util.Collection;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RefCountMessageListener;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
 import org.apache.activemq.artemis.core.paging.impl.Page;
 import org.apache.activemq.artemis.core.replication.ReplicationManager;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.RouteContextList;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -41,7 +42,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
  *
  * @see PagingManager
  */
-public interface PagingStore extends ActiveMQComponent {
+public interface PagingStore extends ActiveMQComponent, RefCountMessageListener {
 
    SimpleString getAddress();
 
@@ -90,7 +91,7 @@ public interface PagingStore extends ActiveMQComponent {
     * needs to be sent to the journal
     * @throws NullPointerException if {@code readLock} is null
     */
-   boolean page(ServerMessage message, Transaction tx, RouteContextList listCtx, ReadLock readLock) throws Exception;
+   boolean page(Message message, Transaction tx, RouteContextList listCtx, ReadLock readLock) throws Exception;
 
    Page createPage(final int page) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 768b43f..823eef4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -20,11 +20,11 @@ import java.lang.ref.WeakReference;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.Message;
+
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.jboss.logging.Logger;
@@ -41,7 +41,7 @@ public class PagedReferenceImpl implements PagedReference {
 
    private int persistedCount;
 
-   private int messageEstimate;
+   private int messageEstimate = -1;
 
    private Long consumerId;
 
@@ -64,7 +64,7 @@ public class PagedReferenceImpl implements PagedReference {
    }
 
    @Override
-   public ServerMessage getMessage() {
+   public Message getMessage() {
       return getPagedMessage().getMessage();
    }
 
@@ -93,12 +93,6 @@ public class PagedReferenceImpl implements PagedReference {
                              final PagedMessage message,
                              final PageSubscription subscription) {
       this.position = position;
-
-      if (message == null) {
-         this.messageEstimate = -1;
-      } else {
-         this.messageEstimate = message.getMessage().getMemoryEstimate();
-      }
       this.message = new WeakReference<>(message);
       this.subscription = subscription;
    }
@@ -120,7 +114,7 @@ public class PagedReferenceImpl implements PagedReference {
 
    @Override
    public int getMessageMemoryEstimate() {
-      if (messageEstimate < 0) {
+      if (messageEstimate <= 0) {
          try {
             messageEstimate = getMessage().getMemoryEstimate();
          } catch (Throwable e) {
@@ -139,7 +133,7 @@ public class PagedReferenceImpl implements PagedReference {
    public long getScheduledDeliveryTime() {
       if (deliveryTime == null) {
          try {
-            ServerMessage msg = getMessage();
+            Message msg = getMessage();
             if (msg.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
                deliveryTime = getMessage().getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
             } else {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index c40d20d..ab10eb4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
@@ -50,7 +51,6 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
 import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
@@ -772,7 +772,7 @@ final class PageSubscriptionImpl implements PageSubscription {
 
    // Protected -----------------------------------------------------
 
-   private boolean match(final ServerMessage message) {
+   private boolean match(final Message message) {
       if (filter == null) {
          return true;
       } else {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index 4993d0c..7d21316 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
@@ -132,7 +133,7 @@ public final class Page implements Comparable<Page> {
                   int messageSize = fileBuffer.readInt();
                   int oldPos = fileBuffer.readerIndex();
                   if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity() && fileBuffer.getByte(oldPos + messageSize) == Page.END_BYTE) {
-                     PagedMessage msg = new PagedMessageImpl();
+                     PagedMessage msg = new PagedMessageImpl(storageManager);
                      msg.decode(fileBuffer);
                      byte b = fileBuffer.readByte();
                      if (b != Page.END_BYTE) {
@@ -255,7 +256,7 @@ public final class Page implements Comparable<Page> {
 
       if (messages != null) {
          for (PagedMessage msg : messages) {
-            if (msg.getMessage().isLargeMessage()) {
+            if (msg.getMessage() instanceof ICoreMessage &&  (msg.getMessage()).isLargeMessage()) {
                LargeServerMessage lmsg = (LargeServerMessage) msg.getMessage();
 
                // Remember, cannot call delete directly here

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
index e40d107..b770623 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
@@ -20,11 +20,13 @@ import java.util.Arrays;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessagePersister;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
 import org.apache.activemq.artemis.utils.DataConstants;
 
 /**
@@ -38,27 +40,30 @@ public class PagedMessageImpl implements PagedMessage {
     */
    private byte[] largeMessageLazyData;
 
-   private ServerMessage message;
+   private Message message;
 
    private long[] queueIDs;
 
    private long transactionID = 0;
 
-   public PagedMessageImpl(final ServerMessage message, final long[] queueIDs, final long transactionID) {
+   private volatile StorageManager storageManager;
+
+   public PagedMessageImpl(final Message message, final long[] queueIDs, final long transactionID) {
       this(message, queueIDs);
       this.transactionID = transactionID;
    }
 
-   public PagedMessageImpl(final ServerMessage message, final long[] queueIDs) {
+   public PagedMessageImpl(final Message message, final long[] queueIDs) {
       this.queueIDs = queueIDs;
       this.message = message;
    }
 
-   public PagedMessageImpl() {
+   public PagedMessageImpl(StorageManager storageManager) {
+      this.storageManager = storageManager;
    }
 
    @Override
-   public ServerMessage getMessage() {
+   public Message getMessage() {
       return message;
    }
 
@@ -66,11 +71,11 @@ public class PagedMessageImpl implements PagedMessage {
    public void initMessage(StorageManager storage) {
       if (largeMessageLazyData != null) {
          LargeServerMessage lgMessage = storage.createLargeMessage();
-         ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(largeMessageLazyData);
-         lgMessage.decodeHeadersAndProperties(buffer);
-         lgMessage.incrementDelayDeletionCount();
-         lgMessage.setPaged();
-         message = lgMessage;
+
+         ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(largeMessageLazyData);
+         LargeMessagePersister.getInstance().decode(buffer, lgMessage);
+         ((LargeServerMessage) message).incrementDelayDeletionCount();
+         this.message = lgMessage;
          largeMessageLazyData = null;
       }
    }
@@ -96,15 +101,16 @@ public class PagedMessageImpl implements PagedMessage {
       if (isLargeMessage) {
          int largeMessageHeaderSize = buffer.readInt();
 
-         largeMessageLazyData = new byte[largeMessageHeaderSize];
-
-         buffer.readBytes(largeMessageLazyData);
+         if (storageManager == null) {
+            largeMessageLazyData = new byte[largeMessageHeaderSize];
+            buffer.readBytes(largeMessageLazyData);
+         } else {
+            this.message = storageManager.createLargeMessage();
+            LargeMessagePersister.getInstance().decode(buffer, (LargeServerMessage) message);
+            ((LargeServerMessage) message).incrementDelayDeletionCount();
+         }
       } else {
-         buffer.readInt(); // This value is only used on LargeMessages for now
-
-         message = new ServerMessageImpl(-1, 50);
-
-         message.decode(buffer);
+         this.message = MessagePersister.getInstance().decode(buffer, null);
       }
 
       int queueIDsSize = buffer.readInt();
@@ -120,11 +126,16 @@ public class PagedMessageImpl implements PagedMessage {
    public void encode(final ActiveMQBuffer buffer) {
       buffer.writeLong(transactionID);
 
-      buffer.writeBoolean(message instanceof LargeServerMessage);
+      boolean isLargeMessage = isLargeMessage();
 
-      buffer.writeInt(message.getEncodeSize());
+      buffer.writeBoolean(isLargeMessage);
 
-      message.encode(buffer);
+      if (isLargeMessage) {
+         buffer.writeInt(LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage)message));
+         LargeMessagePersister.getInstance().encode(buffer, (LargeServerMessage) message);
+      } else {
+         message.getPersister().encode(buffer, message);
+      }
 
       buffer.writeInt(queueIDs.length);
 
@@ -133,10 +144,19 @@ public class PagedMessageImpl implements PagedMessage {
       }
    }
 
+   public boolean isLargeMessage() {
+      return message instanceof ICoreMessage && ((ICoreMessage)message).isLargeMessage();
+   }
+
    @Override
    public int getEncodeSize() {
-      return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + message.getEncodeSize() +
-         DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG;
+      if (isLargeMessage()) {
+         return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage)message) +
+            DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG;
+      } else {
+         return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + message.getPersister().getEncodeSize(message) +
+            DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG;
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 4e57c85..a8e2190 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
@@ -54,7 +55,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.RouteContextList;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -699,7 +700,6 @@ public class PagingStoreImpl implements PagingStore {
 
    @Override
    public void addSize(final int size) {
-
       boolean globalFull = pagingManager.addSize(size).isGlobalFull();
       long newSize = sizeInBytes.addAndGet(size);
 
@@ -747,7 +747,7 @@ public class PagingStoreImpl implements PagingStore {
    }
 
    @Override
-   public boolean page(ServerMessage message,
+   public boolean page(Message message,
                        final Transaction tx,
                        RouteContextList listCtx,
                        final ReadLock managerLock) throws Exception {
@@ -806,11 +806,7 @@ public class PagingStoreImpl implements PagingStore {
                return false;
             }
 
-            if (!message.isDurable()) {
-               // The address should never be transient when paging (even for non-persistent messages when paging)
-               // This will force everything to be persisted
-               message.forceAddress(address);
-            }
+            message.setAddress(address);
 
             final long transactionID = tx == null ? -1 : tx.getID();
             PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID);
@@ -920,6 +916,40 @@ public class PagingStoreImpl implements PagingStore {
 
    }
 
+   @Override
+   public void durableDown(Message message, int durableCount) {
+   }
+
+   @Override
+   public void durableUp(Message message, int durableCount) {
+   }
+
+   @Override
+   public void nonDurableUp(Message message, int count) {
+      if (count == 1) {
+         this.addSize(message.getMemoryEstimate() + MessageReferenceImpl.getMemoryEstimate());
+      } else {
+         this.addSize(MessageReferenceImpl.getMemoryEstimate());
+      }
+   }
+
+   @Override
+   public void nonDurableDown(Message message, int count) {
+      if (count < 0) {
+         // this could happen on paged messages since they are not routed and incrementRefCount is never called
+         return;
+      }
+
+      if (count == 0) {
+         this.addSize(-message.getMemoryEstimate() - MessageReferenceImpl.getMemoryEstimate());
+
+      } else {
+         this.addSize(-MessageReferenceImpl.getMemoryEstimate());
+      }
+
+
+   }
+
    private void installPageTransaction(final Transaction tx, final RouteContextList listCtx) throws Exception {
       FinishPageMessageOperation pgOper = (FinishPageMessageOperation) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
       if (pgOper == null) {


Mime
View raw message