activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [13/22] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding.
Date Mon, 06 Mar 2017 11:54:00 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
index 2ece01d..d06464f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
@@ -16,33 +16,20 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.message;
 
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_UNKNOWN;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
+import javax.jms.JMSException;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import javax.jms.JMSException;
-
 import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
@@ -50,9 +37,6 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
-import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
@@ -64,16 +48,18 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class JMSMappingOutboundTransformerTest {
 
    private final UUID TEST_OBJECT_VALUE = UUID.fromString("fee14b62-09e0-4ac6-a4c3-4206c630d844");
    private final String TEST_ADDRESS = "queue://testAddress";
 
-   private IDGenerator idGenerator;
-   private JMSMappingOutboundTransformer transformer;
 
    public static final byte QUEUE_TYPE = 0x00;
    public static final byte TOPIC_TYPE = 0x01;
@@ -82,80 +68,10 @@ public class JMSMappingOutboundTransformerTest {
 
    @Before
    public void setUp() {
-      idGenerator = new SimpleIDGenerator(0);
-      transformer = new JMSMappingOutboundTransformer(idGenerator);
    }
 
    // ----- no-body Message type tests ---------------------------------------//
 
-   @Test
-   public void testConvertMessageToAmqpMessageWithNoBody() throws Exception {
-      ServerJMSMessage outbound = createMessage();
-      outbound.encode();
-
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
-
-      assertNull(amqp.getBody());
-   }
-
-   @Test
-   public void testConvertTextMessageToAmqpMessageWithNoBodyOriginalEncodingWasNull() throws Exception {
-      ServerJMSTextMessage outbound = createTextMessage();
-      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL);
-      outbound.encode();
-
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
-
-      assertNull(amqp.getBody());
-   }
-
-   // ----- BytesMessage type tests ---------------------------------------//
-
-   @Test
-   public void testConvertEmptyBytesMessageToAmqpMessageWithDataBody() throws Exception {
-      ServerJMSBytesMessage outbound = createBytesMessage();
-      outbound.encode();
-
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
-
-      assertNotNull(amqp.getBody());
-      assertTrue(amqp.getBody() instanceof Data);
-      assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
-      assertEquals(0, ((Data) amqp.getBody()).getValue().getLength());
-   }
-
-   @Test
-   public void testConvertUncompressedBytesMessageToAmqpMessageWithDataBody() throws Exception {
-      byte[] expectedPayload = new byte[] {8, 16, 24, 32};
-      ServerJMSBytesMessage outbound = createBytesMessage();
-      outbound.writeBytes(expectedPayload);
-      outbound.encode();
-
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
-
-      assertNotNull(amqp.getBody());
-      assertTrue(amqp.getBody() instanceof Data);
-      assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
-      assertEquals(4, ((Data) amqp.getBody()).getValue().getLength());
-
-      Binary amqpData = ((Data) amqp.getBody()).getValue();
-      Binary inputData = new Binary(expectedPayload);
-
-      assertTrue(inputData.equals(amqpData));
-   }
-
    @Ignore("Compressed message body support not yet implemented.")
    @Test
    public void testConvertCompressedBytesMessageToAmqpMessageWithDataBody() throws Exception {
@@ -164,10 +80,7 @@ public class JMSMappingOutboundTransformerTest {
       outbound.writeBytes(expectedPayload);
       outbound.encode();
 
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof Data);
@@ -183,13 +96,9 @@ public class JMSMappingOutboundTransformerTest {
    @Test
    public void testConvertEmptyBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception {
       ServerJMSBytesMessage outbound = createBytesMessage();
-      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
       outbound.encode();
 
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -201,14 +110,10 @@ public class JMSMappingOutboundTransformerTest {
    public void testConvertUncompressedBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception {
       byte[] expectedPayload = new byte[] {8, 16, 24, 32};
       ServerJMSBytesMessage outbound = createBytesMessage();
-      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
       outbound.writeBytes(expectedPayload);
       outbound.encode();
 
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -226,14 +131,10 @@ public class JMSMappingOutboundTransformerTest {
    public void testConvertCompressedBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception {
       byte[] expectedPayload = new byte[] {8, 16, 24, 32};
       ServerJMSBytesMessage outbound = createBytesMessage(true);
-      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
       outbound.writeBytes(expectedPayload);
       outbound.encode();
 
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -253,10 +154,7 @@ public class JMSMappingOutboundTransformerTest {
       ServerJMSMapMessage outbound = createMapMessage();
       outbound.encode();
 
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -271,10 +169,7 @@ public class JMSMappingOutboundTransformerTest {
       outbound.setBytes("bytes", byteArray);
       outbound.encode();
 
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -296,10 +191,7 @@ public class JMSMappingOutboundTransformerTest {
       outbound.setBoolean("property-3", true);
       outbound.encode();
 
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -320,10 +212,7 @@ public class JMSMappingOutboundTransformerTest {
       outbound.setBoolean("property-3", true);
       outbound.encode();
 
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -336,33 +225,12 @@ public class JMSMappingOutboundTransformerTest {
       assertTrue("string".equals(amqpMap.get("property-1")));
    }
 
-   // ----- StreamMessage type tests -----------------------------------------//
-
-   @Test
-   public void testConvertStreamMessageToAmqpMessageWithAmqpValueBody() throws Exception {
-      ServerJMSStreamMessage outbound = createStreamMessage();
-      outbound.encode();
-
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
-
-      assertNotNull(amqp.getBody());
-      assertTrue(amqp.getBody() instanceof AmqpValue);
-      assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof List);
-   }
-
    @Test
    public void testConvertStreamMessageToAmqpMessageWithAmqpSequencey() throws Exception {
       ServerJMSStreamMessage outbound = createStreamMessage();
-      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE);
       outbound.encode();
 
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpSequence);
@@ -376,17 +244,15 @@ public class JMSMappingOutboundTransformerTest {
       outbound.writeString("test");
       outbound.encode();
 
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
-      assertTrue(amqp.getBody() instanceof AmqpValue);
-      assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof List);
+      assertTrue(amqp.getBody() instanceof AmqpSequence);
+
+      AmqpSequence list = (AmqpSequence)amqp.getBody();
 
       @SuppressWarnings("unchecked")
-      List<Object> amqpList = (List<Object>) ((AmqpValue) amqp.getBody()).getValue();
+      List<Object> amqpList = list.getValue();
 
       assertEquals(2, amqpList.size());
    }
@@ -394,15 +260,11 @@ public class JMSMappingOutboundTransformerTest {
    @Test
    public void testConvertCompressedStreamMessageToAmqpMessageWithAmqpSequencey() throws Exception {
       ServerJMSStreamMessage outbound = createStreamMessage(true);
-      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE);
       outbound.writeBoolean(false);
       outbound.writeString("test");
       outbound.encode();
 
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpSequence);
@@ -421,10 +283,7 @@ public class JMSMappingOutboundTransformerTest {
       ServerJMSObjectMessage outbound = createObjectMessage();
       outbound.encode();
 
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof Data);
@@ -434,45 +293,20 @@ public class JMSMappingOutboundTransformerTest {
    @Test
    public void testConvertEmptyObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
       ServerJMSObjectMessage outbound = createObjectMessage();
-      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
       outbound.encode();
 
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof Data);
       assertEquals(5, ((Data) amqp.getBody()).getValue().getLength());
    }
-
-   @Test
-   public void testConvertEmptyObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
-      ServerJMSObjectMessage outbound = createObjectMessage();
-      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
-      outbound.encode();
-
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
-
-      assertNotNull(amqp.getBody());
-      assertTrue(amqp.getBody() instanceof AmqpValue);
-      assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary);
-      assertEquals(5, ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
-   }
-
    @Test
    public void testConvertObjectMessageToAmqpMessageWithDataBody() throws Exception {
       ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
       outbound.encode();
 
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof Data);
@@ -486,13 +320,9 @@ public class JMSMappingOutboundTransformerTest {
    @Test
    public void testConvertObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
       ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
-      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
       outbound.encode();
 
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof Data);
@@ -504,35 +334,11 @@ public class JMSMappingOutboundTransformerTest {
    }
 
    @Test
-   public void testConvertObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
-      ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
-      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
-      outbound.encode();
-
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
-
-      assertNotNull(amqp.getBody());
-      assertTrue(amqp.getBody() instanceof AmqpValue);
-      assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary);
-      assertFalse(0 == ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
-
-      Object value = deserialize(((Binary) ((AmqpValue) amqp.getBody()).getValue()).getArray());
-      assertNotNull(value);
-      assertTrue(value instanceof UUID);
-   }
-
-   @Test
    public void testConvertCompressedObjectMessageToAmqpMessageWithDataBody() throws Exception {
       ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true);
       outbound.encode();
 
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof Data);
@@ -546,13 +352,9 @@ public class JMSMappingOutboundTransformerTest {
    @Test
    public void testConvertCompressedObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
       ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true);
-      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
       outbound.encode();
 
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof Data);
@@ -566,20 +368,16 @@ public class JMSMappingOutboundTransformerTest {
    @Test
    public void testConvertCompressedObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
       ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true);
-      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
       outbound.encode();
 
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
-      assertTrue(amqp.getBody() instanceof AmqpValue);
-      assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary);
-      assertFalse(0 == ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
+      assertTrue(amqp.getBody() instanceof Data);
+      assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
+      assertFalse(0 == ((Binary) ((Data) amqp.getBody()).getValue()).getLength());
 
-      Object value = deserialize(((Binary) ((AmqpValue) amqp.getBody()).getValue()).getArray());
+      Object value = deserialize((((Data) amqp.getBody()).getValue()).getArray());
       assertNotNull(value);
       assertTrue(value instanceof UUID);
    }
@@ -591,10 +389,7 @@ public class JMSMappingOutboundTransformerTest {
       ServerJMSTextMessage outbound = createTextMessage();
       outbound.encode();
 
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -602,57 +397,12 @@ public class JMSMappingOutboundTransformerTest {
    }
 
    @Test
-   public void testConvertTextMessageCreatesBodyUsingOriginalEncodingWithDataSection() throws Exception {
-      String contentString = "myTextMessageContent";
-      ServerJMSTextMessage outbound = createTextMessage(contentString);
-      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
-      outbound.encode();
-
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
-
-      assertNotNull(amqp.getBody());
-      assertTrue(amqp.getBody() instanceof Data);
-      assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
-
-      Binary data = ((Data) amqp.getBody()).getValue();
-      String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8);
-      assertEquals(contentString, contents);
-   }
-
-   @Test
-   public void testConvertTextMessageContentNotStoredCreatesBodyUsingOriginalEncodingWithDataSection() throws Exception {
-      String contentString = "myTextMessageContent";
-      ServerJMSTextMessage outbound = createTextMessage(contentString);
-      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
-      outbound.encode();
-
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
-
-      assertNotNull(amqp.getBody());
-      assertTrue(amqp.getBody() instanceof Data);
-      assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
-
-      Binary data = ((Data) amqp.getBody()).getValue();
-      String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8);
-      assertEquals(contentString, contents);
-   }
-
-   @Test
    public void testConvertTextMessageCreatesAmqpValueStringBody() throws Exception {
       String contentString = "myTextMessageContent";
       ServerJMSTextMessage outbound = createTextMessage(contentString);
       outbound.encode();
 
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -665,10 +415,7 @@ public class JMSMappingOutboundTransformerTest {
       ServerJMSTextMessage outbound = createTextMessage(contentString);
       outbound.encode();
 
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -679,21 +426,16 @@ public class JMSMappingOutboundTransformerTest {
    public void testConvertCompressedTextMessageCreatesDataSectionBody() throws Exception {
       String contentString = "myTextMessageContent";
       ServerJMSTextMessage outbound = createTextMessage(contentString, true);
-      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
       outbound.encode();
 
-      EncodedMessage encoded = transform(outbound);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
 
       assertNotNull(amqp.getBody());
-      assertTrue(amqp.getBody() instanceof Data);
-      assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
+      assertTrue(amqp.getBody() instanceof AmqpValue);
 
-      Binary data = ((Data) amqp.getBody()).getValue();
-      String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8);
-      assertEquals(contentString, contents);
+      AmqpValue value = (AmqpValue)amqp.getBody();
+
+      assertEquals(contentString, value.getValue());
    }
 
    // ----- Test JMSDestination Handling -------------------------------------//
@@ -731,15 +473,12 @@ public class JMSMappingOutboundTransformerTest {
       textMessage.setText("myTextMessageContent");
       textMessage.setJMSDestination(jmsDestination);
 
-      EncodedMessage encoded = transform(textMessage);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(textMessage.getInnerMessage()).getProtonMessage();
 
       MessageAnnotations ma = amqp.getMessageAnnotations();
       Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
       if (maMap != null) {
-         Object actualValue = maMap.get(JMSMappingOutboundTransformer.JMS_DEST_TYPE_MSG_ANNOTATION);
+         Object actualValue = maMap.get(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION);
          assertEquals("Unexpected annotation value", expectedAnnotationValue, actualValue);
       } else if (expectedAnnotationValue != null) {
          fail("Expected annotation value, but there were no annotations");
@@ -785,15 +524,12 @@ public class JMSMappingOutboundTransformerTest {
       textMessage.setText("myTextMessageContent");
       textMessage.setJMSReplyTo(jmsReplyTo);
 
-      EncodedMessage encoded = transform(textMessage);
-      assertNotNull(encoded);
-
-      Message amqp = encoded.decode();
+      Message amqp = AMQPConverter.getInstance().fromCore(textMessage.getInnerMessage()).getProtonMessage();
 
       MessageAnnotations ma = amqp.getMessageAnnotations();
       Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
       if (maMap != null) {
-         Object actualValue = maMap.get(JMSMappingOutboundTransformer.JMS_REPLY_TO_TYPE_MSG_ANNOTATION);
+         Object actualValue = maMap.get(AMQPMessageSupport.JMS_REPLY_TO_TYPE_MSG_ANNOTATION);
          assertEquals("Unexpected annotation value", expectedAnnotationValue, actualValue);
       } else if (expectedAnnotationValue != null) {
          fail("Expected annotation value, but there were no annotations");
@@ -806,17 +542,6 @@ public class JMSMappingOutboundTransformerTest {
 
    // ----- Utility Methods used for this Test -------------------------------//
 
-   public EncodedMessage transform(ServerJMSMessage message) throws Exception {
-      // Useful for testing but not recommended for real life use.
-      ByteBuf nettyBuffer = Unpooled.buffer(1024);
-      NettyWritable buffer = new NettyWritable(nettyBuffer);
-
-      long messageFormat = transformer.transform(message, buffer);
-
-      EncodedMessage encoded = new EncodedMessage(messageFormat, nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes());
-
-      return encoded;
-   }
 
    private ServerDestination createDestination(byte destType) {
       ServerDestination destination = null;
@@ -841,7 +566,7 @@ public class JMSMappingOutboundTransformerTest {
    }
 
    private ServerJMSMessage createMessage() {
-      return new ServerJMSMessage(newMessage(org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE), 0);
+      return new ServerJMSMessage(newMessage(org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE));
    }
 
    private ServerJMSBytesMessage createBytesMessage() {
@@ -849,7 +574,7 @@ public class JMSMappingOutboundTransformerTest {
    }
 
    private ServerJMSBytesMessage createBytesMessage(boolean compression) {
-      ServerJMSBytesMessage message = new ServerJMSBytesMessage(newMessage(org.apache.activemq.artemis.api.core.Message.BYTES_TYPE), 0);
+      ServerJMSBytesMessage message = new ServerJMSBytesMessage(newMessage(org.apache.activemq.artemis.api.core.Message.BYTES_TYPE));
 
       if (compression) {
          // TODO
@@ -863,7 +588,7 @@ public class JMSMappingOutboundTransformerTest {
    }
 
    private ServerJMSMapMessage createMapMessage(boolean compression) {
-      ServerJMSMapMessage message = new ServerJMSMapMessage(newMessage(org.apache.activemq.artemis.api.core.Message.MAP_TYPE), 0);
+      ServerJMSMapMessage message = new ServerJMSMapMessage(newMessage(org.apache.activemq.artemis.api.core.Message.MAP_TYPE));
 
       if (compression) {
          // TODO
@@ -877,7 +602,7 @@ public class JMSMappingOutboundTransformerTest {
    }
 
    private ServerJMSStreamMessage createStreamMessage(boolean compression) {
-      ServerJMSStreamMessage message = new ServerJMSStreamMessage(newMessage(org.apache.activemq.artemis.api.core.Message.STREAM_TYPE), 0);
+      ServerJMSStreamMessage message = new ServerJMSStreamMessage(newMessage(org.apache.activemq.artemis.api.core.Message.STREAM_TYPE));
 
       if (compression) {
          // TODO
@@ -895,7 +620,7 @@ public class JMSMappingOutboundTransformerTest {
    }
 
    private ServerJMSObjectMessage createObjectMessage(Serializable payload, boolean compression) {
-      ServerJMSObjectMessage result = AMQPMessageSupport.createObjectMessage(idGenerator);
+      ServerJMSObjectMessage result = AMQPMessageSupport.createObjectMessage(0);
 
       if (compression) {
          // TODO
@@ -922,7 +647,7 @@ public class JMSMappingOutboundTransformerTest {
    }
 
    private ServerJMSTextMessage createTextMessage(String text, boolean compression) {
-      ServerJMSTextMessage result = AMQPMessageSupport.createTextMessage(idGenerator);
+      ServerJMSTextMessage result = AMQPMessageSupport.createTextMessage(0);
 
       if (compression) {
          // TODO
@@ -943,8 +668,8 @@ public class JMSMappingOutboundTransformerTest {
       }
    }
 
-   private ServerMessageImpl newMessage(byte messageType) {
-      ServerMessageImpl message = new ServerMessageImpl(idGenerator.generateID(), 512);
+   private CoreMessage newMessage(byte messageType) {
+      CoreMessage message = new CoreMessage(0, 512);
       message.setType(messageType);
       ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null);
       return message;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java
index 99aab33..483f245 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java
@@ -21,27 +21,23 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter;
-import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.ProtonJMessage;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
 /**
  * Some simple performance tests for the Message Transformers.
  */
@@ -51,16 +47,11 @@ public class JMSTransformationSpeedComparisonTest {
    @Rule
    public TestName test = new TestName();
 
-   private IDGenerator idGenerator;
-   private ProtonMessageConverter converter;
-
    private final int WARM_CYCLES = 1000;
    private final int PROFILE_CYCLES = 1000000;
 
    @Before
    public void setUp() {
-      idGenerator = new SimpleIDGenerator(0);
-      converter = new ProtonMessageConverter(idGenerator);
    }
 
    @Test
@@ -68,20 +59,20 @@ public class JMSTransformationSpeedComparisonTest {
 
       Message message = Proton.message();
       message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
-      EncodedMessage encoded = encode(message);
+      AMQPMessage encoded = new AMQPMessage(message);
 
       // Warm up
       for (int i = 0; i < WARM_CYCLES; ++i) {
-         ServerMessage intermediate = converter.inbound(encoded);
-         encode(converter.outbound(intermediate, 1));
+         ICoreMessage intermediate = encoded.toCore();
+         encode(AMQPConverter.getInstance().fromCore(intermediate));
       }
 
       long totalDuration = 0;
 
       long startTime = System.nanoTime();
       for (int i = 0; i < PROFILE_CYCLES; ++i) {
-         ServerMessage intermediate = converter.inbound(encoded);
-         encode(converter.outbound(intermediate, 1));
+         ICoreMessage intermediate = encoded.toCore();
+         encode(AMQPConverter.getInstance().fromCore(intermediate));
       }
       totalDuration += System.nanoTime() - startTime;
 
@@ -99,20 +90,20 @@ public class JMSTransformationSpeedComparisonTest {
       message.setContentType("text/plain");
       message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
 
-      EncodedMessage encoded = encode(message);
+      AMQPMessage encoded = new AMQPMessage(message);
 
       // Warm up
       for (int i = 0; i < WARM_CYCLES; ++i) {
-         ServerMessage intermediate = converter.inbound(encoded);
-         encode(converter.outbound(intermediate, 1));
+         ICoreMessage intermediate = encoded.toCore();
+         encode(AMQPConverter.getInstance().fromCore(intermediate));
       }
 
       long totalDuration = 0;
 
       long startTime = System.nanoTime();
       for (int i = 0; i < PROFILE_CYCLES; ++i) {
-         ServerMessage intermediate = converter.inbound(encoded);
-         encode(converter.outbound(intermediate, 1));
+         ICoreMessage intermediate = encoded.toCore();
+         encode(AMQPConverter.getInstance().fromCore(intermediate));
       }
       totalDuration += System.nanoTime() - startTime;
 
@@ -122,20 +113,20 @@ public class JMSTransformationSpeedComparisonTest {
    @Test
    public void testTypicalQpidJMSMessage() throws Exception {
 
-      EncodedMessage encoded = encode(createTypicalQpidJMSMessage());
+      AMQPMessage encoded = new AMQPMessage(createTypicalQpidJMSMessage());
 
       // Warm up
       for (int i = 0; i < WARM_CYCLES; ++i) {
-         ServerMessage intermediate = converter.inbound(encoded);
-         encode(converter.outbound(intermediate, 1));
+         ICoreMessage intermediate = encoded.toCore();
+         encode(AMQPConverter.getInstance().fromCore(intermediate));
       }
 
       long totalDuration = 0;
 
       long startTime = System.nanoTime();
       for (int i = 0; i < PROFILE_CYCLES; ++i) {
-         ServerMessage intermediate = converter.inbound(encoded);
-         encode(converter.outbound(intermediate, 1));
+         ICoreMessage intermediate = encoded.toCore();
+         encode(AMQPConverter.getInstance().fromCore(intermediate));
       }
       totalDuration += System.nanoTime() - startTime;
 
@@ -145,20 +136,20 @@ public class JMSTransformationSpeedComparisonTest {
    @Test
    public void testComplexQpidJMSMessage() throws Exception {
 
-      EncodedMessage encoded = encode(createComplexQpidJMSMessage());
+      AMQPMessage encoded = encode(createComplexQpidJMSMessage());
 
       // Warm up
       for (int i = 0; i < WARM_CYCLES; ++i) {
-         ServerMessage intermediate = converter.inbound(encoded);
-         encode(converter.outbound(intermediate, 1));
+         ICoreMessage intermediate = encoded.toCore();
+         encode(AMQPConverter.getInstance().fromCore(intermediate));
       }
 
       long totalDuration = 0;
 
       long startTime = System.nanoTime();
       for (int i = 0; i < PROFILE_CYCLES; ++i) {
-         ServerMessage intermediate = converter.inbound(encoded);
-         encode(converter.outbound(intermediate, 1));
+         ICoreMessage intermediate = encoded.toCore();
+         encode(AMQPConverter.getInstance().fromCore(intermediate));
       }
       totalDuration += System.nanoTime() - startTime;
 
@@ -168,18 +159,20 @@ public class JMSTransformationSpeedComparisonTest {
    @Test
    public void testTypicalQpidJMSMessageInBoundOnly() throws Exception {
 
-      EncodedMessage encoded = encode(createTypicalQpidJMSMessage());
+      AMQPMessage encoded = encode(createTypicalQpidJMSMessage());
 
       // Warm up
       for (int i = 0; i < WARM_CYCLES; ++i) {
-         converter.inbound(encoded);
+         ICoreMessage intermediate = encoded.toCore();
+         encode(AMQPConverter.getInstance().fromCore(intermediate));
       }
 
       long totalDuration = 0;
 
       long startTime = System.nanoTime();
       for (int i = 0; i < PROFILE_CYCLES; ++i) {
-         converter.inbound(encoded);
+         ICoreMessage intermediate = encoded.toCore();
+         encode(AMQPConverter.getInstance().fromCore(intermediate));
       }
 
       totalDuration += System.nanoTime() - startTime;
@@ -190,19 +183,20 @@ public class JMSTransformationSpeedComparisonTest {
    @Test
    public void testTypicalQpidJMSMessageOutBoundOnly() throws Exception {
 
-      EncodedMessage encoded = encode(createTypicalQpidJMSMessage());
-      ServerMessage intermediate = converter.inbound(encoded);
+      AMQPMessage encoded = encode(createTypicalQpidJMSMessage());
 
       // Warm up
       for (int i = 0; i < WARM_CYCLES; ++i) {
-         encode(converter.outbound(intermediate, 1));
+         ICoreMessage intermediate = encoded.toCore();
+         encode(AMQPConverter.getInstance().fromCore(intermediate));
       }
 
       long totalDuration = 0;
 
       long startTime = System.nanoTime();
       for (int i = 0; i < PROFILE_CYCLES; ++i) {
-         encode(converter.outbound(intermediate, 1));
+         ICoreMessage intermediate = encoded.toCore();
+         encode(AMQPConverter.getInstance().fromCore(intermediate));
       }
 
       totalDuration += System.nanoTime() - startTime;
@@ -278,16 +272,16 @@ public class JMSTransformationSpeedComparisonTest {
       return message;
    }
 
-   private EncodedMessage encode(Object target) {
-      if (target instanceof ProtonJMessage) {
-         ProtonJMessage amqp = (ProtonJMessage) target;
-
-         ByteBuf nettyBuffer = Unpooled.buffer(1024);
-         amqp.encode(new NettyWritable(nettyBuffer));
+   private AMQPMessage encode(Message message) {
+      return new AMQPMessage(message);
+   }
 
-         return new EncodedMessage(0, nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes());
-      } else {
-         return null;
+   private void encode(AMQPMessage target) {
+      ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+      try {
+         target.sendBuffer(buf, 1);
+      } finally {
+         buf.release();
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
index a5a2168..a73d29f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
@@ -16,36 +16,28 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.message;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
-import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.codec.CompositeWritableBuffer;
-import org.apache.qpid.proton.codec.DroppingWritableBuffer;
-import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.ProtonJMessage;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
 /**
  * Tests some basic encode / decode functionality on the transformers.
  */
@@ -54,72 +46,10 @@ public class MessageTransformationTest {
    @Rule
    public TestName test = new TestName();
 
-   private IDGenerator idGenerator;
-   private ProtonMessageConverter converter;
-
    @Before
    public void setUp() {
-      idGenerator = new SimpleIDGenerator(0);
-      converter = new ProtonMessageConverter(idGenerator);
    }
 
-   @Test
-   public void testEncodeDecodeFidelity() throws Exception {
-      Map<String, Object> applicationProperties = new HashMap<>();
-      Map<Symbol, Object> messageAnnotations = new HashMap<>();
-
-      applicationProperties.put("property-1", "string");
-      applicationProperties.put("property-2", 512);
-      applicationProperties.put("property-3", true);
-
-      messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
-      messageAnnotations.put(Symbol.valueOf("x-opt-jms-dest"), 0);
-
-      Message incomingMessage = Proton.message();
-
-      incomingMessage.setAddress("queue://test-queue");
-      incomingMessage.setDeliveryCount(1);
-      incomingMessage.setApplicationProperties(new ApplicationProperties(applicationProperties));
-      incomingMessage.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
-      incomingMessage.setCreationTime(System.currentTimeMillis());
-      incomingMessage.setContentType("text/plain");
-      incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
-
-      EncodedMessage encoded = encode(incomingMessage);
-
-      ServerMessage outbound = converter.inbound(encoded);
-      Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, outbound.getLongProperty("JMSXDeliveryCount").intValue())).decode();
-
-      // Test that message details are equal
-      assertEquals(incomingMessage.getAddress(), outboudMessage.getAddress());
-      assertEquals(incomingMessage.getDeliveryCount(), outboudMessage.getDeliveryCount());
-      assertEquals(incomingMessage.getCreationTime(), outboudMessage.getCreationTime());
-      assertEquals(incomingMessage.getContentType(), outboudMessage.getContentType());
-
-      // Test Message annotations
-      ApplicationProperties incomingApplicationProperties = incomingMessage.getApplicationProperties();
-      ApplicationProperties outgoingApplicationProperties = outboudMessage.getApplicationProperties();
-
-      assertEquals(incomingApplicationProperties.getValue(), outgoingApplicationProperties.getValue());
-
-      // Test Message properties
-      MessageAnnotations incomingMessageAnnotations = incomingMessage.getMessageAnnotations();
-      MessageAnnotations outgoingMessageAnnotations = outboudMessage.getMessageAnnotations();
-
-      assertEquals(incomingMessageAnnotations.getValue(), outgoingMessageAnnotations.getValue());
-
-      // Test that bodies are equal
-      assertTrue(incomingMessage.getBody() instanceof AmqpValue);
-      assertTrue(outboudMessage.getBody() instanceof AmqpValue);
-
-      AmqpValue incomingBody = (AmqpValue) incomingMessage.getBody();
-      AmqpValue outgoingBody = (AmqpValue) outboudMessage.getBody();
-
-      assertTrue(incomingBody.getValue() instanceof String);
-      assertTrue(outgoingBody.getValue() instanceof String);
-
-      assertEquals(incomingBody.getValue(), outgoingBody.getValue());
-   }
 
    @Test
    public void testBodyOnlyEncodeDecode() throws Exception {
@@ -128,12 +58,10 @@ public class MessageTransformationTest {
 
       incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
 
-      EncodedMessage encoded = encode(incomingMessage);
-      ServerMessage outbound = converter.inbound(encoded);
-      Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
+      ICoreMessage core = new AMQPMessage(incomingMessage).toCore();
+      Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
 
       assertNull(outboudMessage.getHeader());
-      assertNull(outboudMessage.getProperties());
    }
 
    @Test
@@ -144,9 +72,8 @@ public class MessageTransformationTest {
       incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
       incomingMessage.setMessageId("ID:SomeQualifier:0:0:1");
 
-      EncodedMessage encoded = encode(incomingMessage);
-      ServerMessage outbound = converter.inbound(encoded);
-      Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
+      ICoreMessage core = new AMQPMessage(incomingMessage).toCore();
+      Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
 
       assertNull(outboudMessage.getHeader());
       assertNotNull(outboudMessage.getProperties());
@@ -160,32 +87,9 @@ public class MessageTransformationTest {
       incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
       incomingMessage.setDurable(true);
 
-      EncodedMessage encoded = encode(incomingMessage);
-      ServerMessage outbound = converter.inbound(encoded);
-      Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
-
-      assertNotNull(outboudMessage.getHeader());
-      assertNull(outboudMessage.getProperties());
-   }
-
-   @Test
-   public void testMessageWithAmqpValueThatFailsJMSConversion() throws Exception {
-
-      Message incomingMessage = Proton.message();
-
-      incomingMessage.setBody(new AmqpValue(new Boolean(true)));
-
-      EncodedMessage encoded = encode(incomingMessage);
-      ServerMessage outbound = converter.inbound(encoded);
-      Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
+      ICoreMessage core = new AMQPMessage(incomingMessage).toCore();
+      Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
 
-      Section section = outboudMessage.getBody();
-      assertNotNull(section);
-      assertTrue(section instanceof AmqpValue);
-      AmqpValue amqpValue = (AmqpValue) section;
-      assertNotNull(amqpValue.getValue());
-      assertTrue(amqpValue.getValue() instanceof Boolean);
-      assertEquals(true, amqpValue.getValue());
    }
 
    @Test
@@ -233,32 +137,10 @@ public class MessageTransformationTest {
       message.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
       message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
 
-      EncodedMessage encoded = encode(message);
-      ServerMessage outbound = converter.inbound(encoded);
-      Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
+      ICoreMessage core = new AMQPMessage(message).toCore();
+      Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
 
-      assertNotNull(outboudMessage.getHeader());
-      assertNotNull(outboudMessage.getProperties());
-      assertNotNull(outboudMessage.getMessageAnnotations());
-      assertNotNull(outboudMessage.getApplicationProperties());
-      assertNull(outboudMessage.getDeliveryAnnotations());
-      assertNull(outboudMessage.getFooter());
-
-      assertEquals(9, outboudMessage.getApplicationProperties().getValue().size());
+      assertEquals(10, outboudMessage.getApplicationProperties().getValue().size());
       assertEquals(4, outboudMessage.getMessageAnnotations().getValue().size());
    }
-
-   private EncodedMessage encode(Message message) {
-      ProtonJMessage amqp = (ProtonJMessage) message;
-
-      ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]);
-      final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
-      int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
-      if (overflow.position() > 0) {
-         buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]);
-         c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
-      }
-
-      return new EncodedMessage(1, buffer.array(), 0, c);
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
new file mode 100644
index 0000000..db40a8e
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.message;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.commons.collections.map.HashedMap;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AMQPMessageTest {
+
+   @Test
+   public void testVerySimple() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader( new Header());
+      Properties properties = new Properties();
+      properties.setTo("someNiceLocal");
+      protonMessage.setProperties(properties);
+      protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7));
+      protonMessage.getHeader().setDurable(Boolean.TRUE);
+      protonMessage.setApplicationProperties(new ApplicationProperties(new HashedMap()));
+
+      ByteBuf nettyBuffer = Unpooled.buffer(1500);
+
+      protonMessage.encode(new NettyWritable(nettyBuffer));
+
+      byte[] bytes = new byte[nettyBuffer.writerIndex()];
+
+      nettyBuffer.readBytes(bytes);
+
+      AMQPMessage encode = new AMQPMessage(0, bytes);
+
+      Assert.assertEquals(7, encode.getHeader().getDeliveryCount().intValue());
+      Assert.assertEquals(true, encode.getHeader().getDurable());
+      Assert.assertEquals("someNiceLocal", encode.getAddress());
+
+
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
index 1f435ff..f4cba64 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -32,7 +32,6 @@ import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.server.management.NotificationListener;
 import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
-import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
@@ -132,11 +131,6 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInter
    }
 
    @Override
-   public MessageConverter getConverter() {
-      return null;
-   }
-
-   @Override
    public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index f0385dc..553521b 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -28,17 +28,19 @@ import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.jboss.logging.Logger;
 
 /**
  * Handles MQTT Exactly Once (QoS level 2) Protocol.
  */
 public class MQTTPublishManager {
 
+   private static final Logger logger = Logger.getLogger(MQTTPublishManager.class);
+
    private static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2.";
 
    private SimpleString managementAddress;
@@ -112,19 +114,19 @@ public class MQTTPublishManager {
     * to original ID and consumer in the Session state.  This way we can look up the consumer Id and the message Id from
     * the PubAck or PubRec message id. *
     */
-   protected void sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
+   protected void sendMessage(CoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
       // This is to allow retries of PubRel.
       if (isManagementConsumer(consumer)) {
          sendPubRelMessage(message);
       } else {
          int qos = decideQoS(message, consumer);
          if (qos == 0) {
-            sendServerMessage((int) message.getMessageID(), (ServerMessageImpl) message, deliveryCount, qos);
+            sendServerMessage((int) message.getMessageID(),  message, deliveryCount, qos);
             session.getServerSession().acknowledge(consumer.getID(), message.getMessageID());
          } else if (qos == 1 || qos == 2) {
             int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID());
             outboundStore.publish(mqttid, message.getMessageID(), consumer.getID());
-            sendServerMessage(mqttid, (ServerMessageImpl) message, deliveryCount, qos);
+            sendServerMessage(mqttid, message, deliveryCount, qos);
          } else {
             // Client must have disconnected and it's Subscription QoS cleared
             consumer.individualCancel(message.getMessageID(), false);
@@ -149,7 +151,7 @@ public class MQTTPublishManager {
     */
    void sendInternal(int messageId, String topic, int qos, ByteBuf payload, boolean retain, boolean internal) throws Exception {
       synchronized (lock) {
-         ServerMessage serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, topic, retain, qos, payload);
+         Message serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, topic, retain, qos, payload);
 
          if (qos > 0) {
             serverMessage.setDurable(MQTTUtil.DURABLE_MESSAGES);
@@ -173,6 +175,7 @@ public class MQTTPublishManager {
                }
                tx.commit();
             } catch (Throwable t) {
+               logger.warn(t.getMessage(), t);
                tx.rollback();
                throw t;
             }
@@ -181,7 +184,7 @@ public class MQTTPublishManager {
       }
    }
 
-   void sendPubRelMessage(ServerMessage message) {
+   void sendPubRelMessage(Message message) {
       int messageId = message.getIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY);
       session.getProtocolHandler().sendPubRel(messageId);
    }
@@ -190,7 +193,7 @@ public class MQTTPublishManager {
       try {
          Pair<Long, Long> ref = outboundStore.publishReceived(messageId);
          if (ref != null) {
-            ServerMessage m = MQTTUtil.createPubRelMessage(session, managementAddress, messageId);
+            Message m = MQTTUtil.createPubRelMessage(session, managementAddress, messageId);
             session.getServerSession().send(m, true);
             session.getServerSession().acknowledge(ref.getB(), ref.getA());
          } else {
@@ -246,30 +249,30 @@ public class MQTTPublishManager {
       }
    }
 
-   private void sendServerMessage(int messageId, ServerMessageImpl message, int deliveryCount, int qos) {
+   private void sendServerMessage(int messageId, CoreMessage message, int deliveryCount, int qos) {
       String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString(), session.getWildcardConfiguration());
 
       ByteBuf payload;
       switch (message.getType()) {
          case Message.TEXT_TYPE:
             try {
-               SimpleString text = message.getBodyBuffer().readNullableSimpleString();
+               SimpleString text = message.getReadOnlyBodyBuffer().readNullableSimpleString();
                byte[] stringPayload = text.toString().getBytes("UTF-8");
                payload = ByteBufAllocator.DEFAULT.buffer(stringPayload.length);
                payload.writeBytes(stringPayload);
                break;
             } catch (UnsupportedEncodingException e) {
-               log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage());
+               log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e);
             }
          default:
-            ActiveMQBuffer bufferDup = message.getBodyBufferDuplicate();
-            payload = bufferDup.readBytes(message.getEndOfBodyPosition() - bufferDup.readerIndex()).byteBuf();
+            ActiveMQBuffer bufferDup = message.getReadOnlyBodyBuffer();
+            payload = bufferDup.readBytes(bufferDup.writerIndex()).byteBuf();
             break;
       }
       session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);
    }
 
-   private int decideQoS(ServerMessage message, ServerConsumer consumer) {
+   private int decideQoS(Message message, ServerConsumer consumer) {
 
       int subscriptionQoS = -1;
       try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
index 596670b..0b52a0b 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
@@ -17,12 +17,12 @@
 
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.LinkedListIterator;
@@ -44,7 +44,7 @@ public class MQTTRetainMessageManager {
     * the subscription queue for the consumer.  When a new retained message is received the message will be sent to
     * the retained queue and the previous retain message consumed to remove it from the queue.
     */
-   void handleRetainedMessage(ServerMessage message, String address, boolean reset, Transaction tx) throws Exception {
+   void handleRetainedMessage(Message message, String address, boolean reset, Transaction tx) throws Exception {
       SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration()));
 
       Queue queue = session.getServer().locateQueue(retainAddress);
@@ -82,7 +82,7 @@ public class MQTTRetainMessageManager {
                Queue retainedQueue = session.getServer().locateQueue(retainedQueueName);
                try (LinkedListIterator<MessageReference> i = retainedQueue.iterator()) {
                   if (i.hasNext()) {
-                     ServerMessage message = i.next().getMessage().copy(session.getServer().getStorageManager().generateID());
+                     Message message = i.next().getMessage().copy(session.getServer().getStorageManager().generateID());
                      sendToQueue(message, queue, tx);
                   }
                }
@@ -95,7 +95,7 @@ public class MQTTRetainMessageManager {
       tx.commit();
    }
 
-   private void sendToQueue(ServerMessage message, Queue queue, Transaction tx) throws Exception {
+   private void sendToQueue(Message message, Queue queue, Transaction tx) throws Exception {
       RoutingContext context = new RoutingContextImpl(tx);
       queue.route(message, context);
       session.getServer().getPostOffice().processRoute(message, context, false);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 548b62c..a5b908f 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -17,10 +17,12 @@
 
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
+
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 
@@ -43,13 +45,13 @@ public class MQTTSessionCallback implements SessionCallback {
 
    @Override
    public int sendMessage(MessageReference reference,
-                          ServerMessage message,
+                          Message message,
                           ServerConsumer consumer,
                           int deliveryCount) {
       try {
-         session.getMqttPublishManager().sendMessage(message, consumer, deliveryCount);
+         session.getMqttPublishManager().sendMessage((CoreMessage)message, consumer, deliveryCount);
       } catch (Exception e) {
-         log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage());
+         log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e);
       }
       return 1;
    }
@@ -70,7 +72,7 @@ public class MQTTSessionCallback implements SessionCallback {
 
    @Override
    public int sendLargeMessage(MessageReference reference,
-                               ServerMessage message,
+                               Message message,
                                ServerConsumer consumer,
                                long bodySize,
                                int deliveryCount) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index 7bc6b84..613fef3 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -24,12 +24,11 @@ import io.netty.handler.codec.mqtt.MqttMessageType;
 import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
 import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
 import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 
 /**
  * A Utility Class for creating Server Side objects and converting MQTT concepts to/from Artemis.
@@ -93,13 +92,13 @@ public class MQTTUtil {
       return MQTT_RETAIN_ADDRESS_PREFIX + MQTT_WILDCARD.convert(filter, wildcardConfiguration);
    }
 
-   private static ServerMessage createServerMessage(MQTTSession session,
+   private static ICoreMessage createServerMessage(MQTTSession session,
                                                     SimpleString address,
                                                     boolean retain,
                                                     int qos) {
       long id = session.getServer().getStorageManager().generateID();
 
-      ServerMessageImpl message = new ServerMessageImpl(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE);
+      CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE);
       message.setAddress(address);
       message.putBooleanProperty(new SimpleString(MQTT_MESSAGE_RETAIN_KEY), retain);
       message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos);
@@ -107,21 +106,20 @@ public class MQTTUtil {
       return message;
    }
 
-   public static ServerMessage createServerMessageFromByteBuf(MQTTSession session,
+   public static Message createServerMessageFromByteBuf(MQTTSession session,
                                                               String topic,
                                                               boolean retain,
                                                               int qos,
                                                               ByteBuf payload) {
       String coreAddress = convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration());
-      ServerMessage message = createServerMessage(session, new SimpleString(coreAddress), retain, qos);
+      ICoreMessage message = createServerMessage(session, new SimpleString(coreAddress), retain, qos);
 
-      // FIXME does this involve a copy?
-      message.getBodyBuffer().writeBytes(new ChannelBufferWrapper(payload), payload.readableBytes());
+      message.getBodyBuffer().writeBytes(payload, 0, payload.readableBytes());
       return message;
    }
 
-   public static ServerMessage createPubRelMessage(MQTTSession session, SimpleString address, int messageId) {
-      ServerMessage message = createServerMessage(session, address, false, 1);
+   public static Message createPubRelMessage(MQTTSession session, SimpleString address, int messageId) {
+      Message message = createServerMessage(session, address, false, 1);
       message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_ID_KEY), messageId);
       message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_TYPE_KEY), MqttMessageType.PUBREL.value());
       return message;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 5f408a6..46fe372 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -187,7 +187,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
    private AtomicBoolean disableTtl = new AtomicBoolean(false);
 
-   // TODO-NOW: check on why there are two connections created for every createConnection on the client.
    public OpenWireConnection(Connection connection,
                              ActiveMQServer server,
                              Executor executor,
@@ -1060,8 +1059,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
       @Override
       public Response processRemoveProducer(ProducerId id) throws Exception {
-
-         // TODO-now: proper implement this method
          return null;
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 9b27b81..3808363 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -35,12 +35,12 @@ import java.util.zip.InflaterOutputStream;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
 import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
 import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
 import org.apache.activemq.artemis.utils.DataConstants;
@@ -69,7 +69,7 @@ import org.apache.activemq.util.MarshallingSupport;
 import org.apache.activemq.wireformat.WireFormat;
 import org.fusesource.hawtbuf.UTF8Buffer;
 
-public class OpenWireMessageConverter implements MessageConverter {
+public class OpenWireMessageConverter implements MessageConverter<OpenwireMessage> {
 
    public static final String AMQ_PREFIX = "__HDR_";
    public static final String AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = AMQ_PREFIX + "dlqDeliveryFailureCause";
@@ -102,16 +102,26 @@ public class OpenWireMessageConverter implements MessageConverter {
    }
 
    @Override
-   public Object outbound(ServerMessage message, int deliveryCount) {
-      // TODO: implement this
+   public OpenwireMessage fromCore(ICoreMessage coreMessage) throws Exception {
       return null;
    }
 
    @Override
-   public ServerMessage inbound(Object message) throws Exception {
+   public ICoreMessage toCore(OpenwireMessage pureMessage) throws Exception {
+      return null;
+   }
+
+   //   @Override
+   public Object outbound(org.apache.activemq.artemis.api.core.Message message, int deliveryCount) {
+      // TODO: implement this
+      return null;
+   }
+
+//   @Override
+   public org.apache.activemq.artemis.api.core.Message inbound(Object message) throws Exception {
 
       Message messageSend = (Message) message;
-      ServerMessageImpl coreMessage = new ServerMessageImpl(-1, messageSend.getSize());
+      CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize());
 
       String type = messageSend.getType();
       if (type != null) {
@@ -157,7 +167,7 @@ public class OpenWireMessageConverter implements MessageConverter {
                mdataIn.close();
                TypedProperties props = new TypedProperties();
                loadMapIntoProperties(props, map);
-               props.encode(body);
+               props.encode(body.byteBuf());
                break;
             case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
                if (messageCompressed) {
@@ -415,7 +425,7 @@ public class OpenWireMessageConverter implements MessageConverter {
    }
 
    public static MessageDispatch createMessageDispatch(MessageReference reference,
-                                                       ServerMessage message,
+                                                       ICoreMessage message,
                                                        AMQConsumer consumer) throws IOException, JMSException {
       ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer.getMarshaller(), consumer.getOpenwireDestination());
 
@@ -433,7 +443,7 @@ public class OpenWireMessageConverter implements MessageConverter {
    }
 
    private static ActiveMQMessage toAMQMessage(MessageReference reference,
-                                               ServerMessage coreMessage,
+                                               ICoreMessage coreMessage,
                                                WireFormat marshaller,
                                                ActiveMQDestination actualDestination) throws IOException {
       ActiveMQMessage amqMsg = null;
@@ -476,7 +486,7 @@ public class OpenWireMessageConverter implements MessageConverter {
       }
       amqMsg.setBrokerInTime(brokerInTime);
 
-      ActiveMQBuffer buffer = coreMessage.getBodyBufferDuplicate();
+      ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
       Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
       boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
       amqMsg.setCompressed(isCompressed);
@@ -503,7 +513,7 @@ public class OpenWireMessageConverter implements MessageConverter {
                TypedProperties mapData = new TypedProperties();
                //it could be a null map
                if (buffer.readableBytes() > 0) {
-                  mapData.decode(buffer);
+                  mapData.decode(buffer.byteBuf());
                   Map<String, Object> map = mapData.getMap();
                   ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
                   OutputStream os = out;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 5b62e3e..c0affb6 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -35,6 +35,7 @@ import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
 import org.apache.activemq.artemis.api.core.client.TopologyMember;
@@ -44,12 +45,10 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
-import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -137,7 +136,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
 
       final ClusterManager clusterManager = this.server.getClusterManager();
 
-      // TODO-NOW: use a property name for the cluster connection
       ClusterConnection cc = clusterManager.getDefaultConnection(null);
 
       if (cc != null) {
@@ -236,11 +234,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
    }
 
    @Override
-   public MessageConverter getConverter() {
-      return messageConverter;
-   }
-
-   @Override
    public void removeHandler(String name) {
    }
 


Mime
View raw message