activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/5] activemq-artemis git commit: ARTEMIS-770 AMQP Message Transformer refactor
Date Fri, 07 Oct 2016 14:51:02 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 6f6d9845f -> 67f804054


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
new file mode 100644
index 0000000..2ece01d
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
@@ -0,0 +1,952 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_UNKNOWN;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
+import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.utils.IDGenerator;
+import org.apache.activemq.artemis.utils.SimpleIDGenerator;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.message.Message;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+public class JMSMappingOutboundTransformerTest {
+
+   private final UUID TEST_OBJECT_VALUE = UUID.fromString("fee14b62-09e0-4ac6-a4c3-4206c630d844");
+   private final String TEST_ADDRESS = "queue://testAddress";
+
+   private IDGenerator idGenerator;
+   private JMSMappingOutboundTransformer transformer;
+
+   public static final byte QUEUE_TYPE = 0x00;
+   public static final byte TOPIC_TYPE = 0x01;
+   public static final byte TEMP_QUEUE_TYPE = 0x02;
+   public static final byte TEMP_TOPIC_TYPE = 0x03;
+
+   @Before
+   public void setUp() {
+      idGenerator = new SimpleIDGenerator(0);
+      transformer = new JMSMappingOutboundTransformer(idGenerator);
+   }
+
+   // ----- no-body Message type tests ---------------------------------------//
+
+   @Test
+   public void testConvertMessageToAmqpMessageWithNoBody() throws Exception {
+      ServerJMSMessage outbound = createMessage();
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNull(amqp.getBody());
+   }
+
+   @Test
+   public void testConvertTextMessageToAmqpMessageWithNoBodyOriginalEncodingWasNull() throws Exception {
+      ServerJMSTextMessage outbound = createTextMessage();
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL);
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNull(amqp.getBody());
+   }
+
+   // ----- BytesMessage type tests ---------------------------------------//
+
+   @Test
+   public void testConvertEmptyBytesMessageToAmqpMessageWithDataBody() throws Exception {
+      ServerJMSBytesMessage outbound = createBytesMessage();
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof Data);
+      assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
+      assertEquals(0, ((Data) amqp.getBody()).getValue().getLength());
+   }
+
+   @Test
+   public void testConvertUncompressedBytesMessageToAmqpMessageWithDataBody() throws Exception {
+      byte[] expectedPayload = new byte[] {8, 16, 24, 32};
+      ServerJMSBytesMessage outbound = createBytesMessage();
+      outbound.writeBytes(expectedPayload);
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof Data);
+      assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
+      assertEquals(4, ((Data) amqp.getBody()).getValue().getLength());
+
+      Binary amqpData = ((Data) amqp.getBody()).getValue();
+      Binary inputData = new Binary(expectedPayload);
+
+      assertTrue(inputData.equals(amqpData));
+   }
+
+   @Ignore("Compressed message body support not yet implemented.")
+   @Test
+   public void testConvertCompressedBytesMessageToAmqpMessageWithDataBody() throws Exception {
+      byte[] expectedPayload = new byte[] {8, 16, 24, 32};
+      ServerJMSBytesMessage outbound = createBytesMessage(true);
+      outbound.writeBytes(expectedPayload);
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof Data);
+      assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
+      assertEquals(4, ((Data) amqp.getBody()).getValue().getLength());
+
+      Binary amqpData = ((Data) amqp.getBody()).getValue();
+      Binary inputData = new Binary(expectedPayload);
+
+      assertTrue(inputData.equals(amqpData));
+   }
+
+   @Test
+   public void testConvertEmptyBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception {
+      ServerJMSBytesMessage outbound = createBytesMessage();
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof AmqpValue);
+      assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary);
+      assertEquals(0, ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
+   }
+
+   @Test
+   public void testConvertUncompressedBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception {
+      byte[] expectedPayload = new byte[] {8, 16, 24, 32};
+      ServerJMSBytesMessage outbound = createBytesMessage();
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
+      outbound.writeBytes(expectedPayload);
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof AmqpValue);
+      assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary);
+      assertEquals(4, ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
+
+      Binary amqpData = (Binary) ((AmqpValue) amqp.getBody()).getValue();
+      Binary inputData = new Binary(expectedPayload);
+
+      assertTrue(inputData.equals(amqpData));
+   }
+
+   @Ignore("Compressed message body support not yet implemented.")
+   @Test
+   public void testConvertCompressedBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception {
+      byte[] expectedPayload = new byte[] {8, 16, 24, 32};
+      ServerJMSBytesMessage outbound = createBytesMessage(true);
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
+      outbound.writeBytes(expectedPayload);
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof AmqpValue);
+      assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary);
+      assertEquals(4, ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
+
+      Binary amqpData = (Binary) ((AmqpValue) amqp.getBody()).getValue();
+      Binary inputData = new Binary(expectedPayload);
+
+      assertTrue(inputData.equals(amqpData));
+   }
+
+   // ----- MapMessage type tests --------------------------------------------//
+
+   @Test
+   public void testConvertMapMessageToAmqpMessageWithNoBody() throws Exception {
+      ServerJMSMapMessage outbound = createMapMessage();
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof AmqpValue);
+      assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
+   }
+
+   @Test
+   public void testConvertMapMessageToAmqpMessageWithByteArrayValueInBody() throws Exception {
+      final byte[] byteArray = new byte[] {1, 2, 3, 4, 5};
+
+      ServerJMSMapMessage outbound = createMapMessage();
+      outbound.setBytes("bytes", byteArray);
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof AmqpValue);
+      assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
+
+      @SuppressWarnings("unchecked")
+      Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) amqp.getBody()).getValue();
+
+      assertEquals(1, amqpMap.size());
+      Binary readByteArray = (Binary) amqpMap.get("bytes");
+      assertNotNull(readByteArray);
+   }
+
+   @Test
+   public void testConvertMapMessageToAmqpMessage() throws Exception {
+      ServerJMSMapMessage outbound = createMapMessage();
+      outbound.setString("property-1", "string");
+      outbound.setInt("property-2", 1);
+      outbound.setBoolean("property-3", true);
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof AmqpValue);
+      assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
+
+      @SuppressWarnings("unchecked")
+      Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) amqp.getBody()).getValue();
+
+      assertEquals(3, amqpMap.size());
+      assertTrue("string".equals(amqpMap.get("property-1")));
+   }
+
+   @Test
+   public void testConvertCompressedMapMessageToAmqpMessage() throws Exception {
+      ServerJMSMapMessage outbound = createMapMessage(true);
+      outbound.setString("property-1", "string");
+      outbound.setInt("property-2", 1);
+      outbound.setBoolean("property-3", true);
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof AmqpValue);
+      assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
+
+      @SuppressWarnings("unchecked")
+      Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) amqp.getBody()).getValue();
+
+      assertEquals(3, amqpMap.size());
+      assertTrue("string".equals(amqpMap.get("property-1")));
+   }
+
+   // ----- StreamMessage type tests -----------------------------------------//
+
+   @Test
+   public void testConvertStreamMessageToAmqpMessageWithAmqpValueBody() throws Exception {
+      ServerJMSStreamMessage outbound = createStreamMessage();
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof AmqpValue);
+      assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof List);
+   }
+
+   @Test
+   public void testConvertStreamMessageToAmqpMessageWithAmqpSequencey() throws Exception {
+      ServerJMSStreamMessage outbound = createStreamMessage();
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE);
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof AmqpSequence);
+      assertTrue(((AmqpSequence) amqp.getBody()).getValue() instanceof List);
+   }
+
+   @Test
+   public void testConvertCompressedStreamMessageToAmqpMessageWithAmqpValueBody() throws Exception {
+      ServerJMSStreamMessage outbound = createStreamMessage(true);
+      outbound.writeBoolean(false);
+      outbound.writeString("test");
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof AmqpValue);
+      assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof List);
+
+      @SuppressWarnings("unchecked")
+      List<Object> amqpList = (List<Object>) ((AmqpValue) amqp.getBody()).getValue();
+
+      assertEquals(2, amqpList.size());
+   }
+
+   @Test
+   public void testConvertCompressedStreamMessageToAmqpMessageWithAmqpSequencey() throws Exception {
+      ServerJMSStreamMessage outbound = createStreamMessage(true);
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE);
+      outbound.writeBoolean(false);
+      outbound.writeString("test");
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof AmqpSequence);
+      assertTrue(((AmqpSequence) amqp.getBody()).getValue() instanceof List);
+
+      @SuppressWarnings("unchecked")
+      List<Object> amqpList = ((AmqpSequence) amqp.getBody()).getValue();
+
+      assertEquals(2, amqpList.size());
+   }
+
+   // ----- ObjectMessage type tests -----------------------------------------//
+
+   @Test
+   public void testConvertEmptyObjectMessageToAmqpMessageWithDataBody() throws Exception {
+      ServerJMSObjectMessage outbound = createObjectMessage();
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof Data);
+      assertEquals(5, ((Data) amqp.getBody()).getValue().getLength());
+   }
+
+   @Test
+   public void testConvertEmptyObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
+      ServerJMSObjectMessage outbound = createObjectMessage();
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof Data);
+      assertEquals(5, ((Data) amqp.getBody()).getValue().getLength());
+   }
+
+   @Test
+   public void testConvertEmptyObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
+      ServerJMSObjectMessage outbound = createObjectMessage();
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof AmqpValue);
+      assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary);
+      assertEquals(5, ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
+   }
+
+   @Test
+   public void testConvertObjectMessageToAmqpMessageWithDataBody() throws Exception {
+      ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof Data);
+      assertFalse(0 == ((Data) amqp.getBody()).getValue().getLength());
+
+      Object value = deserialize(((Data) amqp.getBody()).getValue().getArray());
+      assertNotNull(value);
+      assertTrue(value instanceof UUID);
+   }
+
+   @Test
+   public void testConvertObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
+      ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof Data);
+      assertFalse(0 == ((Data) amqp.getBody()).getValue().getLength());
+
+      Object value = deserialize(((Data) amqp.getBody()).getValue().getArray());
+      assertNotNull(value);
+      assertTrue(value instanceof UUID);
+   }
+
+   @Test
+   public void testConvertObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
+      ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof AmqpValue);
+      assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary);
+      assertFalse(0 == ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
+
+      Object value = deserialize(((Binary) ((AmqpValue) amqp.getBody()).getValue()).getArray());
+      assertNotNull(value);
+      assertTrue(value instanceof UUID);
+   }
+
+   @Test
+   public void testConvertCompressedObjectMessageToAmqpMessageWithDataBody() throws Exception {
+      ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true);
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof Data);
+      assertFalse(0 == ((Data) amqp.getBody()).getValue().getLength());
+
+      Object value = deserialize(((Data) amqp.getBody()).getValue().getArray());
+      assertNotNull(value);
+      assertTrue(value instanceof UUID);
+   }
+
+   @Test
+   public void testConvertCompressedObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
+      ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true);
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof Data);
+      assertFalse(0 == ((Data) amqp.getBody()).getValue().getLength());
+
+      Object value = deserialize(((Data) amqp.getBody()).getValue().getArray());
+      assertNotNull(value);
+      assertTrue(value instanceof UUID);
+   }
+
+   @Test
+   public void testConvertCompressedObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
+      ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true);
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof AmqpValue);
+      assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary);
+      assertFalse(0 == ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
+
+      Object value = deserialize(((Binary) ((AmqpValue) amqp.getBody()).getValue()).getArray());
+      assertNotNull(value);
+      assertTrue(value instanceof UUID);
+   }
+
+   // ----- TextMessage type tests -------------------------------------------//
+
+   @Test
+   public void testConvertTextMessageToAmqpMessageWithNoBody() throws Exception {
+      ServerJMSTextMessage outbound = createTextMessage();
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof AmqpValue);
+      assertNull(((AmqpValue) amqp.getBody()).getValue());
+   }
+
+   @Test
+   public void testConvertTextMessageCreatesBodyUsingOriginalEncodingWithDataSection() throws Exception {
+      String contentString = "myTextMessageContent";
+      ServerJMSTextMessage outbound = createTextMessage(contentString);
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof Data);
+      assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
+
+      Binary data = ((Data) amqp.getBody()).getValue();
+      String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8);
+      assertEquals(contentString, contents);
+   }
+
+   @Test
+   public void testConvertTextMessageContentNotStoredCreatesBodyUsingOriginalEncodingWithDataSection() throws Exception {
+      String contentString = "myTextMessageContent";
+      ServerJMSTextMessage outbound = createTextMessage(contentString);
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof Data);
+      assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
+
+      Binary data = ((Data) amqp.getBody()).getValue();
+      String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8);
+      assertEquals(contentString, contents);
+   }
+
+   @Test
+   public void testConvertTextMessageCreatesAmqpValueStringBody() throws Exception {
+      String contentString = "myTextMessageContent";
+      ServerJMSTextMessage outbound = createTextMessage(contentString);
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof AmqpValue);
+      assertEquals(contentString, ((AmqpValue) amqp.getBody()).getValue());
+   }
+
+   @Test
+   public void testConvertTextMessageContentNotStoredCreatesAmqpValueStringBody() throws Exception {
+      String contentString = "myTextMessageContent";
+      ServerJMSTextMessage outbound = createTextMessage(contentString);
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof AmqpValue);
+      assertEquals(contentString, ((AmqpValue) amqp.getBody()).getValue());
+   }
+
+   @Test
+   public void testConvertCompressedTextMessageCreatesDataSectionBody() throws Exception {
+      String contentString = "myTextMessageContent";
+      ServerJMSTextMessage outbound = createTextMessage(contentString, true);
+      outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
+      outbound.encode();
+
+      EncodedMessage encoded = transform(outbound);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      assertNotNull(amqp.getBody());
+      assertTrue(amqp.getBody() instanceof Data);
+      assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
+
+      Binary data = ((Data) amqp.getBody()).getValue();
+      String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8);
+      assertEquals(contentString, contents);
+   }
+
+   // ----- Test JMSDestination Handling -------------------------------------//
+
+   @Test
+   public void testConvertMessageWithJMSDestinationNull() throws Exception {
+      doTestConvertMessageWithJMSDestination(null, null);
+   }
+
+   @Test
+   public void testConvertMessageWithJMSDestinationQueue() throws Exception {
+      doTestConvertMessageWithJMSDestination(createDestination(QUEUE_TYPE), QUEUE_TYPE);
+   }
+
+   @Ignore("Artemis code doesn't provide a means of supplying a typed destination to AMQP")
+   @Test
+   public void testConvertMessageWithJMSDestinationTemporaryQueue() throws Exception {
+      doTestConvertMessageWithJMSDestination(createDestination(TEMP_QUEUE_TYPE), TEMP_QUEUE_TYPE);
+   }
+
+   @Ignore("Artemis code doesn't provide a means of supplying a typed destination to AMQP")
+   @Test
+   public void testConvertMessageWithJMSDestinationTopic() throws Exception {
+      doTestConvertMessageWithJMSDestination(createDestination(TOPIC_TYPE), TOPIC_TYPE);
+   }
+
+   @Ignore("Artemis code doesn't provide a means of supplying a typed destination to AMQP")
+   @Test
+   public void testConvertMessageWithJMSDestinationTemporaryTopic() throws Exception {
+      doTestConvertMessageWithJMSDestination(createDestination(TEMP_TOPIC_TYPE), TEMP_TOPIC_TYPE);
+   }
+
+   private void doTestConvertMessageWithJMSDestination(ServerDestination jmsDestination, Object expectedAnnotationValue) throws Exception {
+      ServerJMSTextMessage textMessage = createTextMessage();
+      textMessage.setText("myTextMessageContent");
+      textMessage.setJMSDestination(jmsDestination);
+
+      EncodedMessage encoded = transform(textMessage);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      MessageAnnotations ma = amqp.getMessageAnnotations();
+      Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
+      if (maMap != null) {
+         Object actualValue = maMap.get(JMSMappingOutboundTransformer.JMS_DEST_TYPE_MSG_ANNOTATION);
+         assertEquals("Unexpected annotation value", expectedAnnotationValue, actualValue);
+      } else if (expectedAnnotationValue != null) {
+         fail("Expected annotation value, but there were no annotations");
+      }
+
+      if (jmsDestination != null) {
+         assertEquals("Unexpected 'to' address", jmsDestination.getAddress(), amqp.getAddress());
+      }
+   }
+
+   // ----- Test JMSReplyTo Handling -----------------------------------------//
+
+   @Test
+   public void testConvertMessageWithJMSReplyToNull() throws Exception {
+      doTestConvertMessageWithJMSReplyTo(null, null);
+   }
+
+   @Test
+   public void testConvertMessageWithJMSReplyToQueue() throws Exception {
+      doTestConvertMessageWithJMSReplyTo(createDestination(QUEUE_TYPE), QUEUE_TYPE);
+   }
+
+   @Ignore("Artemis code doesn't provide a means of supplying a typed destination to AMQP")
+   @Test
+   public void testConvertMessageWithJMSReplyToTemporaryQueue() throws Exception {
+      doTestConvertMessageWithJMSReplyTo(createDestination(TEMP_QUEUE_TYPE), TEMP_QUEUE_TYPE);
+   }
+
+   @Ignore("Artemis code doesn't provide a means of supplying a typed destination to AMQP")
+   @Test
+   public void testConvertMessageWithJMSReplyToTopic() throws Exception {
+      doTestConvertMessageWithJMSReplyTo(createDestination(TOPIC_TYPE), TOPIC_TYPE);
+   }
+
+   @Ignore("Artemis code doesn't provide a means of supplying a typed destination to AMQP")
+   @Test
+   public void testConvertMessageWithJMSReplyToTemporaryTopic() throws Exception {
+      doTestConvertMessageWithJMSReplyTo(createDestination(TEMP_TOPIC_TYPE), TEMP_TOPIC_TYPE);
+   }
+
+   private void doTestConvertMessageWithJMSReplyTo(ServerDestination jmsReplyTo, Object expectedAnnotationValue) throws Exception {
+      ServerJMSTextMessage textMessage = createTextMessage();
+      textMessage.setText("myTextMessageContent");
+      textMessage.setJMSReplyTo(jmsReplyTo);
+
+      EncodedMessage encoded = transform(textMessage);
+      assertNotNull(encoded);
+
+      Message amqp = encoded.decode();
+
+      MessageAnnotations ma = amqp.getMessageAnnotations();
+      Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
+      if (maMap != null) {
+         Object actualValue = maMap.get(JMSMappingOutboundTransformer.JMS_REPLY_TO_TYPE_MSG_ANNOTATION);
+         assertEquals("Unexpected annotation value", expectedAnnotationValue, actualValue);
+      } else if (expectedAnnotationValue != null) {
+         fail("Expected annotation value, but there were no annotations");
+      }
+
+      if (jmsReplyTo != null) {
+         assertEquals("Unexpected 'reply-to' address", jmsReplyTo.getAddress(), amqp.getReplyTo());
+      }
+   }
+
+   // ----- Utility Methods used for this Test -------------------------------//
+
+   public EncodedMessage transform(ServerJMSMessage message) throws Exception {
+      // Useful for testing but not recommended for real life use.
+      ByteBuf nettyBuffer = Unpooled.buffer(1024);
+      NettyWritable buffer = new NettyWritable(nettyBuffer);
+
+      long messageFormat = transformer.transform(message, buffer);
+
+      EncodedMessage encoded = new EncodedMessage(messageFormat, nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes());
+
+      return encoded;
+   }
+
+   private ServerDestination createDestination(byte destType) {
+      ServerDestination destination = null;
+      switch (destType) {
+         case QUEUE_TYPE:
+            destination = new ServerDestination(TEST_ADDRESS);
+            break;
+         case TOPIC_TYPE:
+            destination = new ServerDestination(TEST_ADDRESS);
+            break;
+         case TEMP_QUEUE_TYPE:
+            destination = new ServerDestination(TEST_ADDRESS);
+            break;
+         case TEMP_TOPIC_TYPE:
+            destination = new ServerDestination(TEST_ADDRESS);
+            break;
+         default:
+            throw new IllegalArgumentException("Invliad Destination Type given/");
+      }
+
+      return destination;
+   }
+
+   private ServerJMSMessage createMessage() {
+      return new ServerJMSMessage(newMessage(org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE), 0);
+   }
+
+   private ServerJMSBytesMessage createBytesMessage() {
+      return createBytesMessage(false);
+   }
+
+   private ServerJMSBytesMessage createBytesMessage(boolean compression) {
+      ServerJMSBytesMessage message = new ServerJMSBytesMessage(newMessage(org.apache.activemq.artemis.api.core.Message.BYTES_TYPE), 0);
+
+      if (compression) {
+         // TODO
+      }
+
+      return message;
+   }
+
+   private ServerJMSMapMessage createMapMessage() {
+      return createMapMessage(false);
+   }
+
+   private ServerJMSMapMessage createMapMessage(boolean compression) {
+      ServerJMSMapMessage message = new ServerJMSMapMessage(newMessage(org.apache.activemq.artemis.api.core.Message.MAP_TYPE), 0);
+
+      if (compression) {
+         // TODO
+      }
+
+      return message;
+   }
+
+   private ServerJMSStreamMessage createStreamMessage() {
+      return createStreamMessage(false);
+   }
+
+   private ServerJMSStreamMessage createStreamMessage(boolean compression) {
+      ServerJMSStreamMessage message = new ServerJMSStreamMessage(newMessage(org.apache.activemq.artemis.api.core.Message.STREAM_TYPE), 0);
+
+      if (compression) {
+         // TODO
+      }
+
+      return message;
+   }
+
+   private ServerJMSObjectMessage createObjectMessage() {
+      return createObjectMessage(null);
+   }
+
+   private ServerJMSObjectMessage createObjectMessage(Serializable payload) {
+      return createObjectMessage(payload, false);
+   }
+
+   private ServerJMSObjectMessage createObjectMessage(Serializable payload, boolean compression) {
+      ServerJMSObjectMessage result = AMQPMessageSupport.createObjectMessage(idGenerator);
+
+      if (compression) {
+         // TODO
+      }
+
+      try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos);) {
+
+         oos.writeObject(payload);
+         byte[] data = baos.toByteArray();
+         result.setSerializedForm(new Binary(data));
+      } catch (Exception ex) {
+         throw new AssertionError("Should not fail to setObject in this test");
+      }
+
+      return result;
+   }
+
+   private ServerJMSTextMessage createTextMessage() {
+      return createTextMessage(null);
+   }
+
+   private ServerJMSTextMessage createTextMessage(String text) {
+      return createTextMessage(text, false);
+   }
+
+   private ServerJMSTextMessage createTextMessage(String text, boolean compression) {
+      ServerJMSTextMessage result = AMQPMessageSupport.createTextMessage(idGenerator);
+
+      if (compression) {
+         // TODO
+      }
+
+      try {
+         result.setText(text);
+      } catch (JMSException e) {
+      }
+
+      return result;
+   }
+
+   private Object deserialize(byte[] payload) throws Exception {
+      try (ByteArrayInputStream bis = new ByteArrayInputStream(payload); ObjectInputStream ois = new ObjectInputStream(bis);) {
+
+         return ois.readObject();
+      }
+   }
+
+   private ServerMessageImpl newMessage(byte messageType) {
+      ServerMessageImpl message = new ServerMessageImpl(idGenerator.generateID(), 512);
+      message.setType(messageType);
+      ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null);
+      return message;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java
new file mode 100644
index 0000000..99aab33
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.utils.IDGenerator;
+import org.apache.activemq.artemis.utils.SimpleIDGenerator;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+/**
+ * Some simple performance tests for the Message Transformers.
+ */
+@Ignore("Useful for profiling but slow and not meant as a unit test")
+public class JMSTransformationSpeedComparisonTest {
+
+   @Rule
+   public TestName test = new TestName();
+
+   private IDGenerator idGenerator;
+   private ProtonMessageConverter converter;
+
+   private final int WARM_CYCLES = 1000;
+   private final int PROFILE_CYCLES = 1000000;
+
+   @Before
+   public void setUp() {
+      idGenerator = new SimpleIDGenerator(0);
+      converter = new ProtonMessageConverter(idGenerator);
+   }
+
+   @Test
+   public void testBodyOnlyMessage() throws Exception {
+
+      Message message = Proton.message();
+      message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
+      EncodedMessage encoded = encode(message);
+
+      // Warm up
+      for (int i = 0; i < WARM_CYCLES; ++i) {
+         ServerMessage intermediate = converter.inbound(encoded);
+         encode(converter.outbound(intermediate, 1));
+      }
+
+      long totalDuration = 0;
+
+      long startTime = System.nanoTime();
+      for (int i = 0; i < PROFILE_CYCLES; ++i) {
+         ServerMessage intermediate = converter.inbound(encoded);
+         encode(converter.outbound(intermediate, 1));
+      }
+      totalDuration += System.nanoTime() - startTime;
+
+      LOG_RESULTS(totalDuration);
+   }
+
+   @Test
+   public void testMessageWithNoPropertiesOrAnnotations() throws Exception {
+
+      Message message = Proton.message();
+
+      message.setAddress("queue://test-queue");
+      message.setDeliveryCount(1);
+      message.setCreationTime(System.currentTimeMillis());
+      message.setContentType("text/plain");
+      message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
+
+      EncodedMessage encoded = encode(message);
+
+      // Warm up
+      for (int i = 0; i < WARM_CYCLES; ++i) {
+         ServerMessage intermediate = converter.inbound(encoded);
+         encode(converter.outbound(intermediate, 1));
+      }
+
+      long totalDuration = 0;
+
+      long startTime = System.nanoTime();
+      for (int i = 0; i < PROFILE_CYCLES; ++i) {
+         ServerMessage intermediate = converter.inbound(encoded);
+         encode(converter.outbound(intermediate, 1));
+      }
+      totalDuration += System.nanoTime() - startTime;
+
+      LOG_RESULTS(totalDuration);
+   }
+
+   @Test
+   public void testTypicalQpidJMSMessage() throws Exception {
+
+      EncodedMessage encoded = encode(createTypicalQpidJMSMessage());
+
+      // Warm up
+      for (int i = 0; i < WARM_CYCLES; ++i) {
+         ServerMessage intermediate = converter.inbound(encoded);
+         encode(converter.outbound(intermediate, 1));
+      }
+
+      long totalDuration = 0;
+
+      long startTime = System.nanoTime();
+      for (int i = 0; i < PROFILE_CYCLES; ++i) {
+         ServerMessage intermediate = converter.inbound(encoded);
+         encode(converter.outbound(intermediate, 1));
+      }
+      totalDuration += System.nanoTime() - startTime;
+
+      LOG_RESULTS(totalDuration);
+   }
+
+   @Test
+   public void testComplexQpidJMSMessage() throws Exception {
+
+      EncodedMessage encoded = encode(createComplexQpidJMSMessage());
+
+      // Warm up
+      for (int i = 0; i < WARM_CYCLES; ++i) {
+         ServerMessage intermediate = converter.inbound(encoded);
+         encode(converter.outbound(intermediate, 1));
+      }
+
+      long totalDuration = 0;
+
+      long startTime = System.nanoTime();
+      for (int i = 0; i < PROFILE_CYCLES; ++i) {
+         ServerMessage intermediate = converter.inbound(encoded);
+         encode(converter.outbound(intermediate, 1));
+      }
+      totalDuration += System.nanoTime() - startTime;
+
+      LOG_RESULTS(totalDuration);
+   }
+
+   @Test
+   public void testTypicalQpidJMSMessageInBoundOnly() throws Exception {
+
+      EncodedMessage encoded = encode(createTypicalQpidJMSMessage());
+
+      // Warm up
+      for (int i = 0; i < WARM_CYCLES; ++i) {
+         converter.inbound(encoded);
+      }
+
+      long totalDuration = 0;
+
+      long startTime = System.nanoTime();
+      for (int i = 0; i < PROFILE_CYCLES; ++i) {
+         converter.inbound(encoded);
+      }
+
+      totalDuration += System.nanoTime() - startTime;
+
+      LOG_RESULTS(totalDuration);
+   }
+
+   @Test
+   public void testTypicalQpidJMSMessageOutBoundOnly() throws Exception {
+
+      EncodedMessage encoded = encode(createTypicalQpidJMSMessage());
+      ServerMessage intermediate = converter.inbound(encoded);
+
+      // Warm up
+      for (int i = 0; i < WARM_CYCLES; ++i) {
+         encode(converter.outbound(intermediate, 1));
+      }
+
+      long totalDuration = 0;
+
+      long startTime = System.nanoTime();
+      for (int i = 0; i < PROFILE_CYCLES; ++i) {
+         encode(converter.outbound(intermediate, 1));
+      }
+
+      totalDuration += System.nanoTime() - startTime;
+
+      LOG_RESULTS(totalDuration);
+   }
+
+   private Message createTypicalQpidJMSMessage() {
+      Map<String, Object> applicationProperties = new HashMap<>();
+      Map<Symbol, Object> messageAnnotations = new HashMap<>();
+
+      applicationProperties.put("property-1", "string");
+      applicationProperties.put("property-2", 512);
+      applicationProperties.put("property-3", true);
+
+      messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
+      messageAnnotations.put(Symbol.valueOf("x-opt-jms-dest"), 0);
+
+      Message message = Proton.message();
+
+      message.setAddress("queue://test-queue");
+      message.setDeliveryCount(1);
+      message.setApplicationProperties(new ApplicationProperties(applicationProperties));
+      message.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
+      message.setCreationTime(System.currentTimeMillis());
+      message.setContentType("text/plain");
+      message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
+
+      return message;
+   }
+
+   private Message createComplexQpidJMSMessage() {
+      Map<String, Object> applicationProperties = new HashMap<>();
+      Map<Symbol, Object> messageAnnotations = new HashMap<>();
+
+      applicationProperties.put("property-1", "string-1");
+      applicationProperties.put("property-2", 512);
+      applicationProperties.put("property-3", true);
+      applicationProperties.put("property-4", "string-2");
+      applicationProperties.put("property-5", 512);
+      applicationProperties.put("property-6", true);
+      applicationProperties.put("property-7", "string-3");
+      applicationProperties.put("property-8", 512);
+      applicationProperties.put("property-9", true);
+
+      messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
+      messageAnnotations.put(Symbol.valueOf("x-opt-jms-dest"), 0);
+
+      Message message = Proton.message();
+
+      // Header Values
+      message.setPriority((short) 9);
+      message.setDurable(true);
+      message.setDeliveryCount(2);
+      message.setTtl(5000);
+
+      // Properties
+      message.setMessageId("ID:SomeQualifier:0:0:1");
+      message.setGroupId("Group-ID-1");
+      message.setGroupSequence(15);
+      message.setAddress("queue://test-queue");
+      message.setReplyTo("queue://reply-queue");
+      message.setCreationTime(System.currentTimeMillis());
+      message.setContentType("text/plain");
+      message.setCorrelationId("ID:SomeQualifier:0:7:9");
+      message.setUserId("username".getBytes(StandardCharsets.UTF_8));
+
+      // Application Properties / Message Annotations / Body
+      message.setApplicationProperties(new ApplicationProperties(applicationProperties));
+      message.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
+      message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
+
+      return message;
+   }
+
+   private EncodedMessage encode(Object target) {
+      if (target instanceof ProtonJMessage) {
+         ProtonJMessage amqp = (ProtonJMessage) target;
+
+         ByteBuf nettyBuffer = Unpooled.buffer(1024);
+         amqp.encode(new NettyWritable(nettyBuffer));
+
+         return new EncodedMessage(0, nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes());
+      } else {
+         return null;
+      }
+   }
+
+   private void LOG_RESULTS(long duration) {
+      String result = "[JMS] Total time for " + PROFILE_CYCLES + " cycles of transforms = " + TimeUnit.NANOSECONDS.toMillis(duration) + " ms -> "
+         + test.getMethodName();
+
+      System.out.println(result);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
new file mode 100644
index 0000000..a5a2168
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter;
+import org.apache.activemq.artemis.utils.IDGenerator;
+import org.apache.activemq.artemis.utils.SimpleIDGenerator;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.CompositeWritableBuffer;
+import org.apache.qpid.proton.codec.DroppingWritableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Tests some basic encode / decode functionality on the transformers.
+ */
+public class MessageTransformationTest {
+
+   @Rule
+   public TestName test = new TestName();
+
+   private IDGenerator idGenerator;
+   private ProtonMessageConverter converter;
+
+   @Before
+   public void setUp() {
+      idGenerator = new SimpleIDGenerator(0);
+      converter = new ProtonMessageConverter(idGenerator);
+   }
+
+   @Test
+   public void testEncodeDecodeFidelity() throws Exception {
+      Map<String, Object> applicationProperties = new HashMap<>();
+      Map<Symbol, Object> messageAnnotations = new HashMap<>();
+
+      applicationProperties.put("property-1", "string");
+      applicationProperties.put("property-2", 512);
+      applicationProperties.put("property-3", true);
+
+      messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
+      messageAnnotations.put(Symbol.valueOf("x-opt-jms-dest"), 0);
+
+      Message incomingMessage = Proton.message();
+
+      incomingMessage.setAddress("queue://test-queue");
+      incomingMessage.setDeliveryCount(1);
+      incomingMessage.setApplicationProperties(new ApplicationProperties(applicationProperties));
+      incomingMessage.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
+      incomingMessage.setCreationTime(System.currentTimeMillis());
+      incomingMessage.setContentType("text/plain");
+      incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
+
+      EncodedMessage encoded = encode(incomingMessage);
+
+      ServerMessage outbound = converter.inbound(encoded);
+      Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, outbound.getLongProperty("JMSXDeliveryCount").intValue())).decode();
+
+      // Test that message details are equal
+      assertEquals(incomingMessage.getAddress(), outboudMessage.getAddress());
+      assertEquals(incomingMessage.getDeliveryCount(), outboudMessage.getDeliveryCount());
+      assertEquals(incomingMessage.getCreationTime(), outboudMessage.getCreationTime());
+      assertEquals(incomingMessage.getContentType(), outboudMessage.getContentType());
+
+      // Test Message annotations
+      ApplicationProperties incomingApplicationProperties = incomingMessage.getApplicationProperties();
+      ApplicationProperties outgoingApplicationProperties = outboudMessage.getApplicationProperties();
+
+      assertEquals(incomingApplicationProperties.getValue(), outgoingApplicationProperties.getValue());
+
+      // Test Message properties
+      MessageAnnotations incomingMessageAnnotations = incomingMessage.getMessageAnnotations();
+      MessageAnnotations outgoingMessageAnnotations = outboudMessage.getMessageAnnotations();
+
+      assertEquals(incomingMessageAnnotations.getValue(), outgoingMessageAnnotations.getValue());
+
+      // Test that bodies are equal
+      assertTrue(incomingMessage.getBody() instanceof AmqpValue);
+      assertTrue(outboudMessage.getBody() instanceof AmqpValue);
+
+      AmqpValue incomingBody = (AmqpValue) incomingMessage.getBody();
+      AmqpValue outgoingBody = (AmqpValue) outboudMessage.getBody();
+
+      assertTrue(incomingBody.getValue() instanceof String);
+      assertTrue(outgoingBody.getValue() instanceof String);
+
+      assertEquals(incomingBody.getValue(), outgoingBody.getValue());
+   }
+
+   @Test
+   public void testBodyOnlyEncodeDecode() throws Exception {
+
+      Message incomingMessage = Proton.message();
+
+      incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
+
+      EncodedMessage encoded = encode(incomingMessage);
+      ServerMessage outbound = converter.inbound(encoded);
+      Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
+
+      assertNull(outboudMessage.getHeader());
+      assertNull(outboudMessage.getProperties());
+   }
+
+   @Test
+   public void testPropertiesButNoHeadersEncodeDecode() throws Exception {
+
+      Message incomingMessage = Proton.message();
+
+      incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
+      incomingMessage.setMessageId("ID:SomeQualifier:0:0:1");
+
+      EncodedMessage encoded = encode(incomingMessage);
+      ServerMessage outbound = converter.inbound(encoded);
+      Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
+
+      assertNull(outboudMessage.getHeader());
+      assertNotNull(outboudMessage.getProperties());
+   }
+
+   @Test
+   public void testHeaderButNoPropertiesEncodeDecode() throws Exception {
+
+      Message incomingMessage = Proton.message();
+
+      incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
+      incomingMessage.setDurable(true);
+
+      EncodedMessage encoded = encode(incomingMessage);
+      ServerMessage outbound = converter.inbound(encoded);
+      Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
+
+      assertNotNull(outboudMessage.getHeader());
+      assertNull(outboudMessage.getProperties());
+   }
+
+   @Test
+   public void testMessageWithAmqpValueThatFailsJMSConversion() throws Exception {
+
+      Message incomingMessage = Proton.message();
+
+      incomingMessage.setBody(new AmqpValue(new Boolean(true)));
+
+      EncodedMessage encoded = encode(incomingMessage);
+      ServerMessage outbound = converter.inbound(encoded);
+      Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
+
+      Section section = outboudMessage.getBody();
+      assertNotNull(section);
+      assertTrue(section instanceof AmqpValue);
+      AmqpValue amqpValue = (AmqpValue) section;
+      assertNotNull(amqpValue.getValue());
+      assertTrue(amqpValue.getValue() instanceof Boolean);
+      assertEquals(true, amqpValue.getValue());
+   }
+
+   @Test
+   public void testComplexQpidJMSMessageEncodeDecode() throws Exception {
+
+      Map<String, Object> applicationProperties = new HashMap<>();
+      Map<Symbol, Object> messageAnnotations = new HashMap<>();
+
+      applicationProperties.put("property-1", "string-1");
+      applicationProperties.put("property-2", 512);
+      applicationProperties.put("property-3", true);
+      applicationProperties.put("property-4", "string-2");
+      applicationProperties.put("property-5", 512);
+      applicationProperties.put("property-6", true);
+      applicationProperties.put("property-7", "string-3");
+      applicationProperties.put("property-8", 512);
+      applicationProperties.put("property-9", true);
+
+      messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
+      messageAnnotations.put(Symbol.valueOf("x-opt-jms-dest"), 0);
+      messageAnnotations.put(Symbol.valueOf("x-opt-jms-reply-to"), 0);
+      messageAnnotations.put(Symbol.valueOf("x-opt-delivery-delay"), 2000);
+
+      Message message = Proton.message();
+
+      // Header Values
+      message.setPriority((short) 9);
+      message.setDurable(true);
+      message.setDeliveryCount(2);
+      message.setTtl(5000);
+
+      // Properties
+      message.setMessageId("ID:SomeQualifier:0:0:1");
+      message.setGroupId("Group-ID-1");
+      message.setGroupSequence(15);
+      message.setAddress("queue://test-queue");
+      message.setReplyTo("queue://reply-queue");
+      message.setCreationTime(System.currentTimeMillis());
+      message.setContentType("text/plain");
+      message.setCorrelationId("ID:SomeQualifier:0:7:9");
+      message.setUserId("username".getBytes(StandardCharsets.UTF_8));
+
+      // Application Properties / Message Annotations / Body
+      message.setApplicationProperties(new ApplicationProperties(applicationProperties));
+      message.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
+      message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
+
+      EncodedMessage encoded = encode(message);
+      ServerMessage outbound = converter.inbound(encoded);
+      Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
+
+      assertNotNull(outboudMessage.getHeader());
+      assertNotNull(outboudMessage.getProperties());
+      assertNotNull(outboudMessage.getMessageAnnotations());
+      assertNotNull(outboudMessage.getApplicationProperties());
+      assertNull(outboudMessage.getDeliveryAnnotations());
+      assertNull(outboudMessage.getFooter());
+
+      assertEquals(9, outboudMessage.getApplicationProperties().getValue().size());
+      assertEquals(4, outboudMessage.getMessageAnnotations().getValue().size());
+   }
+
+   private EncodedMessage encode(Message message) {
+      ProtonJMessage amqp = (ProtonJMessage) message;
+
+      ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]);
+      final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
+      int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
+      if (overflow.position() > 0) {
+         buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]);
+         c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
+      }
+
+      return new EncodedMessage(1, buffer.array(), 0, c);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index f39a9c5..e3e9681 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -204,6 +204,30 @@ public class AmqpMessage {
    }
 
    /**
+    * Sets the replyTo address which is applied to the AMQP message reply-to field in the message properties
+    *
+    * @param address The replyTo address that should be applied in the Message To field.
+    */
+   public void setReplyToAddress(String address) {
+      checkReadOnly();
+      lazyCreateProperties();
+      getWrappedMessage().setReplyTo(address);
+   }
+
+   /**
+    * Return the set replyTo address that was set in the Message To field.
+    *
+    * @return the set replyTo address String form or null if not set.
+    */
+   public String getReplyToAddress() {
+      if (message.getProperties() == null) {
+         return null;
+      }
+
+      return message.getProperties().getReplyTo();
+   }
+
+   /**
     * Sets the MessageId property on an outbound message using the provided String
     *
     * @param messageId the String message ID value to set.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
index 2a1e8c9..15271f6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -16,6 +16,25 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -36,21 +55,9 @@ import javax.jms.StreamMessage;
 import javax.jms.TemporaryQueue;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
+import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
-import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Enumeration;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
@@ -83,11 +90,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains;
-
 @RunWith(Parameterized.class)
 public class ProtonTest extends ProtonTestBase {
 
@@ -180,6 +182,31 @@ public class ProtonTest extends ProtonTestBase {
    }
 
    @Test
+   public void testSendAndReceiveOnTopic() throws Exception {
+      Connection connection = createConnection("myClientId");
+      try {
+         TopicSession session = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic("amqp_testtopic");
+         TopicSubscriber consumer = session.createSubscriber(topic);
+         TopicPublisher producer = session.createPublisher(topic);
+
+         TextMessage message = session.createTextMessage("test-message");
+         producer.send(message);
+         producer.close();
+
+         connection.start();
+
+         message = (TextMessage) consumer.receive(1000);
+         assertNotNull(message);
+         assertNotNull(message.getText());
+      } finally {
+         if (connection != null) {
+            connection.close();
+         }
+      }
+   }
+
+   @Test
    public void testDurableSubscriptionUnsubscribe() throws Exception {
       Connection connection = createConnection("myClientId");
       try {
@@ -495,7 +522,7 @@ public class ProtonTest extends ProtonTestBase {
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       javax.jms.Queue queue = createQueue(address);
       MessageProducer p = session.createProducer(queue);
-      ArrayList list = new ArrayList();
+      ArrayList<String> list = new ArrayList<>();
       list.add("aString");
       ObjectMessage objectMessage = session.createObjectMessage(list);
       p.send(objectMessage);
@@ -507,7 +534,7 @@ public class ProtonTest extends ProtonTestBase {
 
       objectMessage = (ObjectMessage) cons.receive(5000);
       assertNotNull(objectMessage);
-      list = (ArrayList) objectMessage.getObject();
+      list = (ArrayList<String>) objectMessage.getObject();
       assertEquals(list.get(0), "aString");
       connection.close();
    }
@@ -586,7 +613,7 @@ public class ProtonTest extends ProtonTestBase {
       fillAddress(destinationAddress);
 
       AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
-      AmqpConnection amqpConnection = amqpConnection = client.connect();
+      AmqpConnection amqpConnection = client.connect();
       try {
          AmqpSession session = amqpConnection.createSession();
          AmqpSender sender = session.createSender(destinationAddress);
@@ -860,7 +887,7 @@ public class ProtonTest extends ProtonTestBase {
          AmqpMessage request = new AmqpMessage();
          request.setApplicationProperty("_AMQ_ResourceName", "core.server");
          request.setApplicationProperty("_AMQ_OperationName", "getQueueNames");
-         request.setApplicationProperty("JMSReplyTo", destinationAddress);
+         request.setReplyToAddress(destinationAddress);
          request.setText("[]");
 
          sender.send(request);


Mime
View raw message