activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [3/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6438
Date Mon, 26 Sep 2016 22:45:17 GMT
https://issues.apache.org/jira/browse/AMQ-6438

JMS Transformer performance improvements and bug fixes

Trim unnecessary code and improve overall performance of the JMS
Transformer codecs.  Remove legacy Qpid JMS client related code from the
transformer as these are no longer supported.  Fix outgoing message that
do not match the structure of the incoming message that created them such
as message with had only a body being sent out with Headers and message
Properties.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/63d62a71
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/63d62a71
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/63d62a71

Branch: refs/heads/master
Commit: 63d62a71f59ec485ac79e1ce40e98316d37ca14a
Parents: 5702ec8
Author: Timothy Bish <tabish121@gmail.com>
Authored: Wed Sep 21 12:57:16 2016 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Mon Sep 26 18:43:06 2016 -0400

----------------------------------------------------------------------
 .../message/AMQPNativeInboundTransformer.java   |  12 +-
 .../message/AMQPNativeOutboundTransformer.java  | 102 ++--
 .../amqp/message/AMQPRawInboundTransformer.java |  32 +-
 .../amqp/message/ActiveMQJMSVendor.java         | 398 ---------------
 .../amqp/message/AmqpMessageSupport.java        | 181 ++++++-
 .../amqp/message/AmqpWritableBuffer.java        | 164 +++++++
 .../amqp/message/AutoOutboundTransformer.java   |  26 +-
 .../amqp/message/InboundTransformer.java        | 163 +++----
 .../message/JMSMappingInboundTransformer.java   | 111 +++--
 .../message/JMSMappingOutboundTransformer.java  | 482 +++++++++++--------
 .../amqp/message/OutboundTransformer.java       |  50 +-
 .../transport/amqp/protocol/AmqpReceiver.java   |  11 +-
 .../transport/amqp/protocol/AmqpSender.java     |  12 +-
 .../transport/amqp/AmqpTransformerTest.java     |   6 -
 .../transport/amqp/JMSInteroperabilityTest.java |  43 ++
 .../transport/amqp/client/AmqpMessage.java      |   6 +-
 .../JMSMappingInboundTransformerTest.java       |  92 +---
 .../JMSMappingOutboundTransformerTest.java      | 384 ++++++++-------
 .../JMSTransformationSpeedComparisonTest.java   |  55 ++-
 19 files changed, 1161 insertions(+), 1169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
index 65cd657..b5429e6 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
@@ -16,14 +16,10 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
-import javax.jms.Message;
+import org.apache.activemq.command.ActiveMQMessage;
 
 public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
 
-    public AMQPNativeInboundTransformer(ActiveMQJMSVendor vendor) {
-        super(vendor);
-    }
-
     @Override
     public String getTransformerName() {
         return TRANSFORMER_NATIVE;
@@ -31,14 +27,14 @@ public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
 
     @Override
     public InboundTransformer getFallbackTransformer() {
-        return new AMQPRawInboundTransformer(getVendor());
+        return new AMQPRawInboundTransformer();
     }
 
     @Override
-    protected Message doTransform(EncodedMessage amqpMessage) throws Exception {
+    protected ActiveMQMessage doTransform(EncodedMessage amqpMessage) throws Exception {
         org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
 
-        Message result = super.doTransform(amqpMessage);
+        ActiveMQMessage result = super.doTransform(amqpMessage);
 
         populateMessage(result, amqp);
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
index 620b79b..cbc3461 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
@@ -16,93 +16,73 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
-import java.nio.ByteBuffer;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.getBinaryFromMessageBody;
 
-import javax.jms.BytesMessage;
 import javax.jms.JMSException;
-import javax.jms.Message;
 import javax.jms.MessageFormatException;
 
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.messaging.Header;
-import org.apache.qpid.proton.codec.CompositeWritableBuffer;
-import org.apache.qpid.proton.codec.DroppingWritableBuffer;
-import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.message.ProtonJMessage;
 
-public class AMQPNativeOutboundTransformer extends OutboundTransformer {
-
-    public AMQPNativeOutboundTransformer(ActiveMQJMSVendor vendor) {
-        super(vendor);
-    }
+public class AMQPNativeOutboundTransformer implements OutboundTransformer {
 
     @Override
-    public EncodedMessage transform(Message msg) throws Exception {
-        if (msg == null || !(msg instanceof BytesMessage)) {
-            return null;
-        }
-
-        try {
-            if (!msg.getBooleanProperty(prefixVendor + "NATIVE")) {
-                return null;
-            }
-        } catch (MessageFormatException e) {
+    public EncodedMessage transform(ActiveMQMessage message) throws Exception {
+        if (message == null || !(message instanceof ActiveMQBytesMessage)) {
             return null;
         }
 
-        return transform(this, (BytesMessage) msg);
+        return transform(this, (ActiveMQBytesMessage) message);
     }
 
-    static EncodedMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException {
+    static EncodedMessage transform(OutboundTransformer options, ActiveMQBytesMessage message) throws JMSException {
         long messageFormat;
         try {
-            messageFormat = msg.getLongProperty(options.prefixVendor + "MESSAGE_FORMAT");
+            messageFormat = message.getLongProperty(JMS_AMQP_MESSAGE_FORMAT);
         } catch (MessageFormatException e) {
             return null;
         }
-        byte data[] = new byte[(int) msg.getBodyLength()];
-        int dataSize = data.length;
-        msg.readBytes(data);
-        msg.reset();
-
-        try {
-            int count = msg.getIntProperty("JMSXDeliveryCount");
-            if (count > 1) {
 
-                // decode...
-                ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create();
-                int offset = 0;
-                int len = data.length;
-                while (len > 0) {
-                    final int decoded = amqp.decode(data, offset, len);
-                    assert decoded > 0 : "Make progress decoding the message";
-                    offset += decoded;
-                    len -= decoded;
-                }
+        Binary encodedMessage = getBinaryFromMessageBody(message);
+        byte encodedData[] = encodedMessage.getArray();
+        int encodedSize = encodedMessage.getLength();
 
-                // Update the DeliveryCount header...
-                // The AMQP delivery-count field only includes prior failed delivery attempts,
-                // whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1.
-                if (amqp.getHeader() == null) {
-                    amqp.setHeader(new Header());
-                }
+        int count = message.getRedeliveryCounter();
+        if (count >= 1) {
 
-                amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1));
+            // decode...
+            ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create();
+            int offset = 0;
+            int len = encodedSize;
+            while (len > 0) {
+                final int decoded = amqp.decode(encodedData, offset, len);
+                assert decoded > 0 : "Make progress decoding the message";
+                offset += decoded;
+                len -= decoded;
+            }
 
-                // Re-encode...
-                ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]);
-                final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
-                int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
-                if (overflow.position() > 0) {
-                    buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]);
-                    c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
-                }
-                data = buffer.array();
-                dataSize = c;
+            // Update the DeliveryCount header...
+            // The AMQP delivery-count field only includes prior failed delivery attempts,
+            // whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1.
+            if (amqp.getHeader() == null) {
+                amqp.setHeader(new Header());
             }
-        } catch (JMSException e) {
+
+            amqp.getHeader().setDeliveryCount(new UnsignedInteger(count));
+
+            // Re-encode...
+            final AmqpWritableBuffer buffer = new AmqpWritableBuffer();
+            int written = amqp.encode(buffer);
+
+            encodedData = buffer.getArray();
+            encodedSize = written;
         }
 
-        return new EncodedMessage(messageFormat, data, 0, dataSize);
+        return new EncodedMessage(messageFormat, encodedData, 0, encodedSize);
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
index c534709..b4d3ad6 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
@@ -16,15 +16,16 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_NATIVE;
+
 import javax.jms.Message;
 
-public class AMQPRawInboundTransformer extends InboundTransformer {
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.util.ByteSequence;
 
-    public AMQPRawInboundTransformer(ActiveMQJMSVendor vendor) {
-        super(vendor);
-    }
+public class AMQPRawInboundTransformer extends InboundTransformer {
 
     @Override
     public String getTransformerName() {
@@ -37,22 +38,23 @@ public class AMQPRawInboundTransformer extends InboundTransformer {
     }
 
     @Override
-    protected Message doTransform(EncodedMessage amqpMessage) throws Exception {
-        BytesMessage result = vendor.createBytesMessage(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
+    protected ActiveMQMessage doTransform(EncodedMessage amqpMessage) throws Exception {
+        ActiveMQBytesMessage result = new ActiveMQBytesMessage();
+        result.setContent(new ByteSequence(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength()));
 
         // We cannot decode the message headers to check so err on the side of caution
         // and mark all messages as persistent.
-        result.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
-        result.setJMSPriority(defaultPriority);
+        result.setPersistent(true);
+        result.setPriority((byte) Message.DEFAULT_PRIORITY);
 
         final long now = System.currentTimeMillis();
-        result.setJMSTimestamp(now);
-        if (defaultTtl > 0) {
-            result.setJMSExpiration(now + defaultTtl);
+        result.setTimestamp(now);
+
+        if (amqpMessage.getMessageFormat() != 0) {
+            result.setLongProperty(JMS_AMQP_MESSAGE_FORMAT, amqpMessage.getMessageFormat());
         }
 
-        result.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
-        result.setBooleanProperty(prefixVendor + "NATIVE", true);
+        result.setBooleanProperty(JMS_AMQP_NATIVE, true);
 
         return result;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java
deleted file mode 100644
index 9b5a4ab..0000000
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java
+++ /dev/null
@@ -1,398 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.message;
-
-import java.io.DataInputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.zip.InflaterInputStream;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageNotWriteableException;
-import javax.jms.ObjectMessage;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMapMessage;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQObjectMessage;
-import org.apache.activemq.command.ActiveMQStreamMessage;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.transport.amqp.AmqpProtocolException;
-import org.apache.activemq.util.ByteArrayInputStream;
-import org.apache.activemq.util.ByteArrayOutputStream;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.JMSExceptionSupport;
-import org.apache.qpid.proton.amqp.Binary;
-
-public class ActiveMQJMSVendor {
-
-    final public static ActiveMQJMSVendor INSTANCE = new ActiveMQJMSVendor();
-
-    private ActiveMQJMSVendor() {
-    }
-
-    /**
-     * @return a new vendor specific Message instance.
-     */
-    public Message createMessage() {
-        return new ActiveMQMessage();
-    }
-
-    /**
-     * @return a new vendor specific BytesMessage instance.
-     */
-    public BytesMessage createBytesMessage() {
-        return new ActiveMQBytesMessage();
-    }
-
-    /**
-     * @return a new vendor specific BytesMessage instance with the given payload.
-     */
-    public BytesMessage createBytesMessage(byte[] content, int offset, int length) {
-        ActiveMQBytesMessage message = new ActiveMQBytesMessage();
-        message.setContent(new ByteSequence(content, offset, length));
-        return message;
-    }
-
-    /**
-     * @return a new vendor specific StreamMessage instance.
-     */
-    public StreamMessage createStreamMessage() {
-        return new ActiveMQStreamMessage();
-    }
-
-    /**
-     * @return a new vendor specific TextMessage instance.
-     */
-    public TextMessage createTextMessage() {
-        return new ActiveMQTextMessage();
-    }
-
-    /**
-     * @return a new vendor specific TextMessage instance with the given string in the body.
-     */
-    public TextMessage createTextMessage(String text) {
-        ActiveMQTextMessage message = new ActiveMQTextMessage();
-        try {
-            message.setText(text);
-        } catch (MessageNotWriteableException ex) {}
-
-        return message;
-    }
-
-    /**
-     * @return a new vendor specific ObjectMessage instance.
-     */
-    public ObjectMessage createObjectMessage() {
-        return new ActiveMQObjectMessage();
-    }
-
-    /**
-     * @return a new vendor specific ObjectMessage instance with the serialized form given.
-     */
-    public ObjectMessage createObjectMessage(byte[] content, int offset, int length) {
-        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
-        message.setContent(new ByteSequence(content, offset, length));
-        return message;
-    }
-
-    /**
-     * @return a new vendor specific MapMessage instance.
-     */
-    public MapMessage createMapMessage() {
-        return new ActiveMQMapMessage();
-    }
-
-    /**
-     * @return a new vendor specific MapMessage instance with the given map as its content.
-     */
-    public MapMessage createMapMessage(Map<String, Object> content) throws JMSException {
-        ActiveMQMapMessage message = new ActiveMQMapMessage();
-        final Set<Map.Entry<String, Object>> set = content.entrySet();
-        for (Map.Entry<String, Object> entry : set) {
-            message.setObject(entry.getKey(), entry.getValue());
-        }
-        return message;
-    }
-
-    /**
-     * Creates a new JMS Destination instance from the given name.
-     *
-     * @param name
-     *      the name to use to construct the new Destination
-     *
-     * @return a new JMS Destination object derived from the given name.
-     */
-    public Destination createDestination(String name) {
-        return ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE);
-    }
-
-    /**
-     * Set the given value as the JMSXUserID on the message instance.
-     *
-     * @param message
-     *      the message to be updated.
-     * @param value
-     *      the value to apply to the message.
-     */
-    public void setJMSXUserID(Message msg, String value) {
-        ((ActiveMQMessage) msg).setUserID(value);
-    }
-
-    /**
-     * Set the given value as the JMSXGroupID on the message instance.
-     *
-     * @param message
-     *      the message to be updated.
-     * @param value
-     *      the value to apply to the message.
-     */
-    public void setJMSXGroupID(Message msg, String value) {
-        ((ActiveMQMessage) msg).setGroupID(value);
-    }
-
-    /**
-     * Set the given value as the JMSXGroupSequence on the message instance.
-     *
-     * @param message
-     *      the message to be updated.
-     * @param value
-     *      the value to apply to the message.
-     */
-    public void setJMSXGroupSequence(Message msg, int value) {
-        ((ActiveMQMessage) msg).setGroupSequence(value);
-    }
-
-    /**
-     * Set the given value as the JMSXDeliveryCount on the message instance.
-     *
-     * @param message
-     *      the message to be updated.
-     * @param value
-     *      the value to apply to the message.
-     */
-    public void setJMSXDeliveryCount(Message msg, long value) {
-        ((ActiveMQMessage) msg).setRedeliveryCounter((int) value);
-    }
-
-    /**
-     * Convert the given JMS Destination into the appropriate AMQP address string
-     * for assignment to the 'to' or 'replyTo' field of an AMQP message.
-     *
-     * @param destination
-     *      the JMS Destination instance to be converted.
-     *
-     * @return the converted string address to assign to the message.
-     */
-    public String toAddress(Destination dest) {
-        return ((ActiveMQDestination) dest).getQualifiedName();
-    }
-
-    /**
-     * Given an Message instance return the original Message ID that was assigned the
-     * Message when it was first processed by the broker.  For an AMQP message this
-     * should be the original value of the message's MessageId field with the correct
-     * type preserved.
-     *
-     * @param message
-     *      the message which is being accessed.
-     *
-     * @return the original MessageId assigned to this Message instance.
-     */
-    public Object getOriginalMessageId(Message message) {
-        Object result;
-        MessageId msgId = ((ActiveMQMessage)message).getMessageId();
-        if (msgId.getTextView() != null) {
-            try {
-                result = AMQPMessageIdHelper.INSTANCE.toIdObject(msgId.getTextView());
-            } catch (AmqpProtocolException e) {
-                result = msgId.getTextView().toString();
-            }
-        } else {
-            result = msgId.toString();
-        }
-
-        return result;
-    }
-
-    /**
-     * Return the encoded form of the BytesMessage as an AMQP Binary instance.
-     *
-     * @param message
-     *      the Message whose binary encoded body is needed.
-     *
-     * @return a Binary instance containing the encoded message body.
-     *
-     * @throws JMSException if an error occurs while fetching the binary payload.
-     */
-    public Binary getBinaryFromMessageBody(BytesMessage message) throws JMSException {
-        ActiveMQBytesMessage bytesMessage = (ActiveMQBytesMessage) message;
-        Binary result = null;
-
-        if (bytesMessage.getContent() != null) {
-            ByteSequence contents = bytesMessage.getContent();
-
-            if (bytesMessage.isCompressed()) {
-                int length = (int) bytesMessage.getBodyLength();
-                byte[] uncompressed = new byte[length];
-                bytesMessage.readBytes(uncompressed);
-
-                result = new Binary(uncompressed);
-            } else {
-                return new Binary(contents.getData(), contents.getOffset(), contents.getLength());
-            }
-        }
-
-        return result;
-    }
-
-    /**
-     * Return the encoded form of the BytesMessage as an AMQP Binary instance.
-     *
-     * @param message
-     *      the Message whose binary encoded body is needed.
-     *
-     * @return a Binary instance containing the encoded message body.
-     *
-     * @throws JMSException if an error occurs while fetching the binary payload.
-     */
-    public Binary getBinaryFromMessageBody(ObjectMessage message) throws JMSException {
-        ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) message;
-        Binary result = null;
-
-        if (objectMessage.getContent() != null) {
-            ByteSequence contents = objectMessage.getContent();
-
-            if (objectMessage.isCompressed()) {
-                try (ByteArrayOutputStream os = new ByteArrayOutputStream();
-                     ByteArrayInputStream is = new ByteArrayInputStream(contents);
-                     InflaterInputStream iis = new InflaterInputStream(is);) {
-
-                    byte value;
-                    while ((value = (byte) iis.read()) != -1) {
-                        os.write(value);
-                    }
-
-                    ByteSequence expanded = os.toByteSequence();
-                    result = new Binary(expanded.getData(), expanded.getOffset(), expanded.getLength());
-                } catch (Exception cause) {
-                   throw JMSExceptionSupport.create(cause);
-               }
-            } else {
-                return new Binary(contents.getData(), contents.getOffset(), contents.getLength());
-            }
-        }
-
-        return result;
-    }
-
-    /**
-     * Return the encoded form of the Message as an AMQP Binary instance.
-     *
-     * @param message
-     *      the Message whose binary encoded body is needed.
-     *
-     * @return a Binary instance containing the encoded message body.
-     *
-     * @throws JMSException if an error occurs while fetching the binary payload.
-     */
-    public Binary getBinaryFromMessageBody(TextMessage message) throws JMSException {
-        ActiveMQTextMessage textMessage = (ActiveMQTextMessage) message;
-        Binary result = null;
-
-        if (textMessage.getContent() != null) {
-            ByteSequence contents = textMessage.getContent();
-
-            if (textMessage.isCompressed()) {
-                try (ByteArrayInputStream is = new ByteArrayInputStream(contents);
-                     InflaterInputStream iis = new InflaterInputStream(is);
-                     DataInputStream dis = new DataInputStream(iis);) {
-
-                    int size = dis.readInt();
-                    byte[] uncompressed = new byte[size];
-                    dis.readFully(uncompressed);
-
-                    result = new Binary(uncompressed);
-                } catch (Exception cause) {
-                    throw JMSExceptionSupport.create(cause);
-                }
-            } else {
-                // Message includes a size prefix of four bytes for the OpenWire marshaler
-                result = new Binary(contents.getData(), contents.getOffset() + 4, contents.getLength() - 4);
-            }
-        } else if (textMessage.getText() != null) {
-            result = new Binary(textMessage.getText().getBytes(StandardCharsets.UTF_8));
-        }
-
-        return result;
-    }
-
-    /**
-     * Return the underlying Map from the JMS MapMessage instance.
-     *
-     * @param message
-     *      the MapMessage whose underlying Map is requested.
-     *
-     * @return the underlying Map used to store the value in the given MapMessage.
-     *
-     * @throws JMSException if an error occurs in constructing or fetching the Map.
-     */
-    public Map<String, Object> getMapFromMessageBody(MapMessage message) throws JMSException {
-        final HashMap<String, Object> map = new HashMap<String, Object>();
-        final ActiveMQMapMessage mapMessage = (ActiveMQMapMessage) message;
-
-        final Map<String, Object> contentMap = mapMessage.getContentMap();
-        if (contentMap != null) {
-            map.putAll(contentMap);
-        }
-
-        return contentMap;
-    }
-
-    /**
-     * Sets the given Message Property on the given message overriding any read-only
-     * state on the Message long enough for the property to be added.
-     *
-     * @param message
-     *      the message to set the property on.
-     * @param key
-     *      the String key for the new Message property
-     * @param value
-     *      the Object to assign to the new Message property.
-     *
-     * @throws JMSException if an error occurs while setting the property.
-     */
-    public void setMessageProperty(Message message, String key, Object value) throws JMSException {
-        final ActiveMQMessage amqMessage = (ActiveMQMessage) message;
-
-        boolean oldValue = amqMessage.isReadOnlyProperties();
-
-        amqMessage.setReadOnlyProperties(false);
-        amqMessage.setObjectProperty(key, value);
-        amqMessage.setReadOnlyProperties(oldValue);
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
index 3e7a60e..4f468ba 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
@@ -16,13 +16,26 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
-import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.zip.InflaterInputStream;
 
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.JMSExceptionSupport;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Data;
@@ -34,12 +47,44 @@ import org.apache.qpid.proton.message.Message;
  */
 public final class AmqpMessageSupport {
 
+    // Message Properties used to map AMQP to JMS and back
+
+    public static final String JMS_AMQP_PREFIX = "JMS_AMQP_";
+    public static final int JMS_AMQP_PREFIX_LENGTH = JMS_AMQP_PREFIX.length();
+
+    public static final String MESSAGE_FORMAT = "MESSAGE_FORMAT";
+    public static final String ORIGINAL_ENCODING = "ORIGINAL_ENCODING";
+    public static final String NATIVE = "NATIVE";
+    public static final String HEADER = "HEADER";
+    public static final String PROPERTIES = "PROPERTIES";
+
+    public static final String FIRST_ACQUIRER = "FirstAcquirer";
+    public static final String CONTENT_TYPE = "ContentType";
+    public static final String CONTENT_ENCODING = "ContentEncoding";
+    public static final String REPLYTO_GROUP_ID = "ReplyToGroupID";
+
+    public static final String DELIVERY_ANNOTATION_PREFIX = "DA_";
+    public static final String MESSAGE_ANNOTATION_PREFIX = "MA_";
+    public static final String FOOTER_PREFIX = "FT_";
+
+    public static final String JMS_AMQP_HEADER = JMS_AMQP_PREFIX + HEADER;
+    public static final String JMS_AMQP_PROPERTIES = JMS_AMQP_PREFIX + PROPERTIES;
+    public static final String JMS_AMQP_ORIGINAL_ENCODING = JMS_AMQP_PREFIX + ORIGINAL_ENCODING;
+    public static final String JMS_AMQP_MESSAGE_FORMAT = JMS_AMQP_PREFIX + MESSAGE_FORMAT;
+    public static final String JMS_AMQP_NATIVE = JMS_AMQP_PREFIX + NATIVE;
+    public static final String JMS_AMQP_FIRST_ACQUIRER = JMS_AMQP_PREFIX + FIRST_ACQUIRER;
+    public static final String JMS_AMQP_CONTENT_TYPE = JMS_AMQP_PREFIX + CONTENT_TYPE;
+    public static final String JMS_AMQP_CONTENT_ENCODING = JMS_AMQP_PREFIX + CONTENT_ENCODING;
+    public static final String JMS_AMQP_REPLYTO_GROUP_ID = JMS_AMQP_PREFIX + REPLYTO_GROUP_ID;
+    public static final String JMS_AMQP_DELIVERY_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + DELIVERY_ANNOTATION_PREFIX;
+    public static final String JMS_AMQP_MESSAGE_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + MESSAGE_ANNOTATION_PREFIX;
+    public static final String JMS_AMQP_FOOTER_PREFIX = JMS_AMQP_PREFIX + FOOTER_PREFIX;
+
+    // Message body type definitions
     public static final Binary EMPTY_BINARY = new Binary(new byte[0]);
     public static final Data EMPTY_BODY = new Data(EMPTY_BINARY);
     public static final Data NULL_OBJECT_BODY;
 
-    public static final String AMQP_ORIGINAL_ENCODING_KEY = "JMS_AMQP_ORIGINAL_ENCODING";
-
     public static final short AMQP_UNKNOWN = 0;
     public static final short AMQP_NULL = 1;
     public static final short AMQP_DATA = 2;
@@ -147,4 +192,134 @@ public final class AmqpMessageSupport {
             return baos.toByteArray();
         }
     }
+
+    /**
+     * Return the encoded form of the BytesMessage as an AMQP Binary instance.
+     *
+     * @param message
+     *      the Message whose binary encoded body is needed.
+     *
+     * @return a Binary instance containing the encoded message body.
+     *
+     * @throws JMSException if an error occurs while fetching the binary payload.
+     */
+    public static Binary getBinaryFromMessageBody(ActiveMQBytesMessage message) throws JMSException {
+        Binary result = null;
+
+        if (message.getContent() != null) {
+            ByteSequence contents = message.getContent();
+
+            if (message.isCompressed()) {
+                int length = (int) message.getBodyLength();
+                byte[] uncompressed = new byte[length];
+                message.readBytes(uncompressed);
+
+                result = new Binary(uncompressed);
+            } else {
+                return new Binary(contents.getData(), contents.getOffset(), contents.getLength());
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Return the encoded form of the BytesMessage as an AMQP Binary instance.
+     *
+     * @param message
+     *      the Message whose binary encoded body is needed.
+     *
+     * @return a Binary instance containing the encoded message body.
+     *
+     * @throws JMSException if an error occurs while fetching the binary payload.
+     */
+    public static Binary getBinaryFromMessageBody(ActiveMQObjectMessage message) throws JMSException {
+        Binary result = null;
+
+        if (message.getContent() != null) {
+            ByteSequence contents = message.getContent();
+
+            if (message.isCompressed()) {
+                try (ByteArrayOutputStream os = new ByteArrayOutputStream();
+                     ByteArrayInputStream is = new ByteArrayInputStream(contents);
+                     InflaterInputStream iis = new InflaterInputStream(is);) {
+
+                    byte value;
+                    while ((value = (byte) iis.read()) != -1) {
+                        os.write(value);
+                    }
+
+                    ByteSequence expanded = os.toByteSequence();
+                    result = new Binary(expanded.getData(), expanded.getOffset(), expanded.getLength());
+                } catch (Exception cause) {
+                   throw JMSExceptionSupport.create(cause);
+               }
+            } else {
+                return new Binary(contents.getData(), contents.getOffset(), contents.getLength());
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Return the encoded form of the Message as an AMQP Binary instance.
+     *
+     * @param message
+     *      the Message whose binary encoded body is needed.
+     *
+     * @return a Binary instance containing the encoded message body.
+     *
+     * @throws JMSException if an error occurs while fetching the binary payload.
+     */
+    public static Binary getBinaryFromMessageBody(ActiveMQTextMessage message) throws JMSException {
+        Binary result = null;
+
+        if (message.getContent() != null) {
+            ByteSequence contents = message.getContent();
+
+            if (message.isCompressed()) {
+                try (ByteArrayInputStream is = new ByteArrayInputStream(contents);
+                     InflaterInputStream iis = new InflaterInputStream(is);
+                     DataInputStream dis = new DataInputStream(iis);) {
+
+                    int size = dis.readInt();
+                    byte[] uncompressed = new byte[size];
+                    dis.readFully(uncompressed);
+
+                    result = new Binary(uncompressed);
+                } catch (Exception cause) {
+                    throw JMSExceptionSupport.create(cause);
+                }
+            } else {
+                // Message includes a size prefix of four bytes for the OpenWire marshaler
+                result = new Binary(contents.getData(), contents.getOffset() + 4, contents.getLength() - 4);
+            }
+        } else if (message.getText() != null) {
+            result = new Binary(message.getText().getBytes(StandardCharsets.UTF_8));
+        }
+
+        return result;
+    }
+
+    /**
+     * Return the underlying Map from the JMS MapMessage instance.
+     *
+     * @param message
+     *      the MapMessage whose underlying Map is requested.
+     *
+     * @return the underlying Map used to store the value in the given MapMessage.
+     *
+     * @throws JMSException if an error occurs in constructing or fetching the Map.
+     */
+    public static Map<String, Object> getMapFromMessageBody(ActiveMQMapMessage message) throws JMSException {
+        final HashMap<String, Object> map = new HashMap<String, Object>();
+
+        final Map<String, Object> contentMap = message.getContentMap();
+        if (contentMap != null) {
+            map.putAll(contentMap);
+        }
+
+        return contentMap;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpWritableBuffer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpWritableBuffer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpWritableBuffer.java
new file mode 100644
index 0000000..399eb3f
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpWritableBuffer.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+/**
+ *
+ */
+public class AmqpWritableBuffer implements WritableBuffer {
+
+    public final static int DEFAULT_CAPACITY = 4 * 1024;
+
+    byte buffer[];
+    int position;
+
+   /**
+    * Creates a new WritableBuffer with default capacity.
+    */
+   public AmqpWritableBuffer() {
+       this(DEFAULT_CAPACITY);
+   }
+
+    /**
+     * Create a new WritableBuffer with the given capacity.
+     */
+    public AmqpWritableBuffer(int capacity) {
+        this.buffer = new byte[capacity];
+    }
+
+    public byte[] getArray() {
+        return buffer;
+    }
+
+    public int getArrayLength() {
+        return position;
+    }
+
+    @Override
+    public void put(byte b) {
+        int newPosition = position + 1;
+        ensureCapacity(newPosition);
+        buffer[position] = b;
+        position = newPosition;
+    }
+
+    @Override
+    public void putShort(short value) {
+        ensureCapacity(position + 2);
+        buffer[position++] = (byte)(value >>> 8);
+        buffer[position++] = (byte)(value >>> 0);
+    }
+
+    @Override
+    public void putInt(int value) {
+        ensureCapacity(position + 4);
+        buffer[position++] = (byte)(value >>> 24);
+        buffer[position++] = (byte)(value >>> 16);
+        buffer[position++] = (byte)(value >>> 8);
+        buffer[position++] = (byte)(value >>> 0);
+    }
+
+    @Override
+    public void putLong(long value) {
+        ensureCapacity(position + 8);
+        buffer[position++] = (byte)(value >>> 56);
+        buffer[position++] = (byte)(value >>> 48);
+        buffer[position++] = (byte)(value >>> 40);
+        buffer[position++] = (byte)(value >>> 32);
+        buffer[position++] = (byte)(value >>> 24);
+        buffer[position++] = (byte)(value >>> 16);
+        buffer[position++] = (byte)(value >>> 8);
+        buffer[position++] = (byte)(value >>> 0);
+    }
+
+    @Override
+    public void putFloat(float value) {
+        putInt(Float.floatToRawIntBits(value));
+    }
+
+    @Override
+    public void putDouble(double value) {
+        putLong(Double.doubleToRawLongBits(value));
+    }
+
+    @Override
+    public void put(byte[] src, int offset, int length) {
+        if (length == 0) {
+            return;
+        }
+
+        int newPosition = position + length;
+        ensureCapacity(newPosition);
+        System.arraycopy(src, offset, buffer, position, length);
+        position = newPosition;
+    }
+
+    @Override
+    public boolean hasRemaining() {
+        return position < Integer.MAX_VALUE;
+    }
+
+    @Override
+    public int remaining() {
+        return Integer.MAX_VALUE - position;
+    }
+
+    @Override
+    public int position() {
+        return position;
+    }
+
+    @Override
+    public void position(int position) {
+        ensureCapacity(position);
+        this.position = position;
+    }
+
+    @Override
+    public void put(ByteBuffer payload) {
+        int newPosition = position + payload.remaining();
+        ensureCapacity(newPosition);
+        while (payload.hasRemaining()) {
+            buffer[position++] = payload.get();
+        }
+
+        position = newPosition;
+    }
+
+    @Override
+    public int limit() {
+        return Integer.MAX_VALUE;
+    }
+
+    /**
+     * Ensures the the buffer has at least the minimumCapacity specified.
+     *
+     * @param minimumCapacity
+     *      the minimum capacity needed to meet the next write operation.
+     */
+    private void ensureCapacity(int minimumCapacity) {
+        if (minimumCapacity > buffer.length) {
+            byte newBuffer[] = new byte[Math.max(buffer.length << 1, minimumCapacity)];
+            System.arraycopy(buffer, 0, newBuffer, 0, position);
+            buffer = newBuffer;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java
index f0f71a8..edfdecf 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java
@@ -16,33 +16,31 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
-import javax.jms.BytesMessage;
-import javax.jms.Message;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_NATIVE;
 
-public class AutoOutboundTransformer extends JMSMappingOutboundTransformer {
+import javax.jms.BytesMessage;
 
-    private final JMSMappingOutboundTransformer transformer;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMessage;
 
-    public AutoOutboundTransformer(ActiveMQJMSVendor vendor) {
-        super(vendor);
+public class AutoOutboundTransformer extends JMSMappingOutboundTransformer {
 
-        transformer = new JMSMappingOutboundTransformer(vendor);
-    }
+    private final JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
 
     @Override
-    public EncodedMessage transform(Message msg) throws Exception {
-        if (msg == null) {
+    public EncodedMessage transform(ActiveMQMessage message) throws Exception {
+        if (message == null) {
             return null;
         }
 
-        if (msg.getBooleanProperty(prefixVendor + "NATIVE")) {
-            if (msg instanceof BytesMessage) {
-                return AMQPNativeOutboundTransformer.transform(this, (BytesMessage) msg);
+        if (message.getBooleanProperty(JMS_AMQP_NATIVE)) {
+            if (message instanceof BytesMessage) {
+                return AMQPNativeOutboundTransformer.transform(this, (ActiveMQBytesMessage) message);
             } else {
                 return null;
             }
         } else {
-            return transformer.transform(msg);
+            return transformer.transform(message);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
index e6b7a0f..323a9c1 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
@@ -16,14 +16,25 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_CONTENT_ENCODING;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_CONTENT_TYPE;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_FOOTER_PREFIX;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_HEADER;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_PROPERTIES;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
+
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.Set;
 
-import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 import javax.jms.Message;
 
 import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.transport.amqp.AmqpProtocolException;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Decimal128;
@@ -42,32 +53,17 @@ import org.apache.qpid.proton.amqp.messaging.Properties;
 
 public abstract class InboundTransformer {
 
-    protected final ActiveMQJMSVendor vendor;
-
     public static final String TRANSFORMER_NATIVE = "native";
     public static final String TRANSFORMER_RAW = "raw";
     public static final String TRANSFORMER_JMS = "jms";
 
-    protected String prefixVendor = "JMS_AMQP_";
-    protected String prefixDeliveryAnnotations = "DA_";
-    protected String prefixMessageAnnotations = "MA_";
-    protected String prefixFooter = "FT_";
-
-    protected int defaultDeliveryMode = javax.jms.DeliveryMode.NON_PERSISTENT;
-    protected int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
-    protected long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
-
-    public InboundTransformer(ActiveMQJMSVendor vendor) {
-        this.vendor = vendor;
-    }
-
     public abstract String getTransformerName();
 
     public abstract InboundTransformer getFallbackTransformer();
 
-    public final Message transform(EncodedMessage amqpMessage) throws Exception {
+    public final ActiveMQMessage transform(EncodedMessage amqpMessage) throws Exception {
         InboundTransformer transformer = this;
-        Message message = null;
+        ActiveMQMessage message = null;
 
         while (transformer != null) {
             try {
@@ -85,79 +81,40 @@ public abstract class InboundTransformer {
         return message;
     }
 
-    protected abstract Message doTransform(EncodedMessage amqpMessage) throws Exception;
-
-    public int getDefaultDeliveryMode() {
-        return defaultDeliveryMode;
-    }
-
-    public void setDefaultDeliveryMode(int defaultDeliveryMode) {
-        this.defaultDeliveryMode = defaultDeliveryMode;
-    }
-
-    public int getDefaultPriority() {
-        return defaultPriority;
-    }
-
-    public void setDefaultPriority(int defaultPriority) {
-        this.defaultPriority = defaultPriority;
-    }
-
-    public long getDefaultTtl() {
-        return defaultTtl;
-    }
-
-    public void setDefaultTtl(long defaultTtl) {
-        this.defaultTtl = defaultTtl;
-    }
-
-    public String getPrefixVendor() {
-        return prefixVendor;
-    }
-
-    public void setPrefixVendor(String prefixVendor) {
-        this.prefixVendor = prefixVendor;
-    }
-
-    public ActiveMQJMSVendor getVendor() {
-        return vendor;
-    }
+    protected abstract ActiveMQMessage doTransform(EncodedMessage amqpMessage) throws Exception;
 
     @SuppressWarnings("unchecked")
-    protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
+    protected void populateMessage(ActiveMQMessage jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
         Header header = amqp.getHeader();
-        if (header == null) {
-            header = new Header();
-        }
+        if (header != null) {
+            jms.setBooleanProperty(JMS_AMQP_HEADER, true);
 
-        if (header.getDurable() != null) {
-            jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-        } else {
-            jms.setJMSDeliveryMode(defaultDeliveryMode);
-        }
+            if (header.getDurable() != null) {
+                jms.setPersistent(header.getDurable().booleanValue());
+            }
 
-        if (header.getPriority() != null) {
-            jms.setJMSPriority(header.getPriority().intValue());
-        } else {
-            jms.setJMSPriority(defaultPriority);
-        }
+            if (header.getPriority() != null) {
+                jms.setJMSPriority(header.getPriority().intValue());
+            } else {
+                jms.setPriority((byte) Message.DEFAULT_PRIORITY);
+            }
 
-        if (header.getFirstAcquirer() != null) {
-            jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
-        }
+            if (header.getFirstAcquirer() != null) {
+                jms.setBooleanProperty(JMS_AMQP_FIRST_ACQUIRER, header.getFirstAcquirer());
+            }
 
-        if (header.getDeliveryCount() != null) {
-            vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue());
+            if (header.getDeliveryCount() != null) {
+                jms.setRedeliveryCounter(header.getDeliveryCount().intValue());
+            }
+        } else {
+            jms.setPriority((byte) Message.DEFAULT_PRIORITY);
         }
 
         final MessageAnnotations ma = amqp.getMessageAnnotations();
         if (ma != null) {
             for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
                 String key = entry.getKey().toString();
-                if ("x-opt-jms-type".equals(key) && entry.getValue() != null) {
-                    // Legacy annotation, JMSType value will be replaced by Subject further down if also present.
-                    jms.setJMSType(entry.getValue().toString());
-                } else if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
+                if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
                     long deliveryTime = ((Number) entry.getValue()).longValue();
                     long delay = deliveryTime - System.currentTimeMillis();
                     if (delay > 0) {
@@ -185,82 +142,72 @@ public abstract class InboundTransformer {
                     }
                 }
 
-                setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
+                setProperty(jms, JMS_AMQP_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue());
             }
         }
 
         final ApplicationProperties ap = amqp.getApplicationProperties();
         if (ap != null) {
             for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) ap.getValue().entrySet()) {
-                String key = entry.getKey().toString();
-                if ("JMSXGroupID".equals(key)) {
-                    vendor.setJMSXGroupID(jms, entry.getValue().toString());
-                } else if ("JMSXGroupSequence".equals(key)) {
-                    vendor.setJMSXGroupSequence(jms, ((Number) entry.getValue()).intValue());
-                } else if ("JMSXUserID".equals(key)) {
-                    vendor.setJMSXUserID(jms, entry.getValue().toString());
-                } else {
-                    setProperty(jms, key, entry.getValue());
-                }
+                setProperty(jms,  entry.getKey().toString(), entry.getValue());
             }
         }
 
         final Properties properties = amqp.getProperties();
         if (properties != null) {
+            jms.setBooleanProperty(JMS_AMQP_PROPERTIES, true);
             if (properties.getMessageId() != null) {
                 jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getMessageId()));
             }
             Binary userId = properties.getUserId();
             if (userId != null) {
-                vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8"));
+                jms.setUserID(new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8));
             }
             if (properties.getTo() != null) {
-                jms.setJMSDestination(vendor.createDestination(properties.getTo()));
+                jms.setDestination((ActiveMQDestination.createDestination(properties.getTo(), ActiveMQDestination.QUEUE_TYPE)));
             }
             if (properties.getSubject() != null) {
-                jms.setJMSType(properties.getSubject());
+                jms.setType(properties.getSubject());
             }
             if (properties.getReplyTo() != null) {
-                jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));
+                jms.setReplyTo((ActiveMQDestination.createDestination(properties.getReplyTo(), ActiveMQDestination.QUEUE_TYPE)));
             }
             if (properties.getCorrelationId() != null) {
-                jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getCorrelationId()));
+                jms.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getCorrelationId()));
             }
             if (properties.getContentType() != null) {
-                jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
+                jms.setStringProperty(JMS_AMQP_CONTENT_TYPE, properties.getContentType().toString());
             }
             if (properties.getContentEncoding() != null) {
-                jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString());
+                jms.setStringProperty(JMS_AMQP_CONTENT_ENCODING, properties.getContentEncoding().toString());
             }
             if (properties.getCreationTime() != null) {
-                jms.setJMSTimestamp(properties.getCreationTime().getTime());
+                jms.setTimestamp(properties.getCreationTime().getTime());
             }
             if (properties.getGroupId() != null) {
-                vendor.setJMSXGroupID(jms, properties.getGroupId());
+                jms.setGroupID(properties.getGroupId());
             }
             if (properties.getGroupSequence() != null) {
-                vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue());
+                jms.setGroupSequence(properties.getGroupSequence().intValue());
             }
             if (properties.getReplyToGroupId() != null) {
-                jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId());
+                jms.setStringProperty(JMS_AMQP_REPLYTO_GROUP_ID, properties.getReplyToGroupId());
             }
             if (properties.getAbsoluteExpiryTime() != null) {
-                jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime());
+                jms.setExpiration(properties.getAbsoluteExpiryTime().getTime());
             }
         }
 
         // If the jms expiration has not yet been set...
-        if (jms.getJMSExpiration() == 0) {
+        if (header != null && jms.getJMSExpiration() == 0) {
             // Then lets try to set it based on the message ttl.
-            long ttl = defaultTtl;
+            long ttl = Message.DEFAULT_TIME_TO_LIVE;
             if (header.getTtl() != null) {
                 ttl = header.getTtl().longValue();
             }
 
-            if (ttl == 0) {
-                jms.setJMSExpiration(0);
-            } else {
-                jms.setJMSExpiration(System.currentTimeMillis() + ttl);
+            if (ttl != javax.jms.Message.DEFAULT_TIME_TO_LIVE) {
+                jms.setExpiration(System.currentTimeMillis() + ttl);
             }
         }
 
@@ -268,7 +215,7 @@ public abstract class InboundTransformer {
         if (fp != null) {
             for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) {
                 String key = entry.getKey().toString();
-                setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue());
+                setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/63d62a71/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
index 707e5da..79e4c2c 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
@@ -18,13 +18,14 @@ package org.apache.activemq.transport.amqp.message;
 
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_DATA;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_NULL;
-import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_ORIGINAL_ENCODING_KEY;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_SEQUENCE;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_BINARY;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_LIST;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_MAP;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_NULL;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_STRING;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
 import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.getCharsetForTextualContent;
@@ -37,10 +38,19 @@ import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
-import javax.jms.StreamMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageNotWriteableException;
 
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.transport.amqp.AmqpProtocolException;
+import org.apache.activemq.util.ByteSequence;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
@@ -50,10 +60,6 @@ import org.apache.qpid.proton.message.Message;
 
 public class JMSMappingInboundTransformer extends InboundTransformer {
 
-    public JMSMappingInboundTransformer(ActiveMQJMSVendor vendor) {
-        super(vendor);
-    }
-
     @Override
     public String getTransformerName() {
         return TRANSFORMER_JMS;
@@ -61,55 +67,52 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
 
     @Override
     public InboundTransformer getFallbackTransformer() {
-        return new AMQPNativeInboundTransformer(getVendor());
+        return new AMQPNativeInboundTransformer();
     }
 
     @Override
-    protected javax.jms.Message doTransform(EncodedMessage amqpMessage) throws Exception {
+    protected ActiveMQMessage doTransform(EncodedMessage amqpMessage) throws Exception {
         Message amqp = amqpMessage.decode();
 
-        javax.jms.Message result = createMessage(amqp, amqpMessage);
-
-        result.setJMSDeliveryMode(defaultDeliveryMode);
-        result.setJMSPriority(defaultPriority);
-        result.setJMSExpiration(defaultTtl);
+        ActiveMQMessage result = createMessage(amqp, amqpMessage);
 
         populateMessage(result, amqp);
 
-        result.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
-        result.setBooleanProperty(prefixVendor + "NATIVE", false);
+        if (amqpMessage.getMessageFormat() != 0) {
+            result.setLongProperty(JMS_AMQP_MESSAGE_FORMAT, amqpMessage.getMessageFormat());
+        }
 
         return result;
     }
 
     @SuppressWarnings({ "unchecked" })
-    private javax.jms.Message createMessage(Message message, EncodedMessage original) throws Exception {
+    private ActiveMQMessage createMessage(Message message, EncodedMessage original) throws Exception {
 
         Section body = message.getBody();
-        javax.jms.Message result;
+        ActiveMQMessage result;
 
         if (body == null) {
             if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
-                result = vendor.createObjectMessage();
+                result = new ActiveMQObjectMessage();
             } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message) || isContentType(null, message)) {
-                result = vendor.createBytesMessage();
+                result = new ActiveMQBytesMessage();
             } else {
                 Charset charset = getCharsetForTextualContent(message.getContentType());
                 if (charset != null) {
-                    result = vendor.createTextMessage();
+                    result = new ActiveMQTextMessage();
                 } else {
-                    result = vendor.createMessage();
+                    result = new ActiveMQMessage();
                 }
             }
 
-            result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_NULL);
+            result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL);
         } else if (body instanceof Data) {
             Binary payload = ((Data) body).getValue();
 
             if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
-                result = vendor.createObjectMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+                result = createObjectMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
             } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message)) {
-                result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+                result = createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
             } else {
                 Charset charset = getCharsetForTextualContent(message.getContentType());
                 if (StandardCharsets.UTF_8.equals(charset)) {
@@ -117,51 +120,51 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
 
                     try {
                         CharBuffer chars = charset.newDecoder().decode(buf);
-                        result = vendor.createTextMessage(String.valueOf(chars));
+                        result = createTextMessage(String.valueOf(chars));
                     } catch (CharacterCodingException e) {
-                        result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+                        result = createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
                     }
                 } else {
-                    result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+                    result = createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
                 }
             }
 
-            result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_DATA);
+            result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
         } else if (body instanceof AmqpSequence) {
             AmqpSequence sequence = (AmqpSequence) body;
-            StreamMessage m = vendor.createStreamMessage();
+            ActiveMQStreamMessage m = new ActiveMQStreamMessage();
             for (Object item : sequence.getValue()) {
                 m.writeObject(item);
             }
 
             result = m;
-            result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_SEQUENCE);
+            result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE);
         } else if (body instanceof AmqpValue) {
             Object value = ((AmqpValue) body).getValue();
             if (value == null || value instanceof String) {
-                result = vendor.createTextMessage((String) value);
+                result = createTextMessage((String) value);
 
-                result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING);
+                result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING);
             } else if (value instanceof Binary) {
                 Binary payload = (Binary) value;
 
                 if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
-                    result = vendor.createObjectMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+                    result = createObjectMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
                 } else {
-                    result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+                    result = createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
                 }
 
-                result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY);
+                result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
             } else if (value instanceof List) {
-                StreamMessage m = vendor.createStreamMessage();
+                ActiveMQStreamMessage m = new ActiveMQStreamMessage();
                 for (Object item : (List<Object>) value) {
                     m.writeObject(item);
                 }
                 result = m;
-                result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_LIST);
+                result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_LIST);
             } else if (value instanceof Map) {
-                result = vendor.createMapMessage((Map<String, Object>) value);
-                result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_MAP);
+                result = createMapMessage((Map<String, Object>) value);
+                result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_MAP);
             } else {
                 // Trigger fall-back to native encoder which generates BytesMessage with the
                 // original message stored in the message body.
@@ -173,4 +176,34 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
 
         return result;
     }
+
+    private static ActiveMQBytesMessage createBytesMessage(byte[] content, int offset, int length) {
+        ActiveMQBytesMessage message = new ActiveMQBytesMessage();
+        message.setContent(new ByteSequence(content, offset, length));
+        return message;
+    }
+
+    public static ActiveMQTextMessage createTextMessage(String text) {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        try {
+            message.setText(text);
+        } catch (MessageNotWriteableException ex) {}
+
+        return message;
+    }
+
+    public static ActiveMQObjectMessage createObjectMessage(byte[] content, int offset, int length) {
+        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
+        message.setContent(new ByteSequence(content, offset, length));
+        return message;
+    }
+
+    public static ActiveMQMapMessage createMapMessage(Map<String, Object> content) throws JMSException {
+        ActiveMQMapMessage message = new ActiveMQMapMessage();
+        final Set<Map.Entry<String, Object>> set = content.entrySet();
+        for (Map.Entry<String, Object> entry : set) {
+            message.setObject(entry.getKey(), entry.getValue());
+        }
+        return message;
+    }
 }


Mime
View raw message