activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [14/23] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding.
Date Wed, 01 Mar 2017 17:21:08 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index eb7cda1..2108be7 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -31,11 +31,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncoderPersister;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
 import org.apache.activemq.artemis.core.journal.LoaderCallback;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
@@ -366,10 +368,10 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    }
 
    @Override
-   public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
+   public void appendAddRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception {
       JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
       r.setUserRecordType(recordType);
-      r.setRecord(record);
+      r.setRecord(persister, record);
       r.setSync(sync);
       appendRecord(r);
    }
@@ -377,12 +379,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    @Override
    public void appendAddRecord(long id,
                                byte recordType,
-                               EncodingSupport record,
+                               Persister persister,
+                               Object record,
                                boolean sync,
                                IOCompletion completionCallback) throws Exception {
       JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
       r.setUserRecordType(recordType);
-      r.setRecord(record);
+      r.setRecord(persister, record);
       r.setSync(sync);
       r.setIoCompletion(completionCallback);
       appendRecord(r);
@@ -398,10 +401,10 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    }
 
    @Override
-   public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
+   public void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception {
       JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet());
       r.setUserRecordType(recordType);
-      r.setRecord(record);
+      r.setRecord(persister, record);
       r.setSync(sync);
       appendRecord(r);
    }
@@ -409,12 +412,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    @Override
    public void appendUpdateRecord(long id,
                                   byte recordType,
-                                  EncodingSupport record,
+                                  Persister persister,
+                                  Object record,
                                   boolean sync,
                                   IOCompletion completionCallback) throws Exception {
       JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
       r.setUserRecordType(recordType);
-      r.setRecord(record);
+      r.setRecord(persister, record);
       r.setSync(sync);
       r.setIoCompletion(completionCallback);
       appendRecord(r);
@@ -448,10 +452,11 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    public void appendAddRecordTransactional(long txID,
                                             long id,
                                             byte recordType,
-                                            EncodingSupport record) throws Exception {
+                                            Persister persister,
+                                            Object record) throws Exception {
       JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet());
       r.setUserRecordType(recordType);
-      r.setRecord(record);
+      r.setRecord(persister, record);
       r.setTxId(txID);
       appendRecord(r);
    }
@@ -469,10 +474,11 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    public void appendUpdateRecordTransactional(long txID,
                                                long id,
                                                byte recordType,
-                                               EncodingSupport record) throws Exception {
+                                               Persister persister,
+                                               Object record) throws Exception {
       JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX, seq.incrementAndGet());
       r.setUserRecordType(recordType);
-      r.setRecord(record);
+      r.setRecord(persister, record);
       r.setTxId(txID);
       appendRecord(r);
    }
@@ -488,7 +494,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    @Override
    public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception {
       JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet());
-      r.setRecord(record);
+      r.setRecord(EncoderPersister.getInstance(), record);
       r.setTxId(txID);
       appendRecord(r);
    }
@@ -685,10 +691,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    }
 
    @Override
