activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [14/17] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding.
Date Sun, 05 Mar 2017 16:50:15 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
new file mode 100644
index 0000000..1e4087f
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -0,0 +1,1091 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.message.impl;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.Set;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RefCountMessage;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.encode.BodyType;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.reader.MessageUtil;
+import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.activemq.artemis.utils.TypedProperties;
+import org.apache.activemq.artemis.utils.UUID;
+import org.jboss.logging.Logger;
+
+/** Note: you shouldn't change properties using multi-threads. Change your properties before you can send it to multiple
+ *  consumers */
+public class CoreMessage extends RefCountMessage implements ICoreMessage {
+
+   public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
+
+   private volatile int memoryEstimate = -1;
+
+   private static final Logger logger = Logger.getLogger(CoreMessage.class);
+
+   // There's an integer with the number of bytes for the body
+   public static final int BODY_OFFSET = DataConstants.SIZE_INT;
+
+   /** That is the encode for the whole message, including properties..
+       it does not include the buffer for the Packet send and receive header on core protocol */
+   protected ByteBuf buffer;
+
+   private volatile boolean validBuffer = false;
+
+   protected volatile ResetLimitWrappedActiveMQBuffer writableBuffer;
+
+   Object body;
+
+   protected int endOfBodyPosition = -1;
+
+   protected int messageIDPosition = -1;
+
+   protected long messageID;
+
+   protected SimpleString address;
+
+   protected byte type;
+
+   protected boolean durable;
+
+   /**
+    * GMT milliseconds at which this message expires. 0 means never expires *
+    */
+   private long expiration;
+
+   protected long timestamp;
+
+   protected byte priority;
+
+   private UUID userID;
+
+   private int propertiesLocation = -1;
+
+   protected volatile TypedProperties properties;
+
+   public CoreMessage() {
+   }
+
+   /** On core there's no delivery annotation */
+   @Override
+   public Object getDeliveryAnnotationProperty(SimpleString key) {
+      return getObjectProperty(key);
+   }
+
+   /** On core there's no delivery annotation */
+   @Override
+   public Object removeDeliveryAnnoationProperty(SimpleString key) {
+      return removeProperty(key);
+   }
+
+   @Override
+   public void cleanupInternalProperties() {
+      if (properties.hasInternalProperties()) {
+         LinkedList<SimpleString> valuesToRemove = null;
+
+         for (SimpleString name : getPropertyNames()) {
+            // We use properties to establish routing context on clustering.
+            // However if the client resends the message after receiving, it needs to be removed
+            if ((name.startsWith(Message.HDR_ROUTE_TO_IDS) && !name.equals(Message.HDR_ROUTE_TO_IDS)) || (name.startsWith(Message.HDR_ROUTE_TO_ACK_IDS) && !name.equals(Message.HDR_ROUTE_TO_ACK_IDS))) {
+               if (valuesToRemove == null) {
+                  valuesToRemove = new LinkedList<>();
+               }
+               valuesToRemove.add(name);
+            }
+         }
+
+         if (valuesToRemove != null) {
+            for (SimpleString removal : valuesToRemove) {
+               this.removeProperty(removal);
+            }
+         }
+      }
+   }
+
+   @Override
+   public boolean containsDeliveryAnnotationProperty(SimpleString property) {
+      checkProperties();
+      return properties.containsProperty(property);
+   }
+
+   @Override
+   public Persister<Message> getPersister() {
+      return CoreMessagePersister.getInstance();
+   }
+
+   public CoreMessage initBuffer(final int initialMessageBufferSize) {
+      buffer = ActiveMQBuffers.dynamicBuffer(initialMessageBufferSize).byteBuf();
+
+      // There's a bug in netty which means a dynamic buffer won't resize until you write a byte
+      buffer.writeByte((byte) 0);
+
+      buffer.setIndex(BODY_OFFSET, BODY_OFFSET);
+
+      return this;
+   }
+
+   @Override
+   public SimpleString getReplyTo() {
+      return getSimpleStringProperty(MessageUtil.REPLYTO_HEADER_NAME);
+   }
+
+   @Override
+   public RoutingType getRouteType() {
+      if (containsProperty(Message.HDR_ROUTING_TYPE)) {
+         return RoutingType.getType(getByteProperty(Message.HDR_ROUTING_TYPE));
+      }
+      return null;
+   }
+
+   @Override
+   public CoreMessage setReplyTo(SimpleString address) {
+
+      if (address == null) {
+         checkProperties();
+         properties.removeProperty(MessageUtil.REPLYTO_HEADER_NAME);
+      } else {
+         putStringProperty(MessageUtil.REPLYTO_HEADER_NAME, address);
+      }
+      return this;
+   }
+
+   @Override
+   public void receiveBuffer(ByteBuf buffer) {
+      this.buffer = buffer;
+      this.buffer.retain();
+      decode();
+      this.validBuffer = true;
+   }
+
+   @Override
+   public ActiveMQBuffer getReadOnlyBodyBuffer() {
+      checkEncode();
+      internalWritableBuffer();
+      return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
+   }
+
+   /**
+    *
+    * @param sendBuffer
+    * @param deliveryCount Some protocols (AMQP) will have this as part of the message. ignored on core
+    */
+   @Override
+   public void sendBuffer(ByteBuf sendBuffer, int deliveryCount) {
+      checkEncode();
+      sendBuffer.writeBytes(buffer, 0, buffer.writerIndex());
+   }
+
+   private synchronized void checkEncode() {
+      if (!validBuffer) {
+         encode();
+      }
+   }
+
+   @Override
+   public Long getScheduledDeliveryTime() {
+      Object property = getObjectProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
+
+      if (property != null && property instanceof Number) {
+         return ((Number) property).longValue();
+      }
+
+      return null;
+   }
+
+   @Override
+   public InputStream getBodyInputStream() {
+      return null;
+   }
+
+   /**
+    * {@inheritDoc}
+    */
+   @Override
+   public ActiveMQBuffer getBodyBuffer() {
+      // if using the writable buffer, we must parse properties
+      checkProperties();
+
+      internalWritableBuffer();
+
+      return writableBuffer;
+   }
+
+   private void internalWritableBuffer() {
+      if (writableBuffer == null) {
+         writableBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer.duplicate(), this);
+         if (endOfBodyPosition > 0) {
+            writableBuffer.byteBuf().setIndex(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE + BODY_OFFSET);
+            writableBuffer.resetReaderIndex();
+         }
+      }
+   }
+
+   @Override
+   public int getEndOfBodyPosition() {
+      if (endOfBodyPosition < 0) {
+         endOfBodyPosition = getBodyBuffer().writerIndex();
+      }
+      return endOfBodyPosition;
+   }
+
+
+   public TypedProperties getTypedProperties() {
+      return checkProperties();
+   }
+
+
+   @Override
+   public void messageChanged() {
+      validBuffer = false;
+   }
+
+   protected CoreMessage(CoreMessage other) {
+      this(other, other.properties);
+   }
+
+   public CoreMessage(long id, int bufferSize) {
+      this.initBuffer(bufferSize);
+      this.setMessageID(id);
+   }
+
+   protected CoreMessage(CoreMessage other, TypedProperties copyProperties) {
+      this.body = other.body;
+      this.endOfBodyPosition = other.endOfBodyPosition;
+      this.messageID = other.messageID;
+      this.address = other.address;
+      this.type = other.type;
+      this.durable = other.durable;
+      this.expiration = other.expiration;
+      this.timestamp = other.timestamp;
+      this.priority = other.priority;
+      this.userID = other.userID;
+      if (copyProperties != null) {
+         this.properties = new TypedProperties(copyProperties);
+      }
+      if (other.buffer != null) {
+         this.buffer = other.buffer.copy();
+      }
+   }
+
+   @Override
+   public void copyHeadersAndProperties(final Message msg) {
+      messageID = msg.getMessageID();
+      address = msg.getAddressSimpleString();
+      userID = (UUID)msg.getUserID();
+      type = msg.toCore().getType();
+      durable = msg.isDurable();
+      expiration = msg.getExpiration();
+      timestamp = msg.getTimestamp();
+      priority = msg.getPriority();
+
+      if (msg instanceof CoreMessage) {
+         properties = ((CoreMessage)msg).getTypedProperties();
+      } else {
+         // TODO-now: copy stuff
+         logger.warn("Must implement copyHeaderAndProperties for other messages");
+      }
+   }
+
+
+   @Override
+   public Message copy() {
+      checkEncode();
+      return new CoreMessage(this);
+   }
+
+   @Override
+   public Message copy(long newID) {
+      return copy().setMessageID(newID);
+   }
+
+   @Override
+   public long getExpiration() {
+      return expiration;
+   }
+
+   @Override
+   public long getTimestamp() {
+      return timestamp;
+   }
+
+   @Override
+   public CoreMessage setTimestamp(long timestamp) {
+      this.timestamp = timestamp;
+      return this;
+   }
+
+   @Override
+   public long getMessageID() {
+      return messageID;
+   }
+
+   @Override
+   public byte getPriority() {
+      return priority;
+   }
+
+   @Override
+   public UUID getUserID() {
+      return userID;
+   }
+
+   @Override
+   public CoreMessage setUserID(Object uuid) {
+      this.userID = (UUID)uuid;
+      return this;
+   }
+
+   @Override
+   public CoreMessage setMessageID(long messageID) {
+      this.messageID = messageID;
+      if (messageIDPosition >= 0 && validBuffer) {
+         buffer.setLong(messageIDPosition, messageID);
+      }
+      return this;
+   }
+
+   @Override
+   public CoreMessage setAddress(SimpleString address) {
+      if (validBuffer && !address.equals(this.address)) {
+         messageChanged();
+      }
+      this.address = address;
+      return this;
+   }
+
+   @Override
+   public SimpleString getAddressSimpleString() {
+      return address;
+   }
+
+
+   @Override
+   public CoreMessage setExpiration(long expiration) {
+      this.expiration = expiration;
+      messageChanged();
+      return this;
+   }
+
+   @Override
+   public CoreMessage setPriority(byte priority) {
+      this.priority = priority;
+      messageChanged();
+      return this;
+   }
+
+   public CoreMessage setUserID(UUID userID) {
+      this.userID = userID;
+      messageChanged();
+      return this;
+   }
+
+   /**
+    * I am keeping this synchronized as the decode of the Properties is lazy
+    */
+   protected TypedProperties checkProperties() {
+      if (properties == null) {
+         TypedProperties properties = new TypedProperties();
+         if (buffer != null && propertiesLocation >= 0) {
+            properties.decode(buffer.duplicate().readerIndex(propertiesLocation));
+         }
+         this.properties = properties;
+      }
+
+      return this.properties;
+   }
+
+   @Override
+   public int getMemoryEstimate() {
+      if (memoryEstimate == -1) {
+         memoryEstimate = memoryOffset +
+            (buffer != null ? buffer.capacity() : 0) +
+            (properties != null ? properties.getMemoryOffset() : 0);
+      }
+
+      return memoryEstimate;
+   }
+
+   @Override
+   public boolean isServerMessage() {
+      // even though CoreMessage is used both on server and client
+      // callers are interested in knowing if this is a server large message
+      // as it will be used to send the body from the files.
+      //
+      // this may need further refactoring when we improve large messages
+      // and expose that functionality to other protocols.
+      return false;
+   }
+
+   @Override
+   public byte getType() {
+      return type;
+   }
+
+   @Override
+   public CoreMessage setType(byte type) {
+      this.type = type;
+      return this;
+   }
+
+   private void decode() {
+      endOfBodyPosition = buffer.readInt();
+
+      buffer.skipBytes(endOfBodyPosition - BUFFER_HEADER_SPACE);
+
+      decodeHeadersAndProperties(buffer, true);
+      buffer.readerIndex(0);
+
+      internalWritableBuffer();
+   }
+
+
+   public void decodeHeadersAndProperties(final ByteBuf buffer) {
+      decodeHeadersAndProperties(buffer, false);
+   }
+
+   private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties) {
+      messageIDPosition = buffer.readerIndex();
+      messageID = buffer.readLong();
+
+      address = SimpleString.readNullableSimpleString(buffer);
+      if (buffer.readByte() == DataConstants.NOT_NULL) {
+         byte[] bytes = new byte[16];
+         buffer.readBytes(bytes);
+         userID = new UUID(UUID.TYPE_TIME_BASED, bytes);
+      } else {
+         userID = null;
+      }
+      type = buffer.readByte();
+      durable = buffer.readBoolean();
+      expiration = buffer.readLong();
+      timestamp = buffer.readLong();
+      priority = buffer.readByte();
+      if (lazyProperties) {
+         properties = null;
+         propertiesLocation = buffer.readerIndex();
+      } else {
+         properties = new TypedProperties();
+         properties.decode(buffer);
+      }
+   }
+
+
+   public synchronized CoreMessage encode() {
+
+      checkProperties();
+
+      if (writableBuffer != null) {
+         // The message encode takes into consideration the PacketImpl which is not part of this encoding
+         // so we always need to take the BUFFER_HEADER_SPACE from packet impl into consideration
+         endOfBodyPosition = writableBuffer.writerIndex() + BUFFER_HEADER_SPACE - 4;
+      } else if (endOfBodyPosition <= 0) {
+         endOfBodyPosition = BUFFER_HEADER_SPACE;
+      }
+
+      buffer.setIndex(0, 0);
+      buffer.writeInt(endOfBodyPosition);
+
+      // The end of body position
+      buffer.writerIndex(endOfBodyPosition - BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
+
+      encodeHeadersAndProperties(buffer);
+
+      validBuffer = true;
+
+      return this;
+   }
+
+   public void encodeHeadersAndProperties(final ByteBuf buffer) {
+      checkProperties();
+      messageIDPosition = buffer.writerIndex();
+      buffer.writeLong(messageID);
+      SimpleString.writeNullableSimpleString(buffer, address);
+      if (userID == null) {
+         buffer.writeByte(DataConstants.NULL);
+      } else {
+         buffer.writeByte(DataConstants.NOT_NULL);
+         buffer.writeBytes(userID.asBytes());
+      }
+      buffer.writeByte(type);
+      buffer.writeBoolean(durable);
+      buffer.writeLong(expiration);
+      buffer.writeLong(timestamp);
+      buffer.writeByte(priority);
+      properties.encode(buffer);
+   }
+
+   @Override
+   public int getHeadersAndPropertiesEncodeSize() {
+      return DataConstants.SIZE_LONG + // Message ID
+         DataConstants.SIZE_BYTE + // user id null?
+         (userID == null ? 0 : 16) +
+             /* address */SimpleString.sizeofNullableString(address) +
+         DataConstants./* Type */SIZE_BYTE +
+         DataConstants./* Durable */SIZE_BOOLEAN +
+         DataConstants./* Expiration */SIZE_LONG +
+         DataConstants./* Timestamp */SIZE_LONG +
+         DataConstants./* Priority */SIZE_BYTE +
+             /* PropertySize and Properties */checkProperties().getEncodeSize();
+   }
+
+
+   public static BodyType getBodyType(byte type) {
+      switch (type) {
+
+         case Message.DEFAULT_TYPE:
+            return BodyType.Undefined;
+
+         case Message.OBJECT_TYPE:
+            return BodyType.Object;
+
+         case Message.TEXT_TYPE:
+            return BodyType.Text;
+
+         case Message.BYTES_TYPE:
+            return BodyType.Text;
+
+         case Message.MAP_TYPE:
+            return BodyType.Map;
+
+         case Message.STREAM_TYPE:
+            return BodyType.Stream;
+
+         default:
+            return BodyType.Undefined;
+
+      }
+   }
+
+   @Override
+   public int getEncodeSize() {
+      checkEncode();
+      return buffer == null ? -1 : buffer.writerIndex();
+   }
+
+   @Override
+   public boolean isLargeMessage() {
+      return false;
+   }
+
+   @Override
+   public String getAddress() {
+      if (address == null) {
+         return null;
+      } else {
+         return address.toString();
+      }
+   }
+
+   @Override
+   public CoreMessage setAddress(String address) {
+      messageChanged();
+      this.address = SimpleString.toSimpleString(address);
+      return this;
+   }
+
+   @Override
+   public CoreMessage setBuffer(ByteBuf buffer) {
+      this.buffer = buffer;
+
+      return this;
+   }
+
+   @Override
+   public ByteBuf getBuffer() {
+      return buffer;
+   }
+
+   @Override
+   public boolean isDurable() {
+      return durable;
+   }
+
+   @Override
+   public CoreMessage setDurable(boolean durable) {
+      messageChanged();
+      this.durable = durable;
+      return this;
+   }
+
+
+   @Override
+   public CoreMessage putBooleanProperty(final String key, final boolean value) {
+      messageChanged();
+      checkProperties();
+      properties.putBooleanProperty(new SimpleString(key), value);
+      return this;
+   }
+
+   @Override
+   public CoreMessage putBooleanProperty(final SimpleString key, final boolean value) {
+      messageChanged();
+      checkProperties();
+      properties.putBooleanProperty(key, value);
+      return this;
+   }
+
+   @Override
+   public Boolean getBooleanProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getBooleanProperty(key);
+   }
+
+   @Override
+   public Boolean getBooleanProperty(final String key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getBooleanProperty(new SimpleString(key));
+   }
+
+
+   @Override
+   public CoreMessage putByteProperty(final SimpleString key, final byte value) {
+      messageChanged();
+      checkProperties();
+      properties.putByteProperty(key, value);
+      return this;
+   }
+
+
+   @Override
+   public CoreMessage putByteProperty(final String key, final byte value) {
+      messageChanged();
+      checkProperties();
+      properties.putByteProperty(new SimpleString(key), value);
+
+      return this;
+   }
+
+
+   @Override
+   public Byte getByteProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getByteProperty(key);
+   }
+
+   @Override
+   public Byte getByteProperty(final String key) throws ActiveMQPropertyConversionException {
+      return getByteProperty(SimpleString.toSimpleString(key));
+   }
+
+   @Override
+   public CoreMessage putBytesProperty(final SimpleString key, final byte[] value) {
+      messageChanged();
+      checkProperties();
+      properties.putBytesProperty(key, value);
+
+      return this;
+   }
+
+   @Override
+   public CoreMessage putBytesProperty(final String key, final byte[] value) {
+      messageChanged();
+      checkProperties();
+      properties.putBytesProperty(new SimpleString(key), value);
+      return this;
+   }
+
+
+   @Override
+   public byte[] getBytesProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getBytesProperty(key);
+   }
+
+   @Override
+   public byte[] getBytesProperty(final String key) throws ActiveMQPropertyConversionException {
+      return getBytesProperty(new SimpleString(key));
+   }
+
+   @Override
+   public CoreMessage putCharProperty(SimpleString key, char value) {
+      messageChanged();
+      checkProperties();
+      properties.putCharProperty(key, value);
+      return this;
+   }
+
+   @Override
+   public CoreMessage putCharProperty(String key, char value) {
+      messageChanged();
+      checkProperties();
+      properties.putCharProperty(new SimpleString(key), value);
+      return this;
+   }
+
+   @Override
+   public CoreMessage putShortProperty(final SimpleString key, final short value) {
+      messageChanged();
+      checkProperties();
+      properties.putShortProperty(key, value);
+      return this;
+   }
+
+   @Override
+   public CoreMessage putShortProperty(final String key, final short value) {
+      messageChanged();
+      checkProperties();
+      properties.putShortProperty(new SimpleString(key), value);
+      return this;
+   }
+
+
+   @Override
+   public CoreMessage putIntProperty(final SimpleString key, final int value) {
+      messageChanged();
+      checkProperties();
+      properties.putIntProperty(key, value);
+      return this;
+   }
+
+   @Override
+   public CoreMessage putIntProperty(final String key, final int value) {
+      messageChanged();
+      checkProperties();
+      properties.putIntProperty(new SimpleString(key), value);
+      return this;
+   }
+
+   @Override
+   public Integer getIntProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getIntProperty(key);
+   }
+
+   @Override
+   public Integer getIntProperty(final String key) throws ActiveMQPropertyConversionException {
+      return getIntProperty(SimpleString.toSimpleString(key));
+   }
+
+
+   @Override
+   public CoreMessage putLongProperty(final SimpleString key, final long value) {
+      messageChanged();
+      checkProperties();
+      properties.putLongProperty(key, value);
+      return this;
+   }
+
+   @Override
+   public CoreMessage putLongProperty(final String key, final long value) {
+      messageChanged();
+      checkProperties();
+      properties.putLongProperty(new SimpleString(key), value);
+      return this;
+   }
+
+   @Override
+   public Long getLongProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getLongProperty(key);
+   }
+
+   @Override
+   public Long getLongProperty(final String key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return getLongProperty(SimpleString.toSimpleString(key));
+   }
+
+
+   @Override
+   public CoreMessage putFloatProperty(final SimpleString key, final float value) {
+      messageChanged();
+      checkProperties();
+      properties.putFloatProperty(key, value);
+      return this;
+   }
+
+   @Override
+   public CoreMessage putFloatProperty(final String key, final float value) {
+      messageChanged();
+      checkProperties();
+      properties.putFloatProperty(new SimpleString(key), value);
+      return this;
+   }
+
+   @Override
+   public CoreMessage putDoubleProperty(final SimpleString key, final double value) {
+      messageChanged();
+      checkProperties();
+      properties.putDoubleProperty(key, value);
+      return this;
+   }
+
+   @Override
+   public CoreMessage putDoubleProperty(final String key, final double value) {
+      messageChanged();
+      checkProperties();
+      properties.putDoubleProperty(new SimpleString(key), value);
+      return this;
+   }
+
+
+   @Override
+   public Double getDoubleProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+      messageChanged();
+      checkProperties();
+      return properties.getDoubleProperty(key);
+   }
+
+   @Override
+   public Double getDoubleProperty(final String key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return getDoubleProperty(SimpleString.toSimpleString(key));
+   }
+
+   @Override
+   public CoreMessage putStringProperty(final SimpleString key, final SimpleString value) {
+      messageChanged();
+      checkProperties();
+      properties.putSimpleStringProperty(key, value);
+      return this;
+   }
+
+   @Override
+   public CoreMessage putStringProperty(final String key, final String value) {
+      messageChanged();
+      checkProperties();
+      properties.putSimpleStringProperty(new SimpleString(key), SimpleString.toSimpleString(value));
+      return this;
+   }
+
+   @Override
+   public CoreMessage putObjectProperty(final SimpleString key,
+                                        final Object value) throws ActiveMQPropertyConversionException {
+      messageChanged();
+      checkProperties();
+      TypedProperties.setObjectProperty(key, value, properties);
+      return this;
+   }
+
+   @Override
+   public Object getObjectProperty(final String key) {
+      checkProperties();
+      return getObjectProperty(SimpleString.toSimpleString(key));
+   }
+
+   @Override
+   public Object getObjectProperty(final SimpleString key) {
+      checkProperties();
+      return properties.getProperty(key);
+   }
+
+   @Override
+   public CoreMessage putObjectProperty(final String key, final Object value) throws ActiveMQPropertyConversionException {
+      messageChanged();
+      putObjectProperty(new SimpleString(key), value);
+      return this;
+   }
+
+   @Override
+   public Short getShortProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getShortProperty(key);
+   }
+
+   @Override
+   public Short getShortProperty(final String key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getShortProperty(new SimpleString(key));
+   }
+
+   @Override
+   public Float getFloatProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getFloatProperty(key);
+   }
+
+   @Override
+   public Float getFloatProperty(final String key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getFloatProperty(new SimpleString(key));
+   }
+
+   @Override
+   public String getStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+      SimpleString str = getSimpleStringProperty(key);
+
+      if (str == null) {
+         return null;
+      } else {
+         return str.toString();
+      }
+   }
+
+   @Override
+   public String getStringProperty(final String key) throws ActiveMQPropertyConversionException {
+      return getStringProperty(new SimpleString(key));
+   }
+
+   @Override
+   public SimpleString getSimpleStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getSimpleStringProperty(key);
+   }
+
+   @Override
+   public SimpleString getSimpleStringProperty(final String key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getSimpleStringProperty(new SimpleString(key));
+   }
+
+   @Override
+   public Object removeProperty(final SimpleString key) {
+      checkProperties();
+      Object oldValue = properties.removeProperty(key);
+      if (oldValue != null) {
+         messageChanged();
+      }
+      return oldValue;
+   }
+
+   @Override
+   public Object removeProperty(final String key) {
+      messageChanged();
+      checkProperties();
+      Object oldValue = properties.removeProperty(new SimpleString(key));
+      if (oldValue != null) {
+         messageChanged();
+      }
+      return oldValue;
+   }
+
+   @Override
+   public boolean containsProperty(final SimpleString key) {
+      checkProperties();
+      return properties.containsProperty(key);
+   }
+
+   @Override
+   public boolean containsProperty(final String key) {
+      checkProperties();
+      return properties.containsProperty(new SimpleString(key));
+   }
+
+   @Override
+   public Set<SimpleString> getPropertyNames() {
+      checkProperties();
+      return properties.getPropertyNames();
+   }
+
+   @Override
+   public LargeBodyEncoder getBodyEncoder() throws ActiveMQException {
+      return new DecodingContext();
+   }
+
+   private final class DecodingContext implements LargeBodyEncoder {
+
+      private int lastPos = 0;
+
+      private DecodingContext() {
+      }
+
+      @Override
+      public void open() {
+      }
+
+      @Override
+      public void close() {
+      }
+
+      @Override
+      public long getLargeBodySize() {
+         return buffer.writerIndex();
+      }
+
+      @Override
+      public int encode(final ByteBuffer bufferRead) throws ActiveMQException {
+         ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(bufferRead);
+         return encode(buffer, bufferRead.capacity());
+      }
+
+      @Override
+      public int encode(final ActiveMQBuffer bufferOut, final int size) {
+         bufferOut.byteBuf().writeBytes(buffer, lastPos, size);
+         lastPos += size;
+         return size;
+      }
+   }
+
+   @Override
+   public int getPersistSize() {
+      checkEncode();
+      return buffer.writerIndex() + DataConstants.SIZE_INT;
+   }
+
+   @Override
+   public void persist(ActiveMQBuffer targetRecord) {
+      checkEncode();
+      targetRecord.writeInt(buffer.writerIndex());
+      targetRecord.writeBytes(buffer, 0, buffer.writerIndex());
+   }
+
+   @Override
+   public void reloadPersistence(ActiveMQBuffer record) {
+      int size = record.readInt();
+      initBuffer(size);
+      buffer.setIndex(0, 0).writeBytes(record.byteBuf(), size);
+      decode();
+
+   }
+
+   @Override
+   public CoreMessage toCore() {
+      return this;
+   }
+
+
+
+   @Override
+   public String toString() {
+      try {
+         return "CoreMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority()  +
+            ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) +
+            ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this);
+      } catch (Throwable e) {
+         return "ServerMessage[messageID=" + messageID + "]";
+      }
+   }
+
+
+   private static String toDate(long timestamp) {
+      if (timestamp == 0) {
+         return "0";
+      } else {
+         return new java.util.Date(timestamp).toString();
+      }
+
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java
new file mode 100644
index 0000000..ddf39d2
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.message.impl;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+public class CoreMessagePersister implements Persister<Message> {
+
+   public static CoreMessagePersister theInstance = new CoreMessagePersister();
+
+   public static CoreMessagePersister getInstance() {
+      return theInstance;
+   }
+
+   protected CoreMessagePersister() {
+   }
+
+
+   @Override
+   public int getEncodeSize(Message record) {
+      return DataConstants.SIZE_BYTE + record.getPersistSize() +
+         SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_LONG;
+   }
+
+
+   /** Sub classes must add the first short as the protocol-id */
+   @Override
+   public void encode(ActiveMQBuffer buffer, Message record) {
+      buffer.writeByte((byte)1);
+      buffer.writeLong(record.getMessageID());
+      buffer.writeNullableSimpleString(record.getAddressSimpleString());
+      record.persist(buffer);
+   }
+
+
+   @Override
+   public Message decode(ActiveMQBuffer buffer, Message record) {
+      // the caller must consume the first byte already, as that will be used to decide what persister (protocol) to use
+      long id = buffer.readLong();
+      SimpleString address = buffer.readNullableSimpleString();
+      record = new CoreMessage();
+      record.reloadPersistence(buffer);
+      record.setMessageID(id);
+      record.setAddress(address);
+      return record;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
deleted file mode 100644
index f93086c..0000000
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
+++ /dev/null
@@ -1,1059 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.message.impl;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
-import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
-import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
-import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
-import org.apache.activemq.artemis.utils.ByteUtil;
-import org.apache.activemq.artemis.utils.DataConstants;
-import org.apache.activemq.artemis.utils.TypedProperties;
-import org.apache.activemq.artemis.utils.UUID;
-
-/**
- * A concrete implementation of a message
- * <p>
- * All messages handled by ActiveMQ Artemis core are of this type
- */
-public abstract class MessageImpl implements MessageInternal {
-
-   public static final SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_AMQ_ROUTE_TO");
-
-   public static final SimpleString HDR_SCALEDOWN_TO_IDS = new SimpleString("_AMQ_SCALEDOWN_TO");
-
-   public static final SimpleString HDR_ROUTE_TO_ACK_IDS = new SimpleString("_AMQ_ACK_ROUTE_TO");
-
-   // used by the bridges to set duplicates
-   public static final SimpleString HDR_BRIDGE_DUPLICATE_ID = new SimpleString("_AMQ_BRIDGE_DUP");
-
-   public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
-
-   public static final int BODY_OFFSET = BUFFER_HEADER_SPACE + DataConstants.SIZE_INT;
-
-   protected long messageID;
-
-   protected SimpleString address;
-
-   protected byte type;
-
-   protected boolean durable;
-
-   /**
-    * GMT milliseconds at which this message expires. 0 means never expires *
-    */
-   private long expiration;
-
-   protected long timestamp;
-
-   protected TypedProperties properties;
-
-   protected byte priority;
-
-   protected volatile ActiveMQBuffer buffer;
-
-   protected volatile ResetLimitWrappedActiveMQBuffer bodyBuffer;
-
-   protected volatile boolean bufferValid;
-
-   private int endOfBodyPosition = -1;
-
-   private int endOfMessagePosition;
-
-   private UUID userID;
-
-   // Constructors --------------------------------------------------
-
-   protected MessageImpl() {
-      properties = new TypedProperties();
-   }
-
-   /**
-    * overridden by the client message, we need access to the connection so we can create the appropriate ActiveMQBuffer.
-    *
-    * @param type
-    * @param durable
-    * @param expiration
-    * @param timestamp
-    * @param priority
-    * @param initialMessageBufferSize
-    */
-   protected MessageImpl(final byte type,
-                         final boolean durable,
-                         final long expiration,
-                         final long timestamp,
-                         final byte priority,
-                         final int initialMessageBufferSize) {
-      this();
-      this.type = type;
-      this.durable = durable;
-      this.expiration = expiration;
-      this.timestamp = timestamp;
-      this.priority = priority;
-      createBody(initialMessageBufferSize);
-   }
-
-   protected MessageImpl(final int initialMessageBufferSize) {
-      this();
-      createBody(initialMessageBufferSize);
-   }
-
-   /*
-    * Copy constructor
-    */
-   protected MessageImpl(final MessageImpl other) {
-      this(other, other.getProperties());
-   }
-
-   /*
-    * Copy constructor
-    */
-   protected MessageImpl(final MessageImpl other, TypedProperties properties) {
-      messageID = other.getMessageID();
-      userID = other.getUserID();
-      address = other.getAddress();
-      type = other.getType();
-      durable = other.isDurable();
-      expiration = other.getExpiration();
-      timestamp = other.getTimestamp();
-      priority = other.getPriority();
-      this.properties = new TypedProperties(properties);
-
-      // This MUST be synchronized using the monitor on the other message to prevent it running concurrently
-      // with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to
-      // many subscriptions and bridging to other nodes in a cluster
-      synchronized (other) {
-         bufferValid = false;
-         endOfBodyPosition = -1;
-         endOfMessagePosition = other.endOfMessagePosition;
-
-         if (other.buffer != null) {
-            // We need to copy the underlying buffer too, since the different messsages thereafter might have different
-            // properties set on them, making their encoding different
-            buffer = other.buffer.copy(0, other.buffer.capacity());
-
-            buffer.setIndex(other.buffer.readerIndex(), buffer.capacity());
-
-            bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this);
-
-            bodyBuffer.readerIndex(BODY_OFFSET);
-            bodyBuffer.writerIndex(other.getBodyBuffer().writerIndex());
-            endOfBodyPosition = other.endOfBodyPosition;
-         }
-      }
-   }
-
-   // Message implementation ----------------------------------------
-
-   @Override
-   public int getEncodeSize() {
-      int headersPropsSize = getHeadersAndPropertiesEncodeSize();
-
-      int bodyPos = getEndOfBodyPosition();
-
-      int bodySize = bodyPos - BUFFER_HEADER_SPACE - DataConstants.SIZE_INT;
-
-      return DataConstants.SIZE_INT + bodySize + DataConstants.SIZE_INT + headersPropsSize;
-   }
-
-   @Override
-   public int getHeadersAndPropertiesEncodeSize() {
-      return DataConstants.SIZE_LONG + // Message ID
-         DataConstants.SIZE_BYTE + // user id null?
-         (userID == null ? 0 : 16) +
-             /* address */SimpleString.sizeofNullableString(address) +
-         DataConstants./* Type */SIZE_BYTE +
-         DataConstants./* Durable */SIZE_BOOLEAN +
-         DataConstants./* Expiration */SIZE_LONG +
-         DataConstants./* Timestamp */SIZE_LONG +
-         DataConstants./* Priority */SIZE_BYTE +
-             /* PropertySize and Properties */properties.getEncodeSize();
-   }
-
-   @Override
-   public void encodeHeadersAndProperties(final ActiveMQBuffer buffer) {
-      buffer.writeLong(messageID);
-      buffer.writeNullableSimpleString(address);
-      if (userID == null) {
-         buffer.writeByte(DataConstants.NULL);
-      } else {
-         buffer.writeByte(DataConstants.NOT_NULL);
-         buffer.writeBytes(userID.asBytes());
-      }
-      buffer.writeByte(type);
-      buffer.writeBoolean(durable);
-      buffer.writeLong(expiration);
-      buffer.writeLong(timestamp);
-      buffer.writeByte(priority);
-      properties.encode(buffer);
-   }
-
-   @Override
-   public void decodeHeadersAndProperties(final ActiveMQBuffer buffer) {
-      messageID = buffer.readLong();
-      address = buffer.readNullableSimpleString();
-      if (buffer.readByte() == DataConstants.NOT_NULL) {
-         byte[] bytes = new byte[16];
-         buffer.readBytes(bytes);
-         userID = new UUID(UUID.TYPE_TIME_BASED, bytes);
-      } else {
-         userID = null;
-      }
-      type = buffer.readByte();
-      durable = buffer.readBoolean();
-      expiration = buffer.readLong();
-      timestamp = buffer.readLong();
-      priority = buffer.readByte();
-      properties.decode(buffer);
-   }
-
-   public void copyHeadersAndProperties(final MessageInternal msg) {
-      messageID = msg.getMessageID();
-      address = msg.getAddress();
-      userID = msg.getUserID();
-      type = msg.getType();
-      durable = msg.isDurable();
-      expiration = msg.getExpiration();
-      timestamp = msg.getTimestamp();
-      priority = msg.getPriority();
-      properties = msg.getTypedProperties();
-   }
-
-   @Override
-   public ActiveMQBuffer getBodyBuffer() {
-      if (bodyBuffer == null) {
-         bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this);
-      }
-
-      return bodyBuffer;
-   }
-
-   @Override
-   public Message writeBodyBufferBytes(byte[] bytes) {
-      getBodyBuffer().writeBytes(bytes);
-
-      return this;
-   }
-
-   @Override
-   public Message writeBodyBufferString(String string) {
-      getBodyBuffer().writeString(string);
-
-      return this;
-   }
-
-   public void checkCompletion() throws ActiveMQException {
-      // no op on regular messages
-   }
-
-   @Override
-   public synchronized ActiveMQBuffer getBodyBufferDuplicate() {
-
-      // Must copy buffer before sending it
-
-      ByteBuf byteBuf = ChannelBufferWrapper.unwrap(getBodyBuffer().byteBuf());
-      byteBuf = byteBuf.duplicate();
-      byteBuf.readerIndex(getBodyBuffer().readerIndex());
-      byteBuf.writerIndex(getBodyBuffer().writerIndex());
-
-      return new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, byteBuf, null);
-   }
-
-   @Override
-   public long getMessageID() {
-      return messageID;
-   }
-
-   @Override
-   public UUID getUserID() {
-      return userID;
-   }
-
-   @Override
-   public MessageImpl setUserID(final UUID userID) {
-      this.userID = userID;
-      return this;
-   }
-
-   /**
-    * this doesn't need to be synchronized as setAddress is protecting the buffer,
-    * not the address
-    */
-   @Override
-   public SimpleString getAddress() {
-      return address;
-   }
-
-   /**
-    * The only reason this is synchronized is because of encoding a message versus invalidating the buffer.
-    * This synchronization can probably be removed since setAddress is always called from a single thread.
-    * However I will keep it as it's harmless and it's been well tested
-    */
-   @Override
-   public Message setAddress(final SimpleString address) {
-      // This is protecting the buffer
-      synchronized (this) {
-         if (this.address != address) {
-            this.address = address;
-
-            bufferValid = false;
-         }
-      }
-
-      return this;
-   }
-
-   @Override
-   public byte getType() {
-      return type;
-   }
-
-   public void setType(byte type) {
-      this.type = type;
-   }
-
-   @Override
-   public boolean isDurable() {
-      return durable;
-   }
-
-   @Override
-   public MessageImpl setDurable(final boolean durable) {
-      if (this.durable != durable) {
-         this.durable = durable;
-
-         bufferValid = false;
-      }
-      return this;
-   }
-
-   @Override
-   public long getExpiration() {
-      return expiration;
-   }
-
-   @Override
-   public MessageImpl setExpiration(final long expiration) {
-      if (this.expiration != expiration) {
-         this.expiration = expiration;
-
-         bufferValid = false;
-      }
-      return this;
-   }
-
-   @Override
-   public long getTimestamp() {
-      return timestamp;
-   }
-
-   @Override
-   public MessageImpl setTimestamp(final long timestamp) {
-      if (this.timestamp != timestamp) {
-         this.timestamp = timestamp;
-
-         bufferValid = false;
-      }
-      return this;
-   }
-
-   @Override
-   public byte getPriority() {
-      return priority;
-   }
-
-   @Override
-   public MessageImpl setPriority(final byte priority) {
-      if (this.priority != priority) {
-         this.priority = priority;
-
-         bufferValid = false;
-      }
-      return this;
-   }
-
-   @Override
-   public boolean isExpired() {
-      if (expiration == 0) {
-         return false;
-      }
-
-      return System.currentTimeMillis() - expiration >= 0;
-   }
-
-   @Override
-   public Map<String, Object> toMap() {
-      Map<String, Object> map = new HashMap<>();
-
-      map.put("messageID", messageID);
-      if (userID != null) {
-         map.put("userID", "ID:" + userID.toString());
-      }
-      map.put("address", address.toString());
-      map.put("type", type);
-      map.put("durable", durable);
-      map.put("expiration", expiration);
-      map.put("timestamp", timestamp);
-      map.put("priority", priority);
-      map.putAll(toPropertyMap());
-      return map;
-   }
-
-   @Override
-   public Map<String, Object> toPropertyMap() {
-      Map<String, Object> map = new HashMap<>();
-      for (SimpleString propName : properties.getPropertyNames()) {
-         map.put(propName.toString(), properties.getProperty(propName));
-      }
-      return map;
-   }
-
-   @Override
-   public void decodeFromBuffer(final ActiveMQBuffer buffer) {
-
-      this.buffer = copyMessageBuffer(buffer);
-
-      decode();
-
-      //synchronize indexes
-      buffer.setIndex(this.buffer.readerIndex(),this.buffer.writerIndex());
-
-      // Setting up the BodyBuffer based on endOfBodyPosition set from decode
-      ResetLimitWrappedActiveMQBuffer tmpbodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, this.buffer, null);
-      tmpbodyBuffer.readerIndex(BODY_OFFSET);
-      tmpbodyBuffer.writerIndex(endOfBodyPosition);
-      // only set this after the writer and reader is set,
-      // otherwise the buffer would be reset through the listener
-      tmpbodyBuffer.setMessage(this);
-      this.bodyBuffer = tmpbodyBuffer;
-
-   }
-
-   private ActiveMQBuffer copyMessageBuffer(ActiveMQBuffer buffer) {
-      ActiveMQBuffer copiedBuffer;
-
-      ByteBuf newNettyBuffer = Unpooled.buffer( buffer.byteBuf().capacity() );
-
-      int read = buffer.byteBuf().readerIndex();
-      int writ = buffer.byteBuf().writerIndex();
-
-      int readArt = buffer.readerIndex();
-      int writArt = buffer.writerIndex();
-      buffer.byteBuf().readerIndex( 0 );
-
-      buffer.byteBuf().readBytes( newNettyBuffer, 0, buffer.byteBuf().writerIndex() );
-      buffer.byteBuf().setIndex( read, writ );
-      newNettyBuffer.setIndex( read, writ );
-
-      copiedBuffer = new ChannelBufferWrapper( newNettyBuffer );
-
-      buffer.setIndex( readArt, writArt );
-      copiedBuffer.setIndex( readArt, writArt );
-
-      return copiedBuffer;
-   }
-
-   @Override
-   public void bodyChanged() {
-      bufferValid = false;
-
-      endOfBodyPosition = -1;
-   }
-
-   @Override
-   public int getEndOfMessagePosition() {
-      return endOfMessagePosition;
-   }
-
-   @Override
-   public int getEndOfBodyPosition() {
-      if (endOfBodyPosition < 0) {
-         endOfBodyPosition = getBodyBuffer().writerIndex();
-      }
-      return endOfBodyPosition;
-   }
-
-   // Encode to journal or paging
-   public void encode(final ActiveMQBuffer buff) {
-      encodeToBuffer();
-
-      buff.writeBytes(buffer, BUFFER_HEADER_SPACE, endOfMessagePosition - BUFFER_HEADER_SPACE);
-   }
-
-   // Decode from journal or paging
-   public void decode(final ActiveMQBuffer buff) {
-      int start = buff.readerIndex();
-
-      endOfBodyPosition = buff.readInt();
-
-      endOfMessagePosition = buff.getInt(endOfBodyPosition - BUFFER_HEADER_SPACE + start);
-
-      int length = endOfMessagePosition - BUFFER_HEADER_SPACE;
-
-      buffer.setIndex(0, BUFFER_HEADER_SPACE);
-
-      buffer.writeBytes(buff, start, length);
-
-      decode();
-
-      buff.readerIndex(start + length);
-   }
-
-   @Override
-   public synchronized ActiveMQBuffer getEncodedBuffer() {
-      ActiveMQBuffer buff = encodeToBuffer();
-      return buff.duplicate();
-   }
-
-   @Override
-   public void setAddressTransient(final SimpleString address) {
-      this.address = address;
-   }
-
-   // Properties
-   // ---------------------------------------------------------------------------------------
-
-   @Override
-   public Message putBooleanProperty(final SimpleString key, final boolean value) {
-      properties.putBooleanProperty(key, value);
-
-      bufferValid = false;
-
-      return this;
-   }
-
-   @Override
-   public Message putByteProperty(final SimpleString key, final byte value) {
-      properties.putByteProperty(key, value);
-
-      bufferValid = false;
-
-      return this;
-   }
-
-   @Override
-   public Message putBytesProperty(final SimpleString key, final byte[] value) {
-      properties.putBytesProperty(key, value);
-
-      bufferValid = false;
-
-      return this;
-   }
-
-   @Override
-   public Message putCharProperty(SimpleString key, char value) {
-      properties.putCharProperty(key, value);
-      bufferValid = false;
-
-      return this;
-   }
-
-   @Override
-   public Message putCharProperty(String key, char value) {
-      properties.putCharProperty(new SimpleString(key), value);
-      bufferValid = false;
-
-      return this;
-   }
-
-   @Override
-   public Message putShortProperty(final SimpleString key, final short value) {
-      properties.putShortProperty(key, value);
-      bufferValid = false;
-
-      return this;
-   }
-
-   @Override
-   public Message putIntProperty(final SimpleString key, final int value) {
-      properties.putIntProperty(key, value);
-      bufferValid = false;
-
-      return this;
-   }
-
-   @Override
-   public Message putLongProperty(final SimpleString key, final long value) {
-      properties.putLongProperty(key, value);
-      bufferValid = false;
-
-      return this;
-   }
-
-   @Override
-   public Message putFloatProperty(final SimpleString key, final float value) {
-      properties.putFloatProperty(key, value);
-
-      bufferValid = false;
-
-      return this;
-   }
-
-   @Override
-   public Message putDoubleProperty(final SimpleString key, final double value) {
-      properties.putDoubleProperty(key, value);
-
-      bufferValid = false;
-
-      return this;
-   }
-
-   @Override
-   public Message putStringProperty(final SimpleString key, final SimpleString value) {
-      properties.putSimpleStringProperty(key, value);
-
-      bufferValid = false;
-
-      return this;
-   }
-
-   @Override
-   public Message putObjectProperty(final SimpleString key,
-                                    final Object value) throws ActiveMQPropertyConversionException {
-      TypedProperties.setObjectProperty(key, value, properties);
-      bufferValid = false;
-
-      return this;
-   }
-
-   @Override
-   public Message putObjectProperty(final String key, final Object value) throws ActiveMQPropertyConversionException {
-      putObjectProperty(new SimpleString(key), value);
-
-      bufferValid = false;
-
-      return this;
-   }
-
-   @Override
-   public Message putBooleanProperty(final String key, final boolean value) {
-      properties.putBooleanProperty(new SimpleString(key), value);
-
-      bufferValid = false;
-
-      return this;
-   }
-
-   @Override
-   public Message putByteProperty(final String key, final byte value) {
-      properties.putByteProperty(new SimpleString(key), value);
-
-      bufferValid = false;
-
-      return this;
-   }
-
-   @Override
-   public Message putBytesProperty(final String key, final byte[] value) {
-      properties.putBytesProperty(new SimpleString(key), value);
-
-      bufferValid = false;
-
-      return this;
-   }
-
-   @Override
-   public Message putShortProperty(final String key, final short value) {
-      properties.putShortProperty(new SimpleString(key), value);
-
-      bufferValid = false;
-
-      return this;
-   }
-
-   @Override
-   public Message putIntProperty(final String key, final int value) {
-      properties.putIntProperty(new SimpleString(key), value);
-
-      bufferValid = false;
-
-      return this;
-   }
-
-   @Override
-   public Message putLongProperty(final String key, final long value) {
-      properties.putLongProperty(new SimpleString(key), value);
-
-      bufferValid = false;
-
-      return this;
-   }
-
-   @Override
-   public Message putFloatProperty(final String key, final float value) {
-      properties.putFloatProperty(new SimpleString(key), value);
-
-      bufferValid = false;
-
-      return this;
-   }
-
-   @Override
-   public Message putDoubleProperty(final String key, final double value) {
-      properties.putDoubleProperty(new SimpleString(key), value);
-
-      bufferValid = false;
-
-      return this;
-   }
-
-   @Override
-   public Message putStringProperty(final String key, final String value) {
-      properties.putSimpleStringProperty(new SimpleString(key), SimpleString.toSimpleString(value));
-
-      bufferValid = false;
-
-      return this;
-   }
-
-   public Message putTypedProperties(final TypedProperties otherProps) {
-      properties.putTypedProperties(otherProps);
-
-      bufferValid = false;
-
-      return this;
-   }
-
-   @Override
-   public Object getObjectProperty(final SimpleString key) {
-      return properties.getProperty(key);
-   }
-
-   @Override
-   public Boolean getBooleanProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
-      return properties.getBooleanProperty(key);
-   }
-
-   @Override
-   public Boolean getBooleanProperty(final String key) throws ActiveMQPropertyConversionException {
-      return properties.getBooleanProperty(new SimpleString(key));
-   }
-
-   @Override
-   public Byte getByteProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
-      return properties.getByteProperty(key);
-   }
-
-   @Override
-   public Byte getByteProperty(final String key) throws ActiveMQPropertyConversionException {
-      return properties.getByteProperty(new SimpleString(key));
-   }
-
-   @Override
-   public byte[] getBytesProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
-      return properties.getBytesProperty(key);
-   }
-
-   @Override
-   public byte[] getBytesProperty(final String key) throws ActiveMQPropertyConversionException {
-      return getBytesProperty(new SimpleString(key));
-   }
-
-   @Override
-   public Double getDoubleProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
-      return properties.getDoubleProperty(key);
-   }
-
-   @Override
-   public Double getDoubleProperty(final String key) throws ActiveMQPropertyConversionException {
-      return properties.getDoubleProperty(new SimpleString(key));
-   }
-
-   @Override
-   public Integer getIntProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
-      return properties.getIntProperty(key);
-   }
-
-   @Override
-   public Integer getIntProperty(final String key) throws ActiveMQPropertyConversionException {
-      return properties.getIntProperty(new SimpleString(key));
-   }
-
-   @Override
-   public Long getLongProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
-      return properties.getLongProperty(key);
-   }
-
-   @Override
-   public Long getLongProperty(final String key) throws ActiveMQPropertyConversionException {
-      return properties.getLongProperty(new SimpleString(key));
-   }
-
-   @Override
-   public Short getShortProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
-      return properties.getShortProperty(key);
-   }
-
-   @Override
-   public Short getShortProperty(final String key) throws ActiveMQPropertyConversionException {
-      return properties.getShortProperty(new SimpleString(key));
-   }
-
-   @Override
-   public Float getFloatProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
-      return properties.getFloatProperty(key);
-   }
-
-   @Override
-   public Float getFloatProperty(final String key) throws ActiveMQPropertyConversionException {
-      return properties.getFloatProperty(new SimpleString(key));
-   }
-
-   @Override
-   public String getStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
-      SimpleString str = getSimpleStringProperty(key);
-
-      if (str == null) {
-         return null;
-      } else {
-         return str.toString();
-      }
-   }
-
-   @Override
-   public String getStringProperty(final String key) throws ActiveMQPropertyConversionException {
-      return getStringProperty(new SimpleString(key));
-   }
-
-   @Override
-   public SimpleString getSimpleStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
-      return properties.getSimpleStringProperty(key);
-   }
-
-   @Override
-   public SimpleString getSimpleStringProperty(final String key) throws ActiveMQPropertyConversionException {
-      return properties.getSimpleStringProperty(new SimpleString(key));
-   }
-
-   @Override
-   public Object getObjectProperty(final String key) {
-      return properties.getProperty(new SimpleString(key));
-   }
-
-   @Override
-   public Object removeProperty(final SimpleString key) {
-      bufferValid = false;
-
-      return properties.removeProperty(key);
-   }
-
-   @Override
-   public Object removeProperty(final String key) {
-      bufferValid = false;
-
-      return properties.removeProperty(new SimpleString(key));
-   }
-
-   @Override
-   public boolean containsProperty(final SimpleString key) {
-      return properties.containsProperty(key);
-   }
-
-   @Override
-   public boolean containsProperty(final String key) {
-      return properties.containsProperty(new SimpleString(key));
-   }
-
-   @Override
-   public Set<SimpleString> getPropertyNames() {
-      return properties.getPropertyNames();
-   }
-
-   @Override
-   public ActiveMQBuffer getWholeBuffer() {
-      return buffer;
-   }
-
-   @Override
-   public BodyEncoder getBodyEncoder() throws ActiveMQException {
-      return new DecodingContext();
-   }
-
-   @Override
-   public TypedProperties getTypedProperties() {
-      return this.properties;
-   }
-
-   @Override
-   public boolean equals(Object other) {
-
-      if (this == other) {
-         return true;
-      }
-
-      if (other instanceof MessageImpl) {
-         MessageImpl message = (MessageImpl) other;
-
-         if (this.getMessageID() == message.getMessageID())
-            return true;
-      }
-
-      return false;
-   }
-
-   /**
-    * Debug Helper!!!!
-    *
-    * I'm leaving this message here without any callers for a reason:
-    * During debugs it's important eventually to identify what's on the bodies, and this method will give you a good idea about them.
-    * Add the message.bodyToString() to the Watch variables on the debugger view and this will show up like a charm!!!
-    *
-    * @return
-    */
-   public String bodyToString() {
-      getEndOfBodyPosition();
-      int readerIndex1 = this.buffer.readerIndex();
-      buffer.readerIndex(0);
-      byte[] buffer1 = new byte[buffer.writerIndex()];
-      buffer.readBytes(buffer1);
-      buffer.readerIndex(readerIndex1);
-
-      byte[] buffer2 = null;
-      if (bodyBuffer != null) {
-         int readerIndex2 = this.bodyBuffer.readerIndex();
-         bodyBuffer.readerIndex(0);
-         buffer2 = new byte[bodyBuffer.writerIndex() - bodyBuffer.readerIndex()];
-         bodyBuffer.readBytes(buffer2);
-         bodyBuffer.readerIndex(readerIndex2);
-         return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[writerIndex=" + buffer.writerIndex() + ",capacity=" + buffer.capacity() + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1) + ", bodyBuffer=" + ByteUtil.bytesToHex(buffer2, 1);
-      } else {
-         return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[writerIndex=" + buffer.writerIndex() + ",capacity=" + buffer.capacity() + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1);
-      }
-
-   }
-
-   @Override
-   public int hashCode() {
-      return 31 + (int) (messageID ^ (messageID >>> 32));
-   }
-
-   // Public --------------------------------------------------------
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   public TypedProperties getProperties() {
-      return properties;
-   }
-
-   // This must be synchronized as it can be called concurrently id the message is being delivered
-   // concurrently to
-   // many queues - the first caller in this case will actually encode it
-   private synchronized ActiveMQBuffer encodeToBuffer() {
-      if (!bufferValid) {
-         int bodySize = getEndOfBodyPosition();
-
-         // write it
-         buffer.setInt(BUFFER_HEADER_SPACE, bodySize);
-
-         // Position at end of body and skip past the message end position int.
-         // check for enough room in the buffer even though it is dynamic
-         if ((bodySize + 4) > buffer.capacity()) {
-            buffer.setIndex(0, bodySize);
-            buffer.writeInt(0);
-         } else {
-            buffer.setIndex(0, bodySize + DataConstants.SIZE_INT);
-         }
-
-         encodeHeadersAndProperties(buffer);
-
-         // Write end of message position
-
-         endOfMessagePosition = buffer.writerIndex();
-
-         buffer.setInt(bodySize, endOfMessagePosition);
-
-         bufferValid = true;
-      }
-
-      return buffer;
-   }
-
-   private void decode() {
-      endOfBodyPosition = buffer.getInt(BUFFER_HEADER_SPACE);
-
-      buffer.readerIndex(endOfBodyPosition + DataConstants.SIZE_INT);
-
-      decodeHeadersAndProperties(buffer);
-
-      endOfMessagePosition = buffer.readerIndex();
-
-      bufferValid = true;
-   }
-
-   public void createBody(final int initialMessageBufferSize) {
-      buffer = ActiveMQBuffers.dynamicBuffer(initialMessageBufferSize);
-
-      // There's a bug in netty which means a dynamic buffer won't resize until you write a byte
-      buffer.writeByte((byte) 0);
-
-      buffer.setIndex(BODY_OFFSET, BODY_OFFSET);
-   }
-
-   // Inner classes -------------------------------------------------
-
-   private final class DecodingContext implements BodyEncoder {
-
-      private int lastPos = 0;
-
-      private DecodingContext() {
-      }
-
-      @Override
-      public void open() {
-      }
-
-      @Override
-      public void close() {
-      }
-
-      @Override
-      public long getLargeBodySize() {
-         return buffer.writerIndex();
-      }
-
-      @Override
-      public int encode(final ByteBuffer bufferRead) throws ActiveMQException {
-         ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(bufferRead);
-         return encode(buffer, bufferRead.capacity());
-      }
-
-      @Override
-      public int encode(final ActiveMQBuffer bufferOut, final int size) {
-         bufferOut.writeBytes(getWholeBuffer(), lastPos, size);
-         lastPos += size;
-         return size;
-      }
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternal.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternal.java
deleted file mode 100644
index a7b2199..0000000
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternal.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.message.impl;
-
-import java.io.InputStream;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
-import org.apache.activemq.artemis.utils.TypedProperties;
-
-public interface MessageInternal extends Message {
-
-   void decodeFromBuffer(ActiveMQBuffer buffer);
-
-   int getEndOfMessagePosition();
-
-   int getEndOfBodyPosition();
-
-   void bodyChanged();
-
-   boolean isServerMessage();
-
-   ActiveMQBuffer getEncodedBuffer();
-
-   int getHeadersAndPropertiesEncodeSize();
-
-   ActiveMQBuffer getWholeBuffer();
-
-   void encodeHeadersAndProperties(ActiveMQBuffer buffer);
-
-   void decodeHeadersAndProperties(ActiveMQBuffer buffer);
-
-   BodyEncoder getBodyEncoder() throws ActiveMQException;
-
-   InputStream getBodyInputStream();
-
-   void setAddressTransient(SimpleString address);
-
-   TypedProperties getTypedProperties();
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index ae1cf71..38cc177 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -31,7 +31,9 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
@@ -45,7 +47,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditsImpl;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.protocol.core.Channel;
 import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
 import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
@@ -103,7 +105,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAR
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@@ -422,12 +423,12 @@ public class ActiveMQSessionContext extends SessionContext {
    }
 
    @Override
-   public int getCreditsOnSendingFull(MessageInternal msgI) {
+   public int getCreditsOnSendingFull(Message msgI) {
       return msgI.getEncodeSize();
    }
 
    @Override
-   public void sendFullMessage(MessageInternal msgI,
+   public void sendFullMessage(ICoreMessage msgI,
                                boolean sendBlocking,
                                SendAcknowledgementHandler handler,
                                SimpleString defaultAddress) throws ActiveMQException {
@@ -441,16 +442,16 @@ public class ActiveMQSessionContext extends SessionContext {
    }
 
    @Override
-   public int sendInitialChunkOnLargeMessage(MessageInternal msgI) throws ActiveMQException {
+   public int sendInitialChunkOnLargeMessage(Message msgI) throws ActiveMQException {
       SessionSendLargeMessage initialChunk = new SessionSendLargeMessage(msgI);
 
       sessionChannel.send(initialChunk);
 
-      return msgI.getHeadersAndPropertiesEncodeSize();
+      return ((CoreMessage)msgI).getHeadersAndPropertiesEncodeSize();
    }
 
    @Override
-   public int sendLargeMessageChunk(MessageInternal msgI,
+   public int sendLargeMessageChunk(Message msgI,
                                     long messageBodySize,
                                     boolean sendBlocking,
                                     boolean lastChunk,
@@ -471,7 +472,7 @@ public class ActiveMQSessionContext extends SessionContext {
    }
 
    @Override
-   public int sendServerLargeMessageChunk(MessageInternal msgI,
+   public int sendServerLargeMessageChunk(Message msgI,
                                           long messageBodySize,
                                           boolean sendBlocking,
                                           boolean lastChunk,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 0f5cdf0..e95227d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -371,6 +371,7 @@ public final class ChannelImpl implements Channel {
             if (logger.isTraceEnabled()) {
                logger.trace("Sending blocking " + packet);
             }
+
             connection.getTransportConnection().write(buffer, false, false);
 
             long toWait = connection.getBlockingCallTimeout();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index 9025210..08c17e4 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -16,8 +16,11 @@
  */
 package org.apache.activemq.artemis.core.protocol.core.impl;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.utils.DataConstants;
@@ -25,6 +28,7 @@ import org.apache.activemq.artemis.utils.DataConstants;
 public class PacketImpl implements Packet {
    // Constants -------------------------------------------------------------------------
 
+
    public static final int ADDRESSING_CHANGE_VERSION = 129;
 
    public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
@@ -310,7 +314,7 @@ public class PacketImpl implements Packet {
 
    @Override
    public ActiveMQBuffer encode(final RemotingConnection connection, boolean usePooled) {
-      ActiveMQBuffer buffer = connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE, usePooled);
+      ActiveMQBuffer buffer = createPacket(connection, usePooled);
 
       // The standard header fields
 
@@ -330,6 +334,14 @@ public class PacketImpl implements Packet {
       return buffer;
    }
 
+   protected ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) {
+      if (connection == null) {
+         return new ChannelBufferWrapper(Unpooled.buffer(INITIAL_PACKET_SIZE));
+      } else {
+         return connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE, usePooled);
+      }
+   }
+
    @Override
    public void decode(final ActiveMQBuffer buffer) {
       channelID = buffer.readLong();
@@ -339,6 +351,22 @@ public class PacketImpl implements Packet {
       size = buffer.readerIndex();
    }
 
+   protected ByteBuf copyMessageBuffer(ByteBuf buffer, int skipBytes) {
+
+      ByteBuf newNettyBuffer = Unpooled.buffer(buffer.capacity() - PACKET_HEADERS_SIZE - skipBytes);
+
+      int read = buffer.readerIndex();
+      int writ = buffer.writerIndex();
+      buffer.readerIndex(PACKET_HEADERS_SIZE);
+
+      newNettyBuffer.writeBytes(buffer, buffer.readableBytes() - skipBytes);
+      buffer.setIndex( read, writ );
+      newNettyBuffer.setIndex( 0, writ - PACKET_HEADERS_SIZE - skipBytes);
+
+      return newNettyBuffer;
+   }
+
+
    @Override
    public int getPacketSize() {
       if (size == -1) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
index 8bd62ca..cada061 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -353,6 +353,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
          }
 
          dataReceived = true;
+
          doBufferReceived(packet);
 
          super.bufferReceived(connectionID, buffer);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
index 6a52a27..49989d3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
@@ -16,22 +16,25 @@
  */
 package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
-import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import io.netty.buffer.Unpooled;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 
 public abstract class MessagePacket extends PacketImpl implements MessagePacketI {
 
-   protected MessageInternal message;
+   protected ICoreMessage message;
 
-   public MessagePacket(final byte type, final MessageInternal message) {
+   public MessagePacket(final byte type, final ICoreMessage message) {
       super(type);
 
       this.message = message;
    }
 
    @Override
-   public Message getMessage() {
+   public ICoreMessage getMessage() {
       return message;
    }
 
@@ -40,4 +43,12 @@ public abstract class MessagePacket extends PacketImpl implements MessagePacketI
       return super.getParentString() + ", message=" + message;
    }
 
+   protected ActiveMQBuffer internalCreatePacket(int size, RemotingConnection connection, boolean usePooled) {
+      if (connection == null) {
+         return new ChannelBufferWrapper(Unpooled.buffer(size));
+      } else {
+         return connection.createTransportBuffer(size, usePooled);
+      }
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveClientLargeMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveClientLargeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveClientLargeMessage.java
index 66e509c..e9e3138 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveClientLargeMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveClientLargeMessage.java
@@ -17,12 +17,13 @@
 package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 
 public class SessionReceiveClientLargeMessage extends SessionReceiveLargeMessage {
 
-   public SessionReceiveClientLargeMessage(MessageInternal message) {
+   public SessionReceiveClientLargeMessage(Message message) {
       super(message);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java
index 64f96f9..dc2c458 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java
@@ -18,12 +18,12 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 
 public class SessionReceiveLargeMessage extends PacketImpl implements MessagePacketI {
 
-   private final MessageInternal message;
+   private final Message message;
 
    /**
     * Since we receive the message before the entire message was received,
@@ -35,13 +35,13 @@ public class SessionReceiveLargeMessage extends PacketImpl implements MessagePac
    private int deliveryCount;
 
    // To be used on decoding at the client while receiving a large message
-   public SessionReceiveLargeMessage(final MessageInternal message) {
+   public SessionReceiveLargeMessage(final Message message) {
       super(SESS_RECEIVE_LARGE_MSG);
       this.message = message;
    }
 
    public SessionReceiveLargeMessage(final long consumerID,
-                                     final MessageInternal message,
+                                     final Message message,
                                      final long largeMessageSize,
                                      final int deliveryCount) {
       super(SESS_RECEIVE_LARGE_MSG);
@@ -55,7 +55,7 @@ public class SessionReceiveLargeMessage extends PacketImpl implements MessagePac
       this.largeMessageSize = largeMessageSize;
    }
 
-   public MessageInternal getLargeMessage() {
+   public Message getLargeMessage() {
       return message;
    }
 
@@ -85,7 +85,7 @@ public class SessionReceiveLargeMessage extends PacketImpl implements MessagePac
       buffer.writeInt(deliveryCount);
       buffer.writeLong(largeMessageSize);
       if (message != null) {
-         message.encodeHeadersAndProperties(buffer);
+         ((CoreMessage)message).encodeHeadersAndProperties(buffer.byteBuf());
       }
    }
 
@@ -94,7 +94,7 @@ public class SessionReceiveLargeMessage extends PacketImpl implements MessagePac
       consumerID = buffer.readLong();
       deliveryCount = buffer.readInt();
       largeMessageSize = buffer.readLong();
-      message.decodeHeadersAndProperties(buffer);
+      ((CoreMessage)message).decodeHeadersAndProperties(buffer.byteBuf());
    }
 
    @Override


Mime
View raw message