activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [18/22] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding.
Date Mon, 06 Mar 2017 11:54:05 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
index c21ebda..b0ab52b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
@@ -17,7 +17,8 @@
 package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.utils.DataConstants;
 
@@ -30,7 +31,7 @@ public class SessionReceiveMessage extends MessagePacket {
 
    private int deliveryCount;
 
-   public SessionReceiveMessage(final long consumerID, final MessageInternal message, final int deliveryCount) {
+   public SessionReceiveMessage(final long consumerID, final ICoreMessage message, final int deliveryCount) {
       super(SESS_RECEIVE_MSG, message);
 
       this.consumerID = consumerID;
@@ -38,7 +39,7 @@ public class SessionReceiveMessage extends MessagePacket {
       this.deliveryCount = deliveryCount;
    }
 
-   public SessionReceiveMessage(final MessageInternal message) {
+   public SessionReceiveMessage(final CoreMessage message) {
       super(SESS_RECEIVE_MSG, message);
    }
 
@@ -53,53 +54,28 @@ public class SessionReceiveMessage extends MessagePacket {
    }
 
    @Override
-   public ActiveMQBuffer encode(final RemotingConnection connection) {
-      ActiveMQBuffer buffer = message.getEncodedBuffer();
-
-      ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + DataConstants.SIZE_LONG + DataConstants.SIZE_INT, true);
-      bufferWrite.writeBytes(buffer, 0, buffer.capacity());
-      bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());
-
-      // Sanity check
-      if (bufferWrite.writerIndex() != message.getEndOfMessagePosition()) {
-         throw new IllegalStateException("Wrong encode position");
-      }
-
-      bufferWrite.writeLong(consumerID);
-      bufferWrite.writeInt(deliveryCount);
-
-      size = bufferWrite.writerIndex();
-
-      // Write standard headers
-
-      int len = size - DataConstants.SIZE_INT;
-      bufferWrite.setInt(0, len);
-      bufferWrite.setByte(DataConstants.SIZE_INT, getType());
-      bufferWrite.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
-
-      // Position reader for reading by Netty
-      bufferWrite.setIndex(0, size);
-
-      return bufferWrite;
+   protected ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) {
+      return internalCreatePacket(message.getEncodeSize() + PACKET_HEADERS_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT, connection, usePooled);
    }
 
    @Override
-   public void decode(final ActiveMQBuffer buffer) {
-      channelID = buffer.readLong();
-
-      message.decodeFromBuffer(buffer);
-
-      consumerID = buffer.readLong();
+   public void encodeRest(ActiveMQBuffer buffer) {
+      message.sendBuffer(buffer.byteBuf(), deliveryCount);
+      buffer.writeLong(consumerID);
+      buffer.writeInt(deliveryCount);
+   }
 
-      deliveryCount = buffer.readInt();
+   @Override
+   public void decodeRest(final ActiveMQBuffer buffer) {
+      // Buffer comes in after having read standard headers and positioned at Beginning of body part
 
-      size = buffer.readerIndex();
+      message.receiveBuffer(copyMessageBuffer(buffer.byteBuf(), DataConstants.SIZE_LONG + DataConstants.SIZE_INT));
 
-      // Need to position buffer for reading
+      buffer.readerIndex(buffer.capacity() - DataConstants.SIZE_LONG - DataConstants.SIZE_INT);
+      this.consumerID = buffer.readLong();
+      this.deliveryCount = buffer.readInt();
 
-      buffer.setIndex(PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, message.getEndOfBodyPosition());
    }
-
    @Override
    public int hashCode() {
       final int prime = 31;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
index b4ec027..0ecfe33 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
@@ -17,8 +17,8 @@
 package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 
 /**
  * A SessionSendContinuationMessage<br>
@@ -28,7 +28,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
    private boolean requiresResponse;
 
    // Used on confirmation handling
-   private MessageInternal message;
+   private Message message;
    /**
     * In case, we are using a different handler than the one set on the {@link org.apache.activemq.artemis.api.core.client.ClientSession}
     * <br>
@@ -58,7 +58,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
     * @param continues
     * @param requiresResponse
     */
-   public SessionSendContinuationMessage(final MessageInternal message,
+   public SessionSendContinuationMessage(final Message message,
                                          final byte[] body,
                                          final boolean continues,
                                          final boolean requiresResponse,
@@ -87,7 +87,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
    /**
     * @return the message
     */
-   public MessageInternal getMessage() {
+   public Message getMessage() {
       return message;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java
index bf4290b..869940c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java
@@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 
 public class SessionSendLargeMessage extends PacketImpl implements MessagePacketI {
@@ -26,13 +26,13 @@ public class SessionSendLargeMessage extends PacketImpl implements MessagePacket
    /**
     * Used only if largeMessage
     */
-   private final MessageInternal largeMessage;
+   private final Message largeMessage;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionSendLargeMessage(final MessageInternal largeMessage) {
+   public SessionSendLargeMessage(final Message largeMessage) {
       super(SESS_SEND_LARGE);
 
       this.largeMessage = largeMessage;
@@ -40,7 +40,7 @@ public class SessionSendLargeMessage extends PacketImpl implements MessagePacket
 
    // Public --------------------------------------------------------
 
-   public MessageInternal getLargeMessage() {
+   public Message getLargeMessage() {
       return largeMessage;
    }
 
@@ -51,12 +51,12 @@ public class SessionSendLargeMessage extends PacketImpl implements MessagePacket
 
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
-      largeMessage.encodeHeadersAndProperties(buffer);
+      ((CoreMessage)largeMessage).encodeHeadersAndProperties(buffer.byteBuf());
    }
 
    @Override
    public void decodeRest(final ActiveMQBuffer buffer) {
-      largeMessage.decodeHeadersAndProperties(buffer);
+      ((CoreMessage)largeMessage).decodeHeadersAndProperties(buffer.byteBuf());
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
index c7bb30e..43bb0be 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
@@ -16,11 +16,12 @@
  */
 package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.utils.DataConstants;
 
 public class SessionSendMessage extends MessagePacket {
 
@@ -36,7 +37,8 @@ public class SessionSendMessage extends MessagePacket {
     */
    private final transient SendAcknowledgementHandler handler;
 
-   public SessionSendMessage(final MessageInternal message,
+   /** This will be using the CoreMessage because it is meant for the core-protocol */
+   public SessionSendMessage(final ICoreMessage message,
                              final boolean requiresResponse,
                              final SendAcknowledgementHandler handler) {
       super(SESS_SEND, message);
@@ -44,7 +46,7 @@ public class SessionSendMessage extends MessagePacket {
       this.requiresResponse = requiresResponse;
    }
 
-   public SessionSendMessage(final MessageInternal message) {
+   public SessionSendMessage(final CoreMessage message) {
       super(SESS_SEND, message);
       this.handler = null;
    }
@@ -60,53 +62,29 @@ public class SessionSendMessage extends MessagePacket {
    }
 
    @Override
-   public ActiveMQBuffer encode(final RemotingConnection connection) {
-      ActiveMQBuffer buffer = message.getEncodedBuffer();
-
-      ActiveMQBuffer bufferWrite;
-      if (connection == null) {
-         // this is for unit tests only
-         bufferWrite = buffer.copy(0, buffer.capacity());
-      } else {
-         bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + 1, true); // 1 for the requireResponse
-      }
-      bufferWrite.writeBytes(buffer, 0, buffer.writerIndex());
-      bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());
-
-      // Sanity check
-      if (bufferWrite.writerIndex() != message.getEndOfMessagePosition()) {
-         throw new IllegalStateException("Wrong encode position");
-      }
-
-      bufferWrite.writeBoolean(requiresResponse);
-
-      size = bufferWrite.writerIndex();
-
-      // Write standard headers
+   protected ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) {
+      return internalCreatePacket(message.getEncodeSize() + PACKET_HEADERS_SIZE + 1, connection, usePooled);
+   }
 
-      int len = size - DataConstants.SIZE_INT;
-      bufferWrite.setInt(0, len);
-      bufferWrite.setByte(DataConstants.SIZE_INT, getType());
-      bufferWrite.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
+   @Override
+   public void encodeRest(ActiveMQBuffer buffer) {
+      message.sendBuffer(buffer.byteBuf(), 0);
+      buffer.writeBoolean(requiresResponse);
 
-      // Position reader for reading by Netty
-      bufferWrite.readerIndex(0);
 
-      return bufferWrite;
    }
 
    @Override
    public void decodeRest(final ActiveMQBuffer buffer) {
       // Buffer comes in after having read standard headers and positioned at Beginning of body part
 
-      message.decodeFromBuffer(buffer);
+      ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), 1);
+      message.receiveBuffer(messageBuffer);
 
-      int ri = buffer.readerIndex();
+      buffer.readerIndex(buffer.capacity() - 1);
 
       requiresResponse = buffer.readBoolean();
 
-      buffer.readerIndex(ri);
-
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java
index 65aeccb..8560f5d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java
@@ -26,7 +26,7 @@ public class MapMessageUtil extends MessageUtil {
     */
    public static void writeBodyMap(ActiveMQBuffer message, TypedProperties properties) {
       message.resetWriterIndex();
-      properties.encode(message);
+      properties.encode(message.byteBuf());
    }
 
    /**
@@ -43,7 +43,7 @@ public class MapMessageUtil extends MessageUtil {
     */
    public static void readBodyMap(ActiveMQBuffer message, TypedProperties map) {
       message.resetReaderIndex();
-      map.decode(message);
+      map.decode(message.byteBuf());
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index 72795b7..3fddb8e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -23,7 +23,9 @@ import java.util.Set;
 import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
@@ -33,8 +35,6 @@ import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditsImpl;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
@@ -128,9 +128,9 @@ public abstract class SessionContext {
 
    }
 
-   public abstract int getCreditsOnSendingFull(MessageInternal msgI);
+   public abstract int getCreditsOnSendingFull(Message msgI);
 
-   public abstract void sendFullMessage(MessageInternal msgI,
+   public abstract void sendFullMessage(ICoreMessage msgI,
                                         boolean sendBlocking,
                                         SendAcknowledgementHandler handler,
                                         SimpleString defaultAddress) throws ActiveMQException;
@@ -142,9 +142,9 @@ public abstract class SessionContext {
     * @return
     * @throws ActiveMQException
     */
-   public abstract int sendInitialChunkOnLargeMessage(MessageInternal msgI) throws ActiveMQException;
+   public abstract int sendInitialChunkOnLargeMessage(Message msgI) throws ActiveMQException;
 
-   public abstract int sendLargeMessageChunk(MessageInternal msgI,
+   public abstract int sendLargeMessageChunk(Message msgI,
                                              long messageBodySize,
                                              boolean sendBlocking,
                                              boolean lastChunk,
@@ -152,7 +152,7 @@ public abstract class SessionContext {
                                              int reconnectID,
                                              SendAcknowledgementHandler messageHandler) throws ActiveMQException;
 
-   public abstract int sendServerLargeMessageChunk(MessageInternal msgI,
+   public abstract int sendServerLargeMessageChunk(Message msgI,
                                                    long messageBodySize,
                                                    boolean sendBlocking,
                                                    boolean lastChunk,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
new file mode 100644
index 0000000..5e92eaf
--- /dev/null
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.message;
+
+import java.util.LinkedList;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.apache.activemq.artemis.reader.TextMessageUtil;
+import org.apache.activemq.artemis.utils.Base64;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.UUID;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class CoreMessageTest {
+
+   public static final SimpleString ADDRESS = new SimpleString("this.local.address");
+   public static final byte MESSAGE_TYPE = Message.TEXT_TYPE;
+   public static final boolean DURABLE = true;
+   public static final long EXPIRATION = 123L;
+   public static final long TIMESTAMP = 321L;
+   public static final byte PRIORITY = (byte) 3;
+   public static final String TEXT = "hi";
+   public static final String BIGGER_TEXT = "AAAAAAAAAAAAAAAAAAAAAAAAA ASDF ASDF ASF ASD ASF ASDF ASDF ASDF ASF ADSF ASDF";
+   public static final String SMALLER_TEXT = "H";
+   public static final UUID uuid = new UUID(UUID.TYPE_TIME_BASED, new byte[]{0, 0, 0, 0,
+      0, 0, 0, 0,
+      0, 0, 0, 0,
+      0, 0, 0, 1});
+   public static final SimpleString PROP1_NAME = new SimpleString("t1");
+   public static final SimpleString PROP1_VALUE = new SimpleString("value-t1");
+
+   /**
+    * This encode was generated by {@link #generate()}.
+    * Run it manually with a right-click on the IDE to eventually update it
+    * */
+   // body = "hi";
+   private final String STRING_ENCODE = "AAAAFgEAAAAEaABpAAAAAAAAAAAAAQAAACR0AGgAaQBzAC4AbABvAGMAYQBsAC4AYQBkAGQAcgBlAHMAcwAAAwEAAAAAAAAAewAAAAAAAAFBAwEAAAABAAAABHQAMQAKAAAAEHYAYQBsAHUAZQAtAHQAMQA=";
+
+   private ByteBuf BYTE_ENCODE;
+
+
+   @Before
+   public void before() {
+      BYTE_ENCODE = Unpooled.wrappedBuffer(Base64.decode(STRING_ENCODE, Base64.DONT_BREAK_LINES | Base64.URL_SAFE));
+      // some extra caution here, nothing else, to make sure we would get the same encoding back
+      Assert.assertEquals(STRING_ENCODE, encodeString(BYTE_ENCODE.array()));
+      BYTE_ENCODE.readerIndex(0).writerIndex(BYTE_ENCODE.capacity());
+   }
+
+   /** The message is received, then sent to the other side untouched */
+   @Test
+   public void testPassThrough() {
+      CoreMessage decodedMessage = decodeMessage();
+
+      Assert.assertEquals(TEXT, TextMessageUtil.readBodyText(decodedMessage.getReadOnlyBodyBuffer()).toString());
+   }
+
+   /** The message is received, then sent to the other side untouched */
+   @Test
+   public void sendThroughPackets() {
+      CoreMessage decodedMessage = decodeMessage();
+
+      int encodeSize = decodedMessage.getEncodeSize();
+      Assert.assertEquals(BYTE_ENCODE.capacity(), encodeSize);
+
+      SessionSendMessage sendMessage = new SessionSendMessage(decodedMessage, true, null);
+      sendMessage.setChannelID(777);
+
+      ActiveMQBuffer buffer = sendMessage.encode(null);
+
+      byte[] byteArray = buffer.byteBuf().array();
+      System.out.println("Sending " + ByteUtil.bytesToHex(buffer.toByteBuffer().array(), 1) + ", bytes = " + byteArray.length);
+
+      buffer.readerIndex(5);
+
+      SessionSendMessage sendMessageReceivedSent = new SessionSendMessage(new CoreMessage());
+
+      sendMessageReceivedSent.decode(buffer);
+
+      Assert.assertEquals(encodeSize, sendMessageReceivedSent.getMessage().getEncodeSize());
+
+      Assert.assertTrue(sendMessageReceivedSent.isRequiresResponse());
+
+      Assert.assertEquals(TEXT, TextMessageUtil.readBodyText(sendMessageReceivedSent.getMessage().getReadOnlyBodyBuffer()).toString());
+   }
+
+   /** The message is received, then sent to the other side untouched */
+   @Test
+   public void sendThroughPacketsClient() {
+      CoreMessage decodedMessage = decodeMessage();
+
+      int encodeSize = decodedMessage.getEncodeSize();
+      Assert.assertEquals(BYTE_ENCODE.capacity(), encodeSize);
+
+      SessionReceiveMessage sendMessage = new SessionReceiveMessage(33, decodedMessage, 7);
+      sendMessage.setChannelID(777);
+
+      ActiveMQBuffer buffer = sendMessage.encode(null);
+
+      buffer.readerIndex(5);
+
+      SessionReceiveMessage sendMessageReceivedSent = new SessionReceiveMessage(new CoreMessage());
+
+      sendMessageReceivedSent.decode(buffer);
+
+      Assert.assertEquals(33, sendMessageReceivedSent.getConsumerID());
+
+      Assert.assertEquals(7, sendMessageReceivedSent.getDeliveryCount());
+
+      Assert.assertEquals(encodeSize, sendMessageReceivedSent.getMessage().getEncodeSize());
+
+      Assert.assertEquals(TEXT, TextMessageUtil.readBodyText(sendMessageReceivedSent.getMessage().getReadOnlyBodyBuffer()).toString());
+   }
+
+   private CoreMessage decodeMessage() {
+
+      ByteBuf newBuffer = Unpooled.buffer(BYTE_ENCODE.capacity());
+      newBuffer.writeBytes(BYTE_ENCODE, 0, BYTE_ENCODE.writerIndex());
+
+      CoreMessage coreMessage = internalDecode(newBuffer);
+
+      int encodeSize = coreMessage.getEncodeSize();
+
+      Assert.assertEquals(newBuffer.capacity(), encodeSize);
+
+      Assert.assertEquals(ADDRESS, coreMessage.getAddressSimpleString());
+
+      Assert.assertEquals(PROP1_VALUE.toString(), coreMessage.getStringProperty(PROP1_NAME));
+
+      ByteBuf destinedBuffer = Unpooled.buffer(BYTE_ENCODE.array().length);
+      coreMessage.sendBuffer(destinedBuffer, 0);
+
+      byte[] destinedArray = destinedBuffer.array();
+      byte[] sourceArray = BYTE_ENCODE.array();
+
+      CoreMessage newDecoded = internalDecode(Unpooled.wrappedBuffer(destinedArray));
+
+      Assert.assertEquals(encodeSize, newDecoded.getEncodeSize());
+
+      Assert.assertArrayEquals(sourceArray, destinedArray);
+
+      return coreMessage;
+   }
+
+   private CoreMessage internalDecode(ByteBuf bufferOrigin) {
+      CoreMessage coreMessage = new CoreMessage();
+//      System.out.println("Bytes from test " + ByteUtil.bytesToHex(bufferOrigin.array(), 1));
+      coreMessage.receiveBuffer(bufferOrigin);
+      return coreMessage;
+   }
+
+   /** The message is received, then sent to the other side untouched */
+   @Test
+   public void testChangeBodyStringSameSize() {
+      testChangeBodyString(TEXT.toUpperCase());
+   }
+
+   @Test
+   public void testChangeBodyBiggerString() {
+      testChangeBodyString(BIGGER_TEXT);
+   }
+
+   @Test
+   public void testGenerateEmpty() {
+      CoreMessage empty = new CoreMessage().initBuffer(100);
+      ByteBuf buffer = Unpooled.buffer(200);
+      empty.sendBuffer(buffer, 0);
+
+      CoreMessage empty2 = new CoreMessage();
+      empty2.receiveBuffer(buffer);
+
+      try {
+         empty2.getBodyBuffer().readByte();
+         Assert.fail("should throw exception");
+      } catch (Exception expected) {
+
+      }
+   }
+
+   @Test
+   public void testSaveReceiveLimitedBytes() {
+      CoreMessage empty = new CoreMessage().initBuffer(100);
+      System.out.println("R " + empty.getBodyBuffer().readerIndex() + " W " + empty.getBodyBuffer().writerIndex());
+      empty.getBodyBuffer().writeByte((byte)7);
+      System.out.println("R " + empty.getBodyBuffer().readerIndex() + " W " + empty.getBodyBuffer().writerIndex());
+
+      ByteBuf buffer = Unpooled.buffer(200);
+      empty.sendBuffer(buffer, 0);
+
+      CoreMessage empty2 = new CoreMessage();
+      empty2.receiveBuffer(buffer);
+
+      Assert.assertEquals((byte)7, empty2.getBodyBuffer().readByte());
+
+      System.out.println("Readable :: " + empty2.getBodyBuffer().readerIndex() + " writer :" + empty2.getBodyBuffer().writerIndex());
+
+      try {
+         empty2.getBodyBuffer().readByte();
+         Assert.fail("should throw exception");
+      } catch (Exception expected) {
+
+      }
+   }
+
+   @Test
+   public void testChangeBodySmallerString() {
+      testChangeBodyString(SMALLER_TEXT);
+   }
+
+   public void testChangeBodyString(String newString) {
+      CoreMessage coreMessage = decodeMessage();
+
+      coreMessage.putStringProperty("newProperty", "newValue");
+      ActiveMQBuffer legacyBuffer = coreMessage.getBodyBuffer();
+      legacyBuffer.resetWriterIndex();
+      legacyBuffer.clear();
+
+      TextMessageUtil.writeBodyText(legacyBuffer, SimpleString.toSimpleString(newString));
+
+      ByteBuf newbuffer = Unpooled.buffer(150000);
+
+      coreMessage.sendBuffer(newbuffer, 0);
+      newbuffer.readerIndex(0);
+
+      CoreMessage newCoreMessage = new CoreMessage();
+      newCoreMessage.receiveBuffer(newbuffer);
+
+
+      SimpleString newText = TextMessageUtil.readBodyText(newCoreMessage.getReadOnlyBodyBuffer());
+
+      Assert.assertEquals(newString, newText.toString());
+
+//      coreMessage.putStringProperty()
+   }
+
+   @Test
+   public void testPassThroughMultipleThreads() throws Throwable {
+      CoreMessage coreMessage = new CoreMessage();
+      coreMessage.receiveBuffer(BYTE_ENCODE);
+
+      LinkedList<Throwable> errors = new LinkedList<>();
+
+      Thread[] threads = new Thread[50];
+      for (int i = 0; i < threads.length; i++) {
+         threads[i] = new Thread(() -> {
+            try {
+               for (int j = 0; j < 50; j++) {
+                  Assert.assertEquals(ADDRESS, coreMessage.getAddressSimpleString());
+                  Assert.assertEquals(PROP1_VALUE.toString(), coreMessage.getStringProperty(PROP1_NAME));
+
+                  ByteBuf destinedBuffer = Unpooled.buffer(BYTE_ENCODE.array().length);
+                  coreMessage.sendBuffer(destinedBuffer, 0);
+
+                  byte[] destinedArray = destinedBuffer.array();
+                  byte[] sourceArray = BYTE_ENCODE.array();
+
+                  Assert.assertArrayEquals(sourceArray, destinedArray);
+
+                  Assert.assertEquals(TEXT, TextMessageUtil.readBodyText(coreMessage.getReadOnlyBodyBuffer()).toString());
+               }
+            } catch (Throwable e) {
+               e.printStackTrace();
+               errors.add(e);
+            }
+         });
+      }
+
+      for (Thread t : threads) {
+         t.start();
+      }
+
+      for (Thread t : threads) {
+         t.join();
+      }
+
+      for (Throwable e: errors) {
+         throw e;
+      }
+
+   }
+
+   // This is to compare the original encoding with the current version
+   @Test
+   public void compareOriginal() throws Exception {
+      String generated = generate(TEXT);
+
+      Assert.assertEquals(STRING_ENCODE, generated);
+
+      for (int i = 0; i < generated.length(); i++) {
+         Assert.assertEquals("Chart at " + i + " was " + generated.charAt(i) + " instead of " + STRING_ENCODE.charAt(i), generated.charAt(i), STRING_ENCODE.charAt(i));
+      }
+   }
+
+   /** Use this method to update the encode for the known message */
+   @Ignore
+   @Test
+   public void generate() throws Exception {
+
+      printVariable(TEXT, generate(TEXT));
+      printVariable(SMALLER_TEXT, generate(SMALLER_TEXT));
+      printVariable(BIGGER_TEXT, generate(BIGGER_TEXT));
+
+   }
+
+   private void printVariable(String body, String encode) {
+      System.out.println("// body = \"" + body + "\";");
+      System.out.println("private final String STRING_ENCODE = \"" + encode + "\";");
+
+   }
+
+   public String generate(String body) throws Exception {
+
+      ClientMessageImpl message = new ClientMessageImpl(MESSAGE_TYPE, DURABLE, EXPIRATION, TIMESTAMP, PRIORITY, 10 * 1024);
+      TextMessageUtil.writeBodyText(message.getBodyBuffer(), SimpleString.toSimpleString(body));
+
+      message.setAddress(ADDRESS);
+      message.setUserID(uuid);
+      message.getProperties().putSimpleStringProperty(PROP1_NAME, PROP1_VALUE);
+
+
+      ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(10 * 1024);
+      message.sendBuffer(buffer.byteBuf(), 0);
+
+      byte[] bytes = new byte[buffer.byteBuf().writerIndex()];
+      buffer.byteBuf().readBytes(bytes);
+
+      return encodeString(bytes);
+
+      // replace the code
+
+
+   }
+
+   private String encodeString(byte[] bytes) {
+      return Base64.encodeBytes(bytes, 0, bytes.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index eb7cda1..2108be7 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -31,11 +31,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncoderPersister;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
 import org.apache.activemq.artemis.core.journal.LoaderCallback;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
@@ -366,10 +368,10 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    }
 
    @Override
-   public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
+   public void appendAddRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception {
       JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
       r.setUserRecordType(recordType);
-      r.setRecord(record);
+      r.setRecord(persister, record);
       r.setSync(sync);
       appendRecord(r);
    }
@@ -377,12 +379,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    @Override
    public void appendAddRecord(long id,
                                byte recordType,
-                               EncodingSupport record,
+                               Persister persister,
+                               Object record,
                                boolean sync,
                                IOCompletion completionCallback) throws Exception {
       JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
       r.setUserRecordType(recordType);
-      r.setRecord(record);
+      r.setRecord(persister, record);
       r.setSync(sync);
       r.setIoCompletion(completionCallback);
       appendRecord(r);
@@ -398,10 +401,10 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    }
 
    @Override
-   public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
+   public void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception {
       JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet());
       r.setUserRecordType(recordType);
-      r.setRecord(record);
+      r.setRecord(persister, record);
       r.setSync(sync);
       appendRecord(r);
    }
@@ -409,12 +412,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    @Override
    public void appendUpdateRecord(long id,
                                   byte recordType,
-                                  EncodingSupport record,
+                                  Persister persister,
+                                  Object record,
                                   boolean sync,
                                   IOCompletion completionCallback) throws Exception {
       JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
       r.setUserRecordType(recordType);
-      r.setRecord(record);
+      r.setRecord(persister, record);
       r.setSync(sync);
       r.setIoCompletion(completionCallback);
       appendRecord(r);
@@ -448,10 +452,11 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    public void appendAddRecordTransactional(long txID,
                                             long id,
                                             byte recordType,
-                                            EncodingSupport record) throws Exception {
+                                            Persister persister,
+                                            Object record) throws Exception {
       JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet());
       r.setUserRecordType(recordType);
-      r.setRecord(record);
+      r.setRecord(persister, record);
       r.setTxId(txID);
       appendRecord(r);
    }
@@ -469,10 +474,11 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    public void appendUpdateRecordTransactional(long txID,
                                                long id,
                                                byte recordType,
-                                               EncodingSupport record) throws Exception {
+                                               Persister persister,
+                                               Object record) throws Exception {
       JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX, seq.incrementAndGet());
       r.setUserRecordType(recordType);
-      r.setRecord(record);
+      r.setRecord(persister, record);
       r.setTxId(txID);
       appendRecord(r);
    }
@@ -488,7 +494,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    @Override
    public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception {
       JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet());
-      r.setRecord(record);
+      r.setRecord(EncoderPersister.getInstance(), record);
       r.setTxId(txID);
       appendRecord(r);
    }
@@ -685,10 +691,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    }
 
    @Override
-   public void perfBlast(int pages) {
-   }
-
-   @Override
    public void runDirectJournalBlast() throws Exception {
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
index 9691d3e..b094164 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream;
@@ -237,11 +238,11 @@ class JDBCJournalRecord {
       this.record = record;
    }
 
-   public void setRecord(EncodingSupport record) {
-      this.variableSize = record.getEncodeSize();
+   public void setRecord(Persister persister, Object record) {
+      this.variableSize = persister.getEncodeSize(record);
 
       ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(variableSize);
-      record.encode(encodedBuffer);
+      persister.encode(encodedBuffer, record);
       this.record = new ActiveMQBufferInputStream(encodedBuffer);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java
index 0e99106..4d0306b 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.api.jms.management;
 import javax.jms.JMSException;
 import javax.jms.Message;
 
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
 
@@ -27,7 +28,7 @@ import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
  */
 public class JMSManagementHelper {
 
-   private static org.apache.activemq.artemis.api.core.Message getCoreMessage(final Message jmsMessage) {
+   private static ClientMessage getCoreMessage(final Message jmsMessage) {
       if (jmsMessage instanceof ActiveMQMessage == false) {
          throw new IllegalArgumentException("Cannot send a foreign message as a management message " + jmsMessage.getClass().getName());
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
index 59f04e8..6da3912 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
@@ -26,7 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 
 import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesMessageReset;
 import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadBoolean;
@@ -374,7 +374,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
       if (bodyLength == 0)
          return null;
       byte[] dst = new byte[bodyLength];
-      message.getBodyBuffer().getBytes(MessageImpl.BODY_OFFSET, dst);
+      message.getBodyBuffer().getBytes(CoreMessage.BODY_OFFSET, dst);
       return (T) dst;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
index 47dcfb2..80a07ac 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
@@ -43,7 +43,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.utils.UUID;
@@ -293,7 +293,7 @@ public class ActiveMQMessage implements javax.jms.Message {
    @Override
    public String getJMSMessageID() {
       if (msgID == null) {
-         UUID uid = message.getUserID();
+         UUID uid = (UUID)message.getUserID();
 
          msgID = uid == null ? null : "ID:" + uid.toString();
       }
@@ -397,7 +397,7 @@ public class ActiveMQMessage implements javax.jms.Message {
    @Override
    public Destination getJMSDestination() throws JMSException {
       if (dest == null) {
-         SimpleString address = message.getAddress();
+         SimpleString address = message.getAddressSimpleString();
          String prefix = "";
          if (message.containsProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE)) {
             RoutingType routingType = RoutingType.getType(message.getByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE));
@@ -756,7 +756,7 @@ public class ActiveMQMessage implements javax.jms.Message {
 
    @SuppressWarnings("unchecked")
    protected <T> T getBodyInternal(Class<T> c) throws MessageFormatException {
-      InputStream is = ((MessageInternal) message).getBodyInputStream();
+      InputStream is = ((ClientMessageInternal) message).getBodyInputStream();
       try {
          ObjectInputStream ois = new ObjectInputStream(is);
          return (T) ois.readObject();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java
index 6cf20ff..ecb4ccb 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java
@@ -19,7 +19,8 @@ package org.apache.activemq.artemis.jms.transaction;
 import javax.transaction.xa.Xid;
 import java.util.Map;
 
-import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionDetail;
 import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
@@ -36,8 +37,11 @@ public class JMSTransactionDetail extends TransactionDetail {
    }
 
    @Override
-   public String decodeMessageType(ServerMessage msg) {
-      int type = msg.getType();
+   public String decodeMessageType(Message msg) {
+      if (!(msg instanceof ICoreMessage)) {
+         return "N/A";
+      }
+      int type = ((ICoreMessage) msg).getType();
       switch (type) {
          case ActiveMQMessage.TYPE: // 0
             return "Default";
@@ -57,7 +61,7 @@ public class JMSTransactionDetail extends TransactionDetail {
    }
 
    @Override
-   public Map<String, Object> decodeMessageProperties(ServerMessage msg) {
+   public Map<String, Object> decodeMessageProperties(Message msg) {
       try {
          return ActiveMQMessage.coreMaptoJMSMap(msg.toMap());
       } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java
new file mode 100644
index 0000000..8fc2a5aa
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.persistence.Persister;
+
+/** This is a facade between the new Persister and the former EncodingSupport.
+ *  Methods using the old interface will use this as a facade to provide the previous semantic. */
+public class EncoderPersister implements Persister<EncodingSupport> {
+
+   private static final EncoderPersister theInstance = new EncoderPersister();
+
+   private EncoderPersister() {
+   }
+
+   public static EncoderPersister getInstance() {
+      return theInstance;
+   }
+
+   @Override
+   public int getEncodeSize(EncodingSupport record) {
+      return record.getEncodeSize();
+   }
+
+   @Override
+   public void encode(ActiveMQBuffer buffer, EncodingSupport record) {
+      record.encode(buffer);
+   }
+
+   @Override
+   public EncodingSupport decode(ActiveMQBuffer buffer, EncodingSupport record) {
+      record.decode(buffer);
+      return record;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
index fbd4182..ca194b8 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
@@ -21,6 +21,7 @@ import java.util.Map;
 
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.impl.JournalFile;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 
 /**
@@ -60,23 +61,49 @@ public interface Journal extends ActiveMQComponent {
 
    void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
 
-   void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
+   default void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
+      appendAddRecord(id, recordType, EncoderPersister.getInstance(), record, sync);
+   }
+
+   void appendAddRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception;
 
    void appendAddRecord(long id,
                         byte recordType,
-                        EncodingSupport record,
+                        Persister persister,
+                        Object record,
                         boolean sync,
                         IOCompletion completionCallback) throws Exception;
 
+   default void appendAddRecord(long id,
+                        byte recordType,
+                        EncodingSupport record,
+                        boolean sync,
+                        IOCompletion completionCallback) throws Exception {
+      appendAddRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback);
+   }
+
    void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
 
-   void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
+   default void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
+      appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync);
+   }
 
-   void appendUpdateRecord(long id,
+   void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception;
+
+   default void appendUpdateRecord(long id,
                            byte recordType,
                            EncodingSupport record,
                            boolean sync,
-                           IOCompletion completionCallback) throws Exception;
+                           IOCompletion completionCallback) throws Exception {
+      appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback);
+   }
+
+   void appendUpdateRecord(final long id,
+                           final byte recordType,
+                           final Persister persister,
+                           final Object record,
+                           final boolean sync,
+                           final IOCompletion callback) throws Exception;
 
    void appendDeleteRecord(long id, boolean sync) throws Exception;
 
@@ -86,11 +113,23 @@ public interface Journal extends ActiveMQComponent {
 
    void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
 
-   void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
+   default void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception {
+      appendAddRecordTransactional(txID, id, recordType, EncoderPersister.getInstance(), record);
+   }
+
+   void appendAddRecordTransactional(final long txID,
+                                     final long id,
+                                     final byte recordType,
+                                     final Persister persister,
+                                     final Object record) throws Exception;
 
    void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
 
-   void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
+   default void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception {
+      appendUpdateRecordTransactional(txID, id, recordType, EncoderPersister.getInstance(), record);
+   }
+
+   void appendUpdateRecordTransactional(long txID, long id, byte recordType, Persister persister, Object record) throws Exception;
 
    void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception;
 
@@ -165,8 +204,6 @@ public interface Journal extends ActiveMQComponent {
 
    int getUserVersion();
 
-   void perfBlast(int pages);
-
    void runDirectJournalBlast() throws Exception;
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
index 8bbecd2..943077c 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncoderPersister;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
@@ -127,7 +128,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
             }
          }
 
-         JournalInternalRecord controlRecord = new JournalAddRecord(true, 1, (byte) 0, new ByteArrayEncoding(filesToRename.toByteBuffer().array()));
+         JournalInternalRecord controlRecord = new JournalAddRecord(true, 1, (byte) 0, EncoderPersister.getInstance(), new ByteArrayEncoding(filesToRename.toByteBuffer().array()));
 
          ActiveMQBuffer renameBuffer = ActiveMQBuffers.dynamicBuffer(filesToRename.writerIndex());
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
index 0b702a5..8e5ca2c 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
 import org.apache.activemq.artemis.core.journal.LoaderCallback;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
@@ -90,10 +91,11 @@ public final class FileWrapperJournal extends JournalBase {
    @Override
    public void appendAddRecord(long id,
                                byte recordType,
-                               EncodingSupport record,
+                               Persister persister,
+                               Object record,
                                boolean sync,
                                IOCompletion callback) throws Exception {
-      JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
+      JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, persister, record);
 
       writeRecord(addRecord, sync, callback);
    }
@@ -144,19 +146,21 @@ public final class FileWrapperJournal extends JournalBase {
    public void appendAddRecordTransactional(long txID,
                                             long id,
                                             byte recordType,
-                                            EncodingSupport record) throws Exception {
+                                            Persister persister,
+                                            Object record) throws Exception {
       count(txID);
-      JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
+      JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record);
       writeRecord(addRecord, false, null);
    }
 
    @Override
    public void appendUpdateRecord(long id,
                                   byte recordType,
-                                  EncodingSupport record,
+                                  Persister persister,
+                                  Object record,
                                   boolean sync,
                                   IOCompletion callback) throws Exception {
-      JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
+      JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record);
       writeRecord(updateRecord, sync, callback);
    }
 
@@ -164,9 +168,10 @@ public final class FileWrapperJournal extends JournalBase {
    public void appendUpdateRecordTransactional(long txID,
                                                long id,
                                                byte recordType,
-                                               EncodingSupport record) throws Exception {
+                                               Persister persister,
+                                               Object record) throws Exception {
       count(txID);
-      JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
+      JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, persister, record);
       writeRecord(updateRecordTX, false, null);
    }
 
@@ -261,11 +266,6 @@ public final class FileWrapperJournal extends JournalBase {
    }
 
    @Override
-   public void perfBlast(int pages) {
-      throw new UnsupportedOperationException();
-   }
-
-   @Override
    public void runDirectJournalBlast() throws Exception {
       throw new UnsupportedOperationException();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
index e2ca84d..e6bd99e 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
@@ -21,6 +21,7 @@ import org.apache.activemq.artemis.core.io.DummyCallback;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
 
 abstract class JournalBase implements Journal {
@@ -37,68 +38,15 @@ abstract class JournalBase implements Journal {
    }
 
    @Override
-   public abstract void appendAddRecord(final long id,
-                                        final byte recordType,
-                                        final EncodingSupport record,
-                                        final boolean sync,
-                                        final IOCompletion callback) throws Exception;
-
-   @Override
-   public abstract void appendAddRecordTransactional(final long txID,
-                                                     final long id,
-                                                     final byte recordType,
-                                                     final EncodingSupport record) throws Exception;
-
-   @Override
-   public abstract void appendCommitRecord(final long txID,
-                                           final boolean sync,
-                                           final IOCompletion callback,
-                                           boolean lineUpContext) throws Exception;
-
-   @Override
-   public abstract void appendDeleteRecord(final long id,
-                                           final boolean sync,
-                                           final IOCompletion callback) throws Exception;
-
-   @Override
-   public abstract void appendDeleteRecordTransactional(final long txID,
-                                                        final long id,
-                                                        final EncodingSupport record) throws Exception;
-
-   @Override
-   public abstract void appendPrepareRecord(final long txID,
-                                            final EncodingSupport transactionData,
-                                            final boolean sync,
-                                            final IOCompletion callback) throws Exception;
-
-   @Override
-   public abstract void appendUpdateRecord(final long id,
-                                           final byte recordType,
-                                           final EncodingSupport record,
-                                           final boolean sync,
-                                           final IOCompletion callback) throws Exception;
-
-   @Override
-   public abstract void appendUpdateRecordTransactional(final long txID,
-                                                        final long id,
-                                                        final byte recordType,
-                                                        final EncodingSupport record) throws Exception;
-
-   @Override
-   public abstract void appendRollbackRecord(final long txID,
-                                             final boolean sync,
-                                             final IOCompletion callback) throws Exception;
-
-   @Override
    public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
       appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
    }
 
    @Override
-   public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
+   public void appendAddRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception {
       SyncIOCompletion callback = getSyncCallback(sync);
 
-      appendAddRecord(id, recordType, record, sync, callback);
+      appendAddRecord(id, recordType, persister, record, sync, callback);
 
       if (callback != null) {
          callback.waitCompletion();
@@ -176,11 +124,12 @@ abstract class JournalBase implements Journal {
    @Override
    public void appendUpdateRecord(final long id,
                                   final byte recordType,
-                                  final EncodingSupport record,
+                                  final Persister persister,
+                                  final Object record,
                                   final boolean sync) throws Exception {
       SyncIOCompletion callback = getSyncCallback(sync);
 
-      appendUpdateRecord(id, recordType, record, sync, callback);
+      appendUpdateRecord(id, recordType, persister, record, sync, callback);
 
       if (callback != null) {
          callback.waitCompletion();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
index b95d641..c62b27b 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncoderPersister;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
@@ -252,7 +253,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
    @Override
    public void onReadAddRecord(final RecordInfo info) throws Exception {
       if (lookupRecord(info.id)) {
-         JournalInternalRecord addRecord = new JournalAddRecord(true, info.id, info.getUserRecordType(), new ByteArrayEncoding(info.data));
+         JournalInternalRecord addRecord = new JournalAddRecord(true, info.id, info.getUserRecordType(), EncoderPersister.getInstance(), new ByteArrayEncoding(info.data));
          addRecord.setCompactCount((short) (info.compactCount + 1));
 
          checkSize(addRecord.getEncodeSize(), info.compactCount);
@@ -268,7 +269,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
       if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) {
          JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
 
-         JournalInternalRecord record = new JournalAddRecordTX(true, transactionID, info.id, info.getUserRecordType(), new ByteArrayEncoding(info.data));
+         JournalInternalRecord record = new JournalAddRecordTX(true, transactionID, info.id, info.getUserRecordType(), EncoderPersister.getInstance(),new ByteArrayEncoding(info.data));
 
          record.setCompactCount((short) (info.compactCount + 1));
 
@@ -374,7 +375,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
    @Override
    public void onReadUpdateRecord(final RecordInfo info) throws Exception {
       if (lookupRecord(info.id)) {
-         JournalInternalRecord updateRecord = new JournalAddRecord(false, info.id, info.userRecordType, new ByteArrayEncoding(info.data));
+         JournalInternalRecord updateRecord = new JournalAddRecord(false, info.id, info.userRecordType, EncoderPersister.getInstance(), new ByteArrayEncoding(info.data));
 
          updateRecord.setCompactCount((short) (info.compactCount + 1));
 
@@ -397,7 +398,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
       if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) {
          JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
 
-         JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, transactionID, info.id, info.userRecordType, new ByteArrayEncoding(info.data));
+         JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, transactionID, info.id, info.userRecordType, EncoderPersister.getInstance(), new ByteArrayEncoding(info.data));
 
          updateRecordTX.setCompactCount((short) (info.compactCount + 1));
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index db615f8..24bb916 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -57,11 +57,11 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
 import org.apache.activemq.artemis.core.journal.LoaderCallback;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.TestableJournal;
 import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
-import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecordTX;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalCompleteRecordTX;
@@ -713,7 +713,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    @Override
    public void appendAddRecord(final long id,
                                final byte recordType,
-                               final EncodingSupport record,
+                               final Persister persister,
+                               final Object record,
                                final boolean sync,
                                final IOCompletion callback) throws Exception {
       checkJournalIsLoaded();
@@ -727,7 +728,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          public void run() {
             journalLock.readLock().lock();
             try {
-               JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
+               JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, persister, record);
                JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
                records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
 
@@ -762,7 +763,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    @Override
    public void appendUpdateRecord(final long id,
                                   final byte recordType,
-                                  final EncodingSupport record,
+                                  final Persister persister,
+                                  final Object record,
                                   final boolean sync,
                                   final IOCompletion callback) throws Exception {
       checkJournalIsLoaded();
@@ -777,7 +779,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
             journalLock.readLock().lock();
             try {
                JournalRecord jrnRecord = records.get(id);
-               JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
+               JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record);
                JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
 
                if (logger.isTraceEnabled()) {
@@ -873,7 +875,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    public void appendAddRecordTransactional(final long txID,
                                             final long id,
                                             final byte recordType,
-                                            final EncodingSupport record) throws Exception {
+                                            final Persister persister,
+                                            final Object record) throws Exception {
       checkJournalIsLoaded();
 
       final JournalTransaction tx = getTransactionInfo(txID);
@@ -885,7 +888,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          public void run() {
             journalLock.readLock().lock();
             try {
-               JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
+               JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record);
                JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
 
                if (logger.isTraceEnabled()) {
@@ -952,7 +955,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    public void appendUpdateRecordTransactional(final long txID,
                                                final long id,
                                                final byte recordType,
-                                               final EncodingSupport record) throws Exception {
+                                               final Persister persister,
+                                               final Object record) throws Exception {
       checkJournalIsLoaded();
 
       final JournalTransaction tx = getTransactionInfo(txID);
@@ -965,7 +969,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
             journalLock.readLock().lock();
             try {
 
-               JournalInternalRecord updateRecordTX = new JournalAddRecordTX( false, txID, id, recordType, record );
+               JournalInternalRecord updateRecordTX = new JournalAddRecordTX( false, txID, id, recordType, persister, record );
                JournalFile usedFile = appendRecord( updateRecordTX, false, false, tx, null );
 
                if ( logger.isTraceEnabled() ) {
@@ -2165,45 +2169,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       }
    }
 
-   @Override
-   public void perfBlast(final int pages) {
-
-      checkJournalIsLoaded();
-
-      final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 * 1024]);
-
-      final JournalInternalRecord blastRecord = new JournalInternalRecord() {
-
-         @Override
-         public int getEncodeSize() {
-            return byteEncoder.getEncodeSize();
-         }
-
-         @Override
-         public void encode(final ActiveMQBuffer buffer) {
-            byteEncoder.encode(buffer);
-         }
-      };
-
-      appendExecutor.execute(new Runnable() {
-         @Override
-         public void run() {
-            journalLock.readLock().lock();
-            try {
-
-               for (int i = 0; i < pages; i++) {
-                  appendRecord(blastRecord, false, false, null, null);
-               }
-
-            } catch (Exception e) {
-               ActiveMQJournalLogger.LOGGER.failedToPerfBlast(e);
-            } finally {
-               journalLock.readLock().unlock();
-            }
-         }
-      });
-   }
-
    // ActiveMQComponent implementation
    // ---------------------------------------------------
 
@@ -2921,5 +2886,4 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    public int getCompactCount() {
       return compactCount;
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
index c6a5d4a..6e5b651 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
@@ -17,14 +17,16 @@
 package org.apache.activemq.artemis.core.journal.impl.dataformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
 
 public class JournalAddRecord extends JournalInternalRecord {
 
    protected final long id;
 
-   protected final EncodingSupport record;
+   protected final Persister persister;
+
+   protected final Object record;
 
    protected final byte recordType;
 
@@ -35,7 +37,7 @@ public class JournalAddRecord extends JournalInternalRecord {
     * @param recordType
     * @param record
     */
-   public JournalAddRecord(final boolean add, final long id, final byte recordType, final EncodingSupport record) {
+   public JournalAddRecord(final boolean add, final long id, final byte recordType, final Persister persister, Object record) {
       this.id = id;
 
       this.record = record;
@@ -43,6 +45,8 @@ public class JournalAddRecord extends JournalInternalRecord {
       this.recordType = recordType;
 
       this.add = add;
+
+      this.persister = persister;
    }
 
    @Override
@@ -59,17 +63,19 @@ public class JournalAddRecord extends JournalInternalRecord {
 
       buffer.writeLong(id);
 
-      buffer.writeInt(record.getEncodeSize());
+      int recordEncodeSize = persister.getEncodeSize(record);
+
+      buffer.writeInt(persister.getEncodeSize(record));
 
       buffer.writeByte(recordType);
 
-      record.encode(buffer);
+      persister.encode(buffer, record);
 
-      buffer.writeInt(getEncodeSize());
+      buffer.writeInt(recordEncodeSize + JournalImpl.SIZE_ADD_RECORD + 1);
    }
 
    @Override
    public int getEncodeSize() {
-      return JournalImpl.SIZE_ADD_RECORD + record.getEncodeSize() + 1;
+      return JournalImpl.SIZE_ADD_RECORD + persister.getEncodeSize(record) + 1;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java
index 6cec122..483418f 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java
@@ -17,7 +17,7 @@
 package org.apache.activemq.artemis.core.journal.impl.dataformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
 
 public class JournalAddRecordTX extends JournalInternalRecord {
@@ -26,7 +26,9 @@ public class JournalAddRecordTX extends JournalInternalRecord {
 
    private final long id;
 
-   private final EncodingSupport record;
+   protected final Persister persister;
+
+   protected final Object record;
 
    private final byte recordType;
 
@@ -41,12 +43,15 @@ public class JournalAddRecordTX extends JournalInternalRecord {
                              final long txID,
                              final long id,
                              final byte recordType,
-                             final EncodingSupport record) {
+                             final Persister persister,
+                             Object record) {
 
       this.txID = txID;
 
       this.id = id;
 
+      this.persister = persister;
+
       this.record = record;
 
       this.recordType = recordType;
@@ -70,17 +75,17 @@ public class JournalAddRecordTX extends JournalInternalRecord {
 
       buffer.writeLong(id);
 
-      buffer.writeInt(record.getEncodeSize());
+      buffer.writeInt(persister.getEncodeSize(record));
 
       buffer.writeByte(recordType);
 
-      record.encode(buffer);
+      persister.encode(buffer, record);
 
       buffer.writeInt(getEncodeSize());
    }
 
    @Override
    public int getEncodeSize() {
-      return JournalImpl.SIZE_ADD_RECORD_TX + record.getEncodeSize() + 1;
+      return JournalImpl.SIZE_ADD_RECORD_TX + persister.getEncodeSize(record) + 1;
    }
 }


Mime
View raw message