-   public void perfBlast(int pages) {
-   }
-
-   @Override
    public void runDirectJournalBlast() throws Exception {
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
index 9691d3e..b094164 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream;
@@ -237,11 +238,11 @@ class JDBCJournalRecord {
       this.record = record;
    }
 
-   public void setRecord(EncodingSupport record) {
-      this.variableSize = record.getEncodeSize();
+   public void setRecord(Persister persister, Object record) {
+      this.variableSize = persister.getEncodeSize(record);
 
       ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(variableSize);
-      record.encode(encodedBuffer);
+      persister.encode(encodedBuffer, record);
       this.record = new ActiveMQBufferInputStream(encodedBuffer);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
index 59f04e8..6da3912 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java
@@ -26,7 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 
 import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesMessageReset;
 import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadBoolean;
@@ -374,7 +374,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
       if (bodyLength == 0)
          return null;
       byte[] dst = new byte[bodyLength];
-      message.getBodyBuffer().getBytes(MessageImpl.BODY_OFFSET, dst);
+      message.getBodyBuffer().getBytes(CoreMessage.BODY_OFFSET, dst);
       return (T) dst;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
index 47dcfb2..80a07ac 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
@@ -43,7 +43,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.utils.UUID;
@@ -293,7 +293,7 @@ public class ActiveMQMessage implements javax.jms.Message {
    @Override
    public String getJMSMessageID() {
       if (msgID == null) {
-         UUID uid = message.getUserID();
+         UUID uid = (UUID)message.getUserID();
 
          msgID = uid == null ? null : "ID:" + uid.toString();
       }
@@ -397,7 +397,7 @@ public class ActiveMQMessage implements javax.jms.Message {
    @Override
    public Destination getJMSDestination() throws JMSException {
       if (dest == null) {
-         SimpleString address = message.getAddress();
+         SimpleString address = message.getAddressSimpleString();
          String prefix = "";
          if (message.containsProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE)) {
             RoutingType routingType = RoutingType.getType(message.getByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE));
@@ -756,7 +756,7 @@ public class ActiveMQMessage implements javax.jms.Message {
 
    @SuppressWarnings("unchecked")
    protected <T> T getBodyInternal(Class<T> c) throws MessageFormatException {
-      InputStream is = ((MessageInternal) message).getBodyInputStream();
+      InputStream is = ((ClientMessageInternal) message).getBodyInputStream();
       try {
          ObjectInputStream ois = new ObjectInputStream(is);
          return (T) ois.readObject();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java
index 6cf20ff..289f88c 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java
@@ -19,7 +19,7 @@ package org.apache.activemq.artemis.jms.transaction;
 import javax.transaction.xa.Xid;
 import java.util.Map;
 
-import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionDetail;
 import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
@@ -36,7 +36,7 @@ public class JMSTransactionDetail extends TransactionDetail {
    }
 
    @Override
-   public String decodeMessageType(ServerMessage msg) {
+   public String decodeMessageType(Message msg) {
       int type = msg.getType();
       switch (type) {
          case ActiveMQMessage.TYPE: // 0
@@ -57,7 +57,7 @@ public class JMSTransactionDetail extends TransactionDetail {
    }
 
    @Override
-   public Map<String, Object> decodeMessageProperties(ServerMessage msg) {
+   public Map<String, Object> decodeMessageProperties(Message msg) {
       try {
          return ActiveMQMessage.coreMaptoJMSMap(msg.toMap());
       } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java
new file mode 100644
index 0000000..8fc2a5aa
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.persistence.Persister;
+
+/** This is a facade between the new Persister and the former EncodingSupport.
+ *  Methods using the old interface will use this as a facade to provide the previous semantic. */
+public class EncoderPersister implements Persister<EncodingSupport> {
+
+   private static final EncoderPersister theInstance = new EncoderPersister();
+
+   private EncoderPersister() {
+   }
+
+   public static EncoderPersister getInstance() {
+      return theInstance;
+   }
+
+   @Override
+   public int getEncodeSize(EncodingSupport record) {
+      return record.getEncodeSize();
+   }
+
+   @Override
+   public void encode(ActiveMQBuffer buffer, EncodingSupport record) {
+      record.encode(buffer);
+   }
+
+   @Override
+   public EncodingSupport decode(ActiveMQBuffer buffer, EncodingSupport record) {
+      record.decode(buffer);
+      return record;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
index fbd4182..ca194b8 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
@@ -21,6 +21,7 @@ import java.util.Map;
 
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.impl.JournalFile;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 
 /**
@@ -60,23 +61,49 @@ public interface Journal extends ActiveMQComponent {
 
    void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
 
-   void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
+   default void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
+      appendAddRecord(id, recordType, EncoderPersister.getInstance(), record, sync);
+   }
+
+   void appendAddRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception;
 
    void appendAddRecord(long id,
                         byte recordType,
-                        EncodingSupport record,
+                        Persister persister,
+                        Object record,
                         boolean sync,
                         IOCompletion completionCallback) throws Exception;
 
+   default void appendAddRecord(long id,
+                        byte recordType,
+                        EncodingSupport record,
+                        boolean sync,
+                        IOCompletion completionCallback) throws Exception {
+      appendAddRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback);
+   }
+
    void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
 
-   void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
+   default void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
+      appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync);
+   }
 
-   void appendUpdateRecord(long id,
+   void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception;
+
+   default void appendUpdateRecord(long id,
                            byte recordType,
                            EncodingSupport record,
                            boolean sync,
-                           IOCompletion completionCallback) throws Exception;
+                           IOCompletion completionCallback) throws Exception {
+      appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback);
+   }
+
+   void appendUpdateRecord(final long id,
+                           final byte recordType,
+                           final Persister persister,
+                           final Object record,
+                           final boolean sync,
+                           final IOCompletion callback) throws Exception;
 
    void appendDeleteRecord(long id, boolean sync) throws Exception;
 
@@ -86,11 +113,23 @@ public interface Journal extends ActiveMQComponent {
 
    void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
 
-   void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
+   default void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception {
+      appendAddRecordTransactional(txID, id, recordType, EncoderPersister.getInstance(), record);
+   }
+
+   void appendAddRecordTransactional(final long txID,
+                                     final long id,
+                                     final byte recordType,
+                                     final Persister persister,
+                                     final Object record) throws Exception;
 
    void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
 
-   void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
+   default void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception {
+      appendUpdateRecordTransactional(txID, id, recordType, EncoderPersister.getInstance(), record);
+   }
+
+   void appendUpdateRecordTransactional(long txID, long id, byte recordType, Persister persister, Object record) throws Exception;
 
    void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception;
 
@@ -165,8 +204,6 @@ public interface Journal extends ActiveMQComponent {
 
    int getUserVersion();
 
-   void perfBlast(int pages);
-
    void runDirectJournalBlast() throws Exception;
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
index 8bbecd2..943077c 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncoderPersister;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
@@ -127,7 +128,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
             }
          }
 
-         JournalInternalRecord controlRecord = new JournalAddRecord(true, 1, (byte) 0, new ByteArrayEncoding(filesToRename.toByteBuffer().array()));
+         JournalInternalRecord controlRecord = new JournalAddRecord(true, 1, (byte) 0, EncoderPersister.getInstance(), new ByteArrayEncoding(filesToRename.toByteBuffer().array()));
 
          ActiveMQBuffer renameBuffer = ActiveMQBuffers.dynamicBuffer(filesToRename.writerIndex());
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
index 0b702a5..8e5ca2c 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
 import org.apache.activemq.artemis.core.journal.LoaderCallback;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
@@ -90,10 +91,11 @@ public final class FileWrapperJournal extends JournalBase {
    @Override
    public void appendAddRecord(long id,
                                byte recordType,
-                               EncodingSupport record,
+                               Persister persister,
+                               Object record,
                                boolean sync,
                                IOCompletion callback) throws Exception {
-      JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
+      JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, persister, record);
 
       writeRecord(addRecord, sync, callback);
    }
@@ -144,19 +146,21 @@ public final class FileWrapperJournal extends JournalBase {
    public void appendAddRecordTransactional(long txID,
                                             long id,
                                             byte recordType,
-                                            EncodingSupport record) throws Exception {
+                                            Persister persister,
+                                            Object record) throws Exception {
       count(txID);
-      JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
+      JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record);
       writeRecord(addRecord, false, null);
    }
 
    @Override
    public void appendUpdateRecord(long id,
                                   byte recordType,
-                                  EncodingSupport record,
+                                  Persister persister,
+                                  Object record,
                                   boolean sync,
                                   IOCompletion callback) throws Exception {
-      JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
+      JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record);
       writeRecord(updateRecord, sync, callback);
    }
 
@@ -164,9 +168,10 @@ public final class FileWrapperJournal extends JournalBase {
    public void appendUpdateRecordTransactional(long txID,
                                                long id,
                                                byte recordType,
-                                               EncodingSupport record) throws Exception {
+                                               Persister persister,
+                                               Object record) throws Exception {
       count(txID);
-      JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
+      JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, persister, record);
       writeRecord(updateRecordTX, false, null);
    }
 
@@ -261,11 +266,6 @@ public final class FileWrapperJournal extends JournalBase {
    }
 
    @Override
-   public void perfBlast(int pages) {
-      throw new UnsupportedOperationException();
-   }
-
-   @Override
    public void runDirectJournalBlast() throws Exception {
       throw new UnsupportedOperationException();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
index e2ca84d..e6bd99e 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
@@ -21,6 +21,7 @@ import org.apache.activemq.artemis.core.io.DummyCallback;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
 
 abstract class JournalBase implements Journal {
@@ -37,68 +38,15 @@ abstract class JournalBase implements Journal {
    }
 
    @Override
-   public abstract void appendAddRecord(final long id,
-                                        final byte recordType,
-                                        final EncodingSupport record,
-                                        final boolean sync,
-                                        final IOCompletion callback) throws Exception;
-
-   @Override
-   public abstract void appendAddRecordTransactional(final long txID,
-                                                     final long id,
-                                                     final byte recordType,
-                                                     final EncodingSupport record) throws Exception;
-
-   @Override
-   public abstract void appendCommitRecord(final long txID,
-                                           final boolean sync,
-                                           final IOCompletion callback,
-                                           boolean lineUpContext) throws Exception;
-
-   @Override
-   public abstract void appendDeleteRecord(final long id,
-                                           final boolean sync,
-                                           final IOCompletion callback) throws Exception;
-
-   @Override
-   public abstract void appendDeleteRecordTransactional(final long txID,
-                                                        final long id,
-                                                        final EncodingSupport record) throws Exception;
-
-   @Override
-   public abstract void appendPrepareRecord(final long txID,
-                                            final EncodingSupport transactionData,
-                                            final boolean sync,
-                                            final IOCompletion callback) throws Exception;
-
-   @Override
-   public abstract void appendUpdateRecord(final long id,
-                                           final byte recordType,
-                                           final EncodingSupport record,
-                                           final boolean sync,
-                                           final IOCompletion callback) throws Exception;
-
-   @Override
-   public abstract void appendUpdateRecordTransactional(final long txID,
-                                                        final long id,
-                                                        final byte recordType,
-                                                        final EncodingSupport record) throws Exception;
-
-   @Override
-   public abstract void appendRollbackRecord(final long txID,
-                                             final boolean sync,
-                                             final IOCompletion callback) throws Exception;
-
-   @Override
    public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
       appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
    }
 
    @Override
-   public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
+   public void appendAddRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception {
       SyncIOCompletion callback = getSyncCallback(sync);
 
-      appendAddRecord(id, recordType, record, sync, callback);
+      appendAddRecord(id, recordType, persister, record, sync, callback);
 
       if (callback != null) {
          callback.waitCompletion();
@@ -176,11 +124,12 @@ abstract class JournalBase implements Journal {
    @Override
    public void appendUpdateRecord(final long id,
                                   final byte recordType,
-                                  final EncodingSupport record,
+                                  final Persister persister,
+                                  final Object record,
                                   final boolean sync) throws Exception {
       SyncIOCompletion callback = getSyncCallback(sync);
 
-      appendUpdateRecord(id, recordType, record, sync, callback);
+      appendUpdateRecord(id, recordType, persister, record, sync, callback);
 
       if (callback != null) {
          callback.waitCompletion();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
index b95d641..c62b27b 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncoderPersister;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
@@ -252,7 +253,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
    @Override
    public void onReadAddRecord(final RecordInfo info) throws Exception {
       if (lookupRecord(info.id)) {
-         JournalInternalRecord addRecord = new JournalAddRecord(true, info.id, info.getUserRecordType(), new ByteArrayEncoding(info.data));
+         JournalInternalRecord addRecord = new JournalAddRecord(true, info.id, info.getUserRecordType(), EncoderPersister.getInstance(), new ByteArrayEncoding(info.data));
          addRecord.setCompactCount((short) (info.compactCount + 1));
 
          checkSize(addRecord.getEncodeSize(), info.compactCount);
@@ -268,7 +269,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
       if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) {
          JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
 
-         JournalInternalRecord record = new JournalAddRecordTX(true, transactionID, info.id, info.getUserRecordType(), new ByteArrayEncoding(info.data));
+         JournalInternalRecord record = new JournalAddRecordTX(true, transactionID, info.id, info.getUserRecordType(), EncoderPersister.getInstance(),new ByteArrayEncoding(info.data));
 
          record.setCompactCount((short) (info.compactCount + 1));
 
@@ -374,7 +375,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
    @Override
    public void onReadUpdateRecord(final RecordInfo info) throws Exception {
       if (lookupRecord(info.id)) {
-         JournalInternalRecord updateRecord = new JournalAddRecord(false, info.id, info.userRecordType, new ByteArrayEncoding(info.data));
+         JournalInternalRecord updateRecord = new JournalAddRecord(false, info.id, info.userRecordType, EncoderPersister.getInstance(), new ByteArrayEncoding(info.data));
 
          updateRecord.setCompactCount((short) (info.compactCount + 1));
 
@@ -397,7 +398,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
       if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) {
          JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
 
-         JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, transactionID, info.id, info.userRecordType, new ByteArrayEncoding(info.data));
+         JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, transactionID, info.id, info.userRecordType, EncoderPersister.getInstance(), new ByteArrayEncoding(info.data));
 
          updateRecordTX.setCompactCount((short) (info.compactCount + 1));
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index db615f8..24bb916 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -57,11 +57,11 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
 import org.apache.activemq.artemis.core.journal.LoaderCallback;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.TestableJournal;
 import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
-import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecordTX;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalCompleteRecordTX;
@@ -713,7 +713,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    @Override
    public void appendAddRecord(final long id,
                                final byte recordType,
-                               final EncodingSupport record,
+                               final Persister persister,
+                               final Object record,
                                final boolean sync,
                                final IOCompletion callback) throws Exception {
       checkJournalIsLoaded();
@@ -727,7 +728,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          public void run() {
             journalLock.readLock().lock();
             try {
-               JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
+               JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, persister, record);
                JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
                records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
 
@@ -762,7 +763,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    @Override
    public void appendUpdateRecord(final long id,
                                   final byte recordType,
-                                  final EncodingSupport record,
+                                  final Persister persister,
+                                  final Object record,
                                   final boolean sync,
                                   final IOCompletion callback) throws Exception {
       checkJournalIsLoaded();
@@ -777,7 +779,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
             journalLock.readLock().lock();
             try {
                JournalRecord jrnRecord = records.get(id);
-               JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
+               JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record);
                JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
 
                if (logger.isTraceEnabled()) {
@@ -873,7 +875,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    public void appendAddRecordTransactional(final long txID,
                                             final long id,
                                             final byte recordType,
-                                            final EncodingSupport record) throws Exception {
+                                            final Persister persister,
+                                            final Object record) throws Exception {
       checkJournalIsLoaded();
 
       final JournalTransaction tx = getTransactionInfo(txID);
@@ -885,7 +888,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          public void run() {
             journalLock.readLock().lock();
             try {
-               JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
+               JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record);
                JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
 
                if (logger.isTraceEnabled()) {
@@ -952,7 +955,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    public void appendUpdateRecordTransactional(final long txID,
                                                final long id,
                                                final byte recordType,
-                                               final EncodingSupport record) throws Exception {
+                                               final Persister persister,
+                                               final Object record) throws Exception {
       checkJournalIsLoaded();
 
       final JournalTransaction tx = getTransactionInfo(txID);
@@ -965,7 +969,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
             journalLock.readLock().lock();
             try {
 
-               JournalInternalRecord updateRecordTX = new JournalAddRecordTX( false, txID, id, recordType, record );
+               JournalInternalRecord updateRecordTX = new JournalAddRecordTX( false, txID, id, recordType, persister, record );
                JournalFile usedFile = appendRecord( updateRecordTX, false, false, tx, null );
 
                if ( logger.isTraceEnabled() ) {
@@ -2165,45 +2169,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       }
    }
 
-   @Override
-   public void perfBlast(final int pages) {
-
-      checkJournalIsLoaded();
-
-      final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 * 1024]);
-
-      final JournalInternalRecord blastRecord = new JournalInternalRecord() {
-
-         @Override
-         public int getEncodeSize() {
-            return byteEncoder.getEncodeSize();
-         }
-
-         @Override
-         public void encode(final ActiveMQBuffer buffer) {
-            byteEncoder.encode(buffer);
-         }
-      };
-
-      appendExecutor.execute(new Runnable() {
-         @Override
-         public void run() {
-            journalLock.readLock().lock();
-            try {
-
-               for (int i = 0; i < pages; i++) {
-                  appendRecord(blastRecord, false, false, null, null);
-               }
-
-            } catch (Exception e) {
-               ActiveMQJournalLogger.LOGGER.failedToPerfBlast(e);
-            } finally {
-               journalLock.readLock().unlock();
-            }
-         }
-      });
-   }
-
    // ActiveMQComponent implementation
    // ---------------------------------------------------
 
@@ -2921,5 +2886,4 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    public int getCompactCount() {
       return compactCount;
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
index c6a5d4a..6e5b651 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
@@ -17,14 +17,16 @@
 package org.apache.activemq.artemis.core.journal.impl.dataformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
 
 public class JournalAddRecord extends JournalInternalRecord {
 
    protected final long id;
 
-   protected final EncodingSupport record;
+   protected final Persister persister;
+
+   protected final Object record;
 
    protected final byte recordType;
 
@@ -35,7 +37,7 @@ public class JournalAddRecord extends JournalInternalRecord {
     * @param recordType
     * @param record
     */
-   public JournalAddRecord(final boolean add, final long id, final byte recordType, final EncodingSupport record) {
+   public JournalAddRecord(final boolean add, final long id, final byte recordType, final Persister persister, Object record) {
       this.id = id;
 
       this.record = record;
@@ -43,6 +45,8 @@ public class JournalAddRecord extends JournalInternalRecord {
       this.recordType = recordType;
 
       this.add = add;
+
+      this.persister = persister;
    }
 
    @Override
@@ -59,17 +63,19 @@ public class JournalAddRecord extends JournalInternalRecord {
 
       buffer.writeLong(id);
 
-      buffer.writeInt(record.getEncodeSize());
+      int recordEncodeSize = persister.getEncodeSize(record);
+
+      buffer.writeInt(persister.getEncodeSize(record));
 
       buffer.writeByte(recordType);
 
-      record.encode(buffer);
+      persister.encode(buffer, record);
 
-      buffer.writeInt(getEncodeSize());
+      buffer.writeInt(recordEncodeSize + JournalImpl.SIZE_ADD_RECORD + 1);
    }
 
    @Override
    public int getEncodeSize() {
-      return JournalImpl.SIZE_ADD_RECORD + record.getEncodeSize() + 1;
+      return JournalImpl.SIZE_ADD_RECORD + persister.getEncodeSize(record) + 1;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java
index 6cec122..483418f 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java
@@ -17,7 +17,7 @@
 package org.apache.activemq.artemis.core.journal.impl.dataformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
 
 public class JournalAddRecordTX extends JournalInternalRecord {
@@ -26,7 +26,9 @@ public class JournalAddRecordTX extends JournalInternalRecord {
 
    private final long id;
 
-   private final EncodingSupport record;
+   protected final Persister persister;
+
+   protected final Object record;
 
    private final byte recordType;
 
@@ -41,12 +43,15 @@ public class JournalAddRecordTX extends JournalInternalRecord {
                              final long txID,
                              final long id,
                              final byte recordType,
-                             final EncodingSupport record) {
+                             final Persister persister,
+                             Object record) {
 
       this.txID = txID;
 
       this.id = id;
 
+      this.persister = persister;
+
       this.record = record;
 
       this.recordType = recordType;
@@ -70,17 +75,17 @@ public class JournalAddRecordTX extends JournalInternalRecord {
 
       buffer.writeLong(id);
 
-      buffer.writeInt(record.getEncodeSize());
+      buffer.writeInt(persister.getEncodeSize(record));
 
       buffer.writeByte(recordType);
 
-      record.encode(buffer);
+      persister.encode(buffer, record);
 
       buffer.writeInt(getEncodeSize());
    }
 
    @Override
    public int getEncodeSize() {
-      return JournalImpl.SIZE_ADD_RECORD_TX + record.getEncodeSize() + 1;
+      return JournalImpl.SIZE_ADD_RECORD_TX + persister.getEncodeSize(record) + 1;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
new file mode 100644
index 0000000..ee2f870
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -0,0 +1,761 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Set;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.RefCountMessage;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.encode.BodyType;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.apache.qpid.proton.util.TLSEncoder;
+
+// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
+public class AMQPMessage extends RefCountMessage {
+
+   final long messageFormat;
+   private ProtonProtocolManager protocolManager;
+   ByteBuf data;
+   boolean bufferValid;
+   byte type;
+   long messageID;
+   String address;
+   MessageImpl protonMessage;
+   private long expiration = 0;
+   // this can be used to encode the header again and the rest of the message buffer
+   private int headerEnd = -1;
+   private Header _header;
+   private DeliveryAnnotations _deliveryAnnotations;
+   private MessageAnnotations _messageAnnotations;
+   private Properties _properties;
+   private ApplicationProperties applicationProperties;
+
+   public AMQPMessage(long messageFormat, byte[] data, ProtonProtocolManager protocolManager) {
+      this.protocolManager = protocolManager;
+      this.data = Unpooled.wrappedBuffer(data);
+      this.messageFormat = messageFormat;
+      this.bufferValid = true;
+
+   }
+
+   /** for persistence reload */
+   public AMQPMessage(long messageFormat) {
+      this.messageFormat = messageFormat;
+      this.bufferValid = false;
+
+   }
+
+   public AMQPMessage(long messageFormat, Message message, ProtonProtocolManager protocolManager) {
+      this.protocolManager = protocolManager;
+      this.protonMessage = (MessageImpl)message;
+      this.messageFormat = messageFormat;
+
+   }
+
+   public AMQPMessage(Message message, ProtonProtocolManager protocolManager) {
+      this(0, message, protocolManager);
+   }
+
+   public MessageImpl getProtonMessage() {
+      if (protonMessage == null) {
+         protonMessage = (MessageImpl) Message.Factory.create();
+
+         if (data != null) {
+            data.readerIndex(0);
+            protonMessage.decode(data.nioBuffer());
+            this._header = protonMessage.getHeader();
+            protonMessage.setHeader(null);
+         }
+      }
+
+      return protonMessage;
+   }
+
+   private void initalizeObjects() {
+      if (protonMessage == null) {
+         if (data == null) {
+            this.headerEnd = -1;
+            _header = new Header();
+            _deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
+            _properties = new Properties();
+            this.applicationProperties = new ApplicationProperties(new HashMap<>());
+            this.protonMessage = (MessageImpl)Message.Factory.create();
+            this.protonMessage.setApplicationProperties(applicationProperties);
+            this.protonMessage.setDeliveryAnnotations(_deliveryAnnotations);
+         }
+      }
+   }
+
+   private ApplicationProperties getApplicationProperties() {
+      if (applicationProperties == null) {
+         if (data != null) {
+            partialDecode(data.nioBuffer(), true);
+         } else {
+            initalizeObjects();
+         }
+      }
+
+      return applicationProperties;
+   }
+
+   public Header getHeader() {
+      if (_header == null) {
+         if (data == null) {
+            initalizeObjects();
+         } else {
+            partialDecode(this.data.nioBuffer(), false);
+         }
+      }
+
+      return _header;
+   }
+
+   public Properties getProperties() {
+      if (_properties == null) {
+         if (data == null) {
+            initalizeObjects();
+         } else {
+            partialDecode(this.data.nioBuffer(), true);
+         }
+      }
+
+      return _properties;
+   }
+
+   @Override
+   public Persister<org.apache.activemq.artemis.api.core.Message> getPersister() {
+      return AMQPMessagePersister.getInstance();
+   }
+
+   private synchronized void partialDecode(ByteBuffer buffer, boolean readApplicationProperties) {
+      DecoderImpl decoder = TLSEncoder.getDecoder();
+      decoder.setByteBuffer(buffer);
+      buffer.position(0);
+
+      _header = null;
+      _deliveryAnnotations = null;
+      _messageAnnotations = null;
+      _properties = null;
+      applicationProperties = null;
+      Section section = null;
+
+      try {
+         if (buffer.hasRemaining()) {
+            section = (Section) decoder.readObject();
+         }
+
+         if (section instanceof Header) {
+            headerEnd = buffer.position();
+            _header = (Header) section;
+
+            if (!readApplicationProperties) {
+               return;
+            }
+
+            if (buffer.hasRemaining() && readApplicationProperties) {
+               section = (Section) decoder.readObject();
+            } else {
+               section = null;
+            }
+         }
+
+         if (!readApplicationProperties) {
+            return;
+         }
+         if (section instanceof DeliveryAnnotations) {
+            _deliveryAnnotations = (DeliveryAnnotations) section;
+
+            if (buffer.hasRemaining()) {
+               section = (Section) decoder.readObject();
+            } else {
+               section = null;
+            }
+
+         }
+         if (section instanceof MessageAnnotations) {
+            _messageAnnotations = (MessageAnnotations) section;
+
+            if (buffer.hasRemaining()) {
+               section = (Section) decoder.readObject();
+            } else {
+               section = null;
+            }
+
+         }
+         if (section instanceof Properties) {
+            _properties = (Properties) section;
+
+            if (_header.getTtl() != null) {
+               this.expiration = System.currentTimeMillis() + _header.getTtl().intValue();
+            }
+
+            if (buffer.hasRemaining()) {
+               section = (Section) decoder.readObject();
+            } else {
+               section = null;
+            }
+
+         }
+         if (section instanceof ApplicationProperties) {
+            applicationProperties = (ApplicationProperties) section;
+         }
+      } finally {
+         decoder.setByteBuffer(null);
+      }
+   }
+
+   public long getMessageFormat() {
+      return messageFormat;
+   }
+
+   public int getLength() {
+      return data.array().length;
+   }
+
+   public byte[] getArray() {
+      return data.array();
+   }
+
+   @Override
+   public void messageChanged() {
+      bufferValid = false;
+      this.data = null;
+   }
+
+   // TODO-now this only make sense on Core
+   @Override
+   public ActiveMQBuffer getBodyBuffer() {
+      return null;
+   }
+
+   // TODO-now this only make sense on Core
+   @Override
+   public ActiveMQBuffer getReadOnlyBodyBuffer() {
+      return null;
+   }
+
+   // TODO: Refactor Large message
+   @Override
+   public LargeBodyEncoder getBodyEncoder() throws ActiveMQException {
+      return null;
+   }
+
+   @Override
+   public byte getType() {
+      // TODO-now: what to do here?
+      return type;
+   }
+
+   @Override
+   public AMQPMessage setType(byte type) {
+      this.type = type;
+      return this;
+   }
+
+   @Override
+   public boolean isLargeMessage() {
+      return false;
+   }
+
+   @Override
+   public ByteBuf getBuffer() {
+      if (data == null) {
+         return null;
+      } else {
+         return Unpooled.wrappedBuffer(data);
+      }
+   }
+
+   @Override
+   public AMQPMessage setBuffer(ByteBuf buffer) {
+      this.data = null;
+      return this;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message copy() {
+      // TODO-now: what to do with this?
+      AMQPMessage newEncode = new AMQPMessage(this.messageFormat, data.array(), protocolManager);
+      return newEncode;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message copy(long newID) {
+      return copy().setMessageID(newID);
+   }
+
+   @Override
+   public long getMessageID() {
+      return messageID;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setMessageID(long id) {
+      this.messageID = id;
+      return this;
+   }
+
+   @Override
+   public long getExpiration() {
+      return expiration;
+   }
+
+   @Override
+   public AMQPMessage setExpiration(long expiration) {
+      this.expiration = expiration;
+      return this;
+   }
+
+   @Override
+   public Object getUserID() {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setUserID(Object userID) {
+      return null;
+   }
+
+   @Override
+   public void copyHeadersAndProperties(org.apache.activemq.artemis.api.core.Message msg) {
+
+   }
+
+   @Override
+   public boolean isDurable() {
+      if (getHeader() != null) {
+         return getHeader().getDurable().booleanValue();
+      } else {
+         return false;
+      }
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setDurable(boolean durable) {
+      return null;
+   }
+
+   @Override
+   public Object getProtocol() {
+      return protocolManager;
+   }
+
+   @Override
+   public AMQPMessage setProtocol(Object protocol) {
+      this.protocolManager = (ProtonProtocolManager)protocol;
+      return this;
+   }
+
+   @Override
+   public Object getBody() {
+      return null;
+   }
+
+   @Override
+   public BodyType getBodyType() {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setBody(BodyType type, Object body) {
+      return null;
+   }
+
+   @Override
+   public String getAddress() {
+      if (address == null) {
+         Properties properties = getProtonMessage().getProperties();
+         if (properties != null) {
+            return  properties.getTo();
+         } else {
+            return null;
+         }
+      } else {
+         return address;
+      }
+   }
+
+   @Override
+   public AMQPMessage setAddress(SimpleString address) {
+      return setAddress(address.toString());
+   }
+
+   @Override
+   public AMQPMessage setAddress(String address) {
+      this.address = address;
+      return this;
+   }
+
+   @Override
+   public SimpleString getAddressSimpleString() {
+      return SimpleString.toSimpleString(getAddress());
+   }
+
+   @Override
+   public long getTimestamp() {
+      return 0;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setTimestamp(long timestamp) {
+      return null;
+   }
+
+   @Override
+   public byte getPriority() {
+      return 0;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setPriority(byte priority) {
+      return null;
+   }
+
+   @Override
+   public void receiveBuffer(ByteBuf buffer) {
+
+   }
+
+   private synchronized void checkBuffer() {
+      if (!bufferValid) {
+         ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1500);
+         try {
+            getProtonMessage().encode(new NettyWritable(buffer));
+            byte[] bytes = new byte[buffer.writerIndex()];
+            buffer.readBytes(bytes);
+            this.data = Unpooled.wrappedBuffer(bytes);
+         } finally {
+            buffer.release();
+         }
+      }
+   }
+
+   @Override
+   public void sendBuffer(ByteBuf buffer, int deliveryCount) {
+      // TODO: do I need to change the Header with deliveryCount?
+      //       I would send a new instance of Header with a new delivery count, and only send partial of the buffer
+      //       previously received
+      checkBuffer();
+      buffer.writeBytes(data);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putBooleanProperty(String key, boolean value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putByteProperty(String key, byte value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putBytesProperty(String key, byte[] value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putShortProperty(String key, short value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putCharProperty(String key, char value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putIntProperty(String key, int value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putLongProperty(String key, long value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putFloatProperty(String key, float value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putDoubleProperty(String key, double value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putBooleanProperty(SimpleString key, boolean value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putByteProperty(SimpleString key, byte value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putBytesProperty(SimpleString key, byte[] value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putShortProperty(SimpleString key, short value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putCharProperty(SimpleString key, char value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putIntProperty(SimpleString key, int value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putLongProperty(SimpleString key, long value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putFloatProperty(SimpleString key, float value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putDoubleProperty(SimpleString key, double value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putStringProperty(String key, String value) {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putObjectProperty(String key,
+                                                                         Object value) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putObjectProperty(SimpleString key,
+                                                                         Object value) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Object removeProperty(String key) {
+      return null;
+   }
+
+   @Override
+   public boolean containsProperty(String key) {
+      return false;
+   }
+
+   @Override
+   public Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Byte getByteProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Integer getIntProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Long getLongProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Object getObjectProperty(String key) {
+      return null;
+   }
+
+   @Override
+   public Short getShortProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Float getFloatProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public String getStringProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException {
+      return new byte[0];
+   }
+
+   @Override
+   public Object removeProperty(SimpleString key) {
+      return null;
+   }
+
+   @Override
+   public boolean containsProperty(SimpleString key) {
+      return false;
+   }
+
+   @Override
+   public Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Object getObjectProperty(SimpleString key) {
+      return null;
+   }
+
+   @Override
+   public Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public String getStringProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public SimpleString getSimpleStringProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return null;
+   }
+
+   @Override
+   public byte[] getBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return new byte[0];
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putStringProperty(SimpleString key, SimpleString value) {
+      return null;
+   }
+
+   @Override
+   public int getEncodeSize() {
+      return 0;
+   }
+
+   @Override
+   public Set<SimpleString> getPropertyNames() {
+      return Collections.emptySet();
+   }
+
+   @Override
+   public int getMemoryEstimate() {
+      return 0;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message toCore() {
+      MessageImpl protonMessage = getProtonMessage();
+      return null;
+   }
+
+   @Override
+   public int getPersistSize() {
+      checkBuffer();
+      return data.array().length + DataConstants.SIZE_INT;
+   }
+
+   @Override
+   public void persist(ActiveMQBuffer targetRecord) {
+      checkBuffer();
+      targetRecord.writeInt(data.array().length);
+      targetRecord.writeBytes(data.array());
+   }
+
+   @Override
+   public void reloadPersistence(ActiveMQBuffer record) {
+      int size = record.readInt();
+      byte[] recordArray = new byte[size];
+      record.readBytes(recordArray);
+      this.data = Unpooled.wrappedBuffer(recordArray);
+      this.bufferValid = true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
new file mode 100644
index 0000000..3b5bdda
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+public class AMQPMessagePersister extends MessagePersister {
+
+   public static AMQPMessagePersister theInstance = new AMQPMessagePersister();
+
+   public static AMQPMessagePersister getInstance() {
+      return theInstance;
+   }
+
+   private AMQPMessagePersister() {
+   }
+
+   @Override
+   protected byte getID() {
+      return ProtonProtocolManagerFactory.ID;
+   }
+
+   @Override
+   public int getEncodeSize(Message record) {
+      return DataConstants.SIZE_BYTE + record.getPersistSize() +
+         SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG;
+   }
+
+
+   /** Sub classes must add the first short as the protocol-id */
+   @Override
+   public void encode(ActiveMQBuffer buffer, Message record) {
+      super.encode(buffer, record);
+      AMQPMessage msgEncode = (AMQPMessage)record;
+      buffer.writeLong(record.getMessageID());
+      buffer.writeLong(msgEncode.getMessageFormat());
+      buffer.writeNullableSimpleString(record.getAddressSimpleString());
+      record.persist(buffer);
+   }
+
+
+   @Override
+   public Message decode(ActiveMQBuffer buffer, Message record) {
+      long id = buffer.readLong();
+      long format = buffer.readLong();
+      SimpleString address = buffer.readNullableSimpleString();
+      record = new AMQPMessage(format);
+      record.reloadPersistence(buffer);
+      record.setMessageID(id);
+      if (address != null) {
+         record.setAddress(address);
+      }
+      return record;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 18c6b05..0b02838 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.io.IOCallback;
@@ -34,14 +35,12 @@ import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
 import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter;
-import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
@@ -69,7 +68,6 @@ import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Receiver;
-import io.netty.buffer.ByteBuf;
 import org.jboss.logging.Logger;
 
 public class AMQPSessionCallback implements SessionCallback {
@@ -298,11 +296,11 @@ public class AMQPSessionCallback implements SessionCallback {
       }
    }
 
-   public long encodeMessage(Object message, int deliveryCount, WritableBuffer buffer) throws Exception {
+   public long encodeMessage(Message message, int deliveryCount, WritableBuffer buffer) throws Exception {
       ProtonMessageConverter converter = (ProtonMessageConverter) manager.getConverter();
 
       // The Proton variant accepts a WritableBuffer to allow for a faster more direct encode.
-      return (long) converter.outbound((ServerMessage) message, deliveryCount, buffer);
+      return (long) converter.outbound(message, deliveryCount, buffer);
    }
 
    public String tempQueueName() {
@@ -321,22 +319,22 @@ public class AMQPSessionCallback implements SessionCallback {
       }
    }
 
-   public void ack(Transaction transaction, Object brokerConsumer, Object message) throws Exception {
+   public void ack(Transaction transaction, Object brokerConsumer, Message message) throws Exception {
       if (transaction == null) {
          transaction = serverSession.getCurrentTransaction();
       }
       recoverContext();
       try {
-         ((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, ((ServerMessage) message).getMessageID());
+         ((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, message.getMessageID());
       } finally {
          resetContext();
       }
    }
 
-   public void cancel(Object brokerConsumer, Object message, boolean updateCounts) throws Exception {
+   public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception {
       recoverContext();
       try {
-         ((ServerConsumer) brokerConsumer).individualCancel(((ServerMessage) message).getMessageID(), updateCounts);
+         ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);
       } finally {
          resetContext();
       }
@@ -351,11 +349,8 @@ public class AMQPSessionCallback implements SessionCallback {
                           final Delivery delivery,
                           String address,
                           int messageFormat,
-                          ByteBuf messageEncoded) throws Exception {
-      EncodedMessage encodedMessage = new EncodedMessage(messageFormat, messageEncoded.array(), messageEncoded.arrayOffset(), messageEncoded.writerIndex());
-
-      ServerMessage message = manager.getConverter().inbound(encodedMessage);
-      //use the address on the receiver if not null, if null let's hope it was set correctly on the message
+                          byte[] data) throws Exception {
+      AMQPMessage message = new AMQPMessage(messageFormat, data, manager);
       if (address != null) {
          message.setAddress(new SimpleString(address));
       } else {
@@ -372,7 +367,7 @@ public class AMQPSessionCallback implements SessionCallback {
 
       recoverContext();
 
-      PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddress());
+      PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString());
       if (store.isRejectingMessages()) {
          // We drop pre-settled messages (and abort any associated Tx)
          if (delivery.remotelySettled()) {
@@ -401,7 +396,7 @@ public class AMQPSessionCallback implements SessionCallback {
    }
 
    private void serverSend(final Transaction transaction,
-                           final ServerMessage message,
+                           final Message message,
                            final Delivery delivery,
                            final Receiver receiver) throws Exception {
       try {
@@ -416,8 +411,8 @@ public class AMQPSessionCallback implements SessionCallback {
                synchronized (connection.getLock()) {
                   delivery.disposition(Accepted.getInstance());
                   delivery.settle();
-                  connection.flush();
                }
+               connection.flush(true);
             }
 
             @Override
@@ -492,7 +487,7 @@ public class AMQPSessionCallback implements SessionCallback {
    }
 
    @Override
-   public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
+   public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
 
       message.removeProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString());
 
@@ -512,7 +507,7 @@ public class AMQPSessionCallback implements SessionCallback {
 
    @Override
    public int sendLargeMessage(MessageReference ref,
-                               ServerMessage message,
+                               Message message,
                                ServerConsumer consumer,
                                long bodySize,
                                int deliveryCount) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
index bef8ef0..98ec228 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
@@ -22,6 +22,8 @@ import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@@ -32,6 +34,8 @@ import org.osgi.service.component.annotations.Component;
 @Component(service = ProtocolManagerFactory.class)
 public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor> {
 
+   public static final byte ID = 2;
+
    private static final String AMQP_PROTOCOL_NAME = "AMQP";
 
    private static final String MODULE_NAME = "artemis-amqp-protocol";
@@ -39,6 +43,16 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory
    private static String[] SUPPORTED_PROTOCOLS = {AMQP_PROTOCOL_NAME};
 
    @Override
+   public byte getStoreID() {
+      return ID;
+   }
+
+   @Override
+   public Persister<Message> getPersister() {
+      return AMQPMessagePersister.getInstance();
+   }
+
+   @Override
    public ProtocolManager createProtocolManager(ActiveMQServer server,
                                                 final Map<String, Object> parameters,
                                                 List<BaseInterceptor> incomingInterceptors,


Mime
View raw message