activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [3/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6374
Date Mon, 25 Jul 2016 22:16:11 GMT
https://issues.apache.org/jira/browse/AMQ-6374

Refactor transformer to better map AMQP messages to JMS message types
and better preserve the original encoding of stored messages so that
they can be sent back to an AMQP client with expected content types.
Adds additional interoperability tests. 

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d54e21b2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d54e21b2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d54e21b2

Branch: refs/heads/master
Commit: d54e21b2ff563431919e8783f701c889497b2101
Parents: 3953b9a
Author: Timothy Bish <tabish121@gmail.com>
Authored: Mon Jul 25 18:15:53 2016 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Mon Jul 25 18:15:53 2016 -0400

----------------------------------------------------------------------
 .../message/AMQPNativeInboundTransformer.java   |  13 +-
 .../message/AMQPNativeOutboundTransformer.java  |   4 +-
 .../amqp/message/AMQPRawInboundTransformer.java |  25 +-
 .../amqp/message/ActiveMQJMSVendor.java         | 328 +++++++-
 .../amqp/message/AmqpContentTypeSupport.java    | 145 ++++
 .../amqp/message/AmqpMessageSupport.java        | 150 ++++
 .../amqp/message/AutoOutboundTransformer.java   |  15 +-
 .../transport/amqp/message/EncodedMessage.java  |   4 +-
 .../amqp/message/InboundTransformer.java        |  49 +-
 .../message/InvalidContentTypeException.java    |  26 +
 .../message/JMSMappingInboundTransformer.java   | 155 ++--
 .../message/JMSMappingOutboundTransformer.java  | 263 ++++---
 .../transport/amqp/message/JMSVendor.java       |  53 --
 .../amqp/message/OutboundTransformer.java       |  46 +-
 .../transport/amqp/protocol/AmqpReceiver.java   |  18 +-
 .../transport/amqp/JMSInteroperabilityTest.java | 127 +++-
 .../message/AmqpContentTypeSupportTest.java     | 229 ++++++
 .../amqp/message/AmqpMessageSupportTest.java    | 108 +++
 .../JMSMappingInboundTransformerTest.java       | 555 +++++++++++++-
 .../JMSMappingOutboundTransformerTest.java      | 743 ++++++++++++++++++-
 .../JMSTransformationSpeedComparisonTest.java   | 312 ++++++++
 21 files changed, 3017 insertions(+), 351 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
index a28b301..65cd657 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -20,7 +20,7 @@ import javax.jms.Message;
 
 public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
 
-    public AMQPNativeInboundTransformer(JMSVendor vendor) {
+    public AMQPNativeInboundTransformer(ActiveMQJMSVendor vendor) {
         super(vendor);
     }
 
@@ -35,12 +35,13 @@ public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
     }
 
     @Override
-    public Message transform(EncodedMessage amqpMessage) throws Exception {
+    protected Message doTransform(EncodedMessage amqpMessage) throws Exception {
         org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
 
-        Message rc = super.transform(amqpMessage);
+        Message result = super.doTransform(amqpMessage);
 
-        populateMessage(rc, amqp);
-        return rc;
+        populateMessage(result, amqp);
+
+        return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
index c1dc976..620b79b 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -32,7 +32,7 @@ import org.apache.qpid.proton.message.ProtonJMessage;
 
 public class AMQPNativeOutboundTransformer extends OutboundTransformer {
 
-    public AMQPNativeOutboundTransformer(JMSVendor vendor) {
+    public AMQPNativeOutboundTransformer(ActiveMQJMSVendor vendor) {
         super(vendor);
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
index e1414df..c534709 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -22,7 +22,7 @@ import javax.jms.Message;
 
 public class AMQPRawInboundTransformer extends InboundTransformer {
 
-    public AMQPRawInboundTransformer(JMSVendor vendor) {
+    public AMQPRawInboundTransformer(ActiveMQJMSVendor vendor) {
         super(vendor);
     }
 
@@ -33,28 +33,27 @@ public class AMQPRawInboundTransformer extends InboundTransformer {
 
     @Override
     public InboundTransformer getFallbackTransformer() {
-        return null;  // No fallback from full raw transform
+        return null;  // No fallback from full raw transform, message likely dropped.
     }
 
     @Override
-    public Message transform(EncodedMessage amqpMessage) throws Exception {
-        BytesMessage rc = vendor.createBytesMessage();
-        rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
+    protected Message doTransform(EncodedMessage amqpMessage) throws Exception {
+        BytesMessage result = vendor.createBytesMessage(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
 
         // We cannot decode the message headers to check so err on the side of caution
         // and mark all messages as persistent.
-        rc.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
-        rc.setJMSPriority(defaultPriority);
+        result.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+        result.setJMSPriority(defaultPriority);
 
         final long now = System.currentTimeMillis();
-        rc.setJMSTimestamp(now);
+        result.setJMSTimestamp(now);
         if (defaultTtl > 0) {
-            rc.setJMSExpiration(now + defaultTtl);
+            result.setJMSExpiration(now + defaultTtl);
         }
 
-        rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
-        rc.setBooleanProperty(prefixVendor + "NATIVE", true);
+        result.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
+        result.setBooleanProperty(prefixVendor + "NATIVE", true);
 
-        return rc;
+        return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java
index 216daa9..efd5017 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java
@@ -16,10 +16,18 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
+import java.io.DataInputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.zip.InflaterInputStream;
+
 import javax.jms.BytesMessage;
 import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.MapMessage;
 import javax.jms.Message;
+import javax.jms.MessageNotWriteableException;
 import javax.jms.ObjectMessage;
 import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
@@ -31,71 +39,357 @@ import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQObjectMessage;
 import org.apache.activemq.command.ActiveMQStreamMessage;
 import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.transport.amqp.AmqpProtocolException;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.qpid.proton.amqp.Binary;
 
-public class ActiveMQJMSVendor implements JMSVendor {
+public class ActiveMQJMSVendor {
 
     final public static ActiveMQJMSVendor INSTANCE = new ActiveMQJMSVendor();
 
     private ActiveMQJMSVendor() {
     }
 
-    @Override
+    /**
+     * @return a new vendor specific Message instance.
+     */
+    public Message createMessage() {
+        return new ActiveMQMessage();
+    }
+
+    /**
+     * @return a new vendor specific BytesMessage instance.
+     */
     public BytesMessage createBytesMessage() {
         return new ActiveMQBytesMessage();
     }
 
-    @Override
-    public StreamMessage createStreamMessage() {
-        return new ActiveMQStreamMessage();
+    /**
+     * @return a new vendor specific BytesMessage instance with the given payload.
+     */
+    public BytesMessage createBytesMessage(byte[] content, int offset, int length) {
+        ActiveMQBytesMessage message = new ActiveMQBytesMessage();
+        message.setContent(new ByteSequence(content, offset, length));
+        return message;
     }
 
-    @Override
-    public Message createMessage() {
-        return new ActiveMQMessage();
+    /**
+     * @return a new vendor specific StreamMessage instance.
+     */
+    public StreamMessage createStreamMessage() {
+        return new ActiveMQStreamMessage();
     }
 
-    @Override
+    /**
+     * @return a new vendor specific TextMessage instance.
+     */
     public TextMessage createTextMessage() {
         return new ActiveMQTextMessage();
     }
 
-    @Override
+    /**
+     * @return a new vendor specific TextMessage instance with the given string in the body.
+     */
+    public TextMessage createTextMessage(String text) {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        try {
+            message.setText(text);
+        } catch (MessageNotWriteableException ex) {}
+
+        return message;
+    }
+
+    /**
+     * @return a new vendor specific ObjectMessage instance.
+     */
     public ObjectMessage createObjectMessage() {
         return new ActiveMQObjectMessage();
     }
 
-    @Override
+    /**
+     * @return a new vendor specific ObjectMessage instance with the serialized form given.
+     */
+    public ObjectMessage createObjectMessage(byte[] content, int offset, int length) {
+        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
+        message.setContent(new ByteSequence(content, offset, length));
+        return message;
+    }
+
+    /**
+     * @return a new vendor specific MapMessage instance.
+     */
     public MapMessage createMapMessage() {
         return new ActiveMQMapMessage();
     }
 
-    @Override
+    /**
+     * @return a new vendor specific MapMessage instance with the given map as its content.
+     */
+    public MapMessage createMapMessage(Map<String, Object> content) throws JMSException {
+        ActiveMQMapMessage message = new ActiveMQMapMessage();
+        final Set<Map.Entry<String, Object>> set = content.entrySet();
+        for (Map.Entry<String, Object> entry : set) {
+            message.setObject(entry.getKey(), entry.getValue());
+        }
+        return message;
+    }
+
+    /**
+     * Creates a new JMS Destination instance from the given name.
+     *
+     * @param name
+     *      the name to use to construct the new Destination
+     *
+     * @return a new JMS Destination object derived from the given name.
+     */
     public Destination createDestination(String name) {
         return ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE);
     }
 
-    @Override
+    /**
+     * Set the given value as the JMSXUserID on the message instance.
+     *
+     * @param msg
+     *      the message to be updated.
+     * @param value
+     *      the value to apply to the message.
+     */
     public void setJMSXUserID(Message msg, String value) {
         ((ActiveMQMessage) msg).setUserID(value);
     }
 
-    @Override
+    /**
+     * Set the given value as the JMSXGroupID on the message instance.
+     *
+     * @param msg
+     *      the message to be updated.
+     * @param value
+     *      the value to apply to the message.
+     */
     public void setJMSXGroupID(Message msg, String value) {
         ((ActiveMQMessage) msg).setGroupID(value);
     }
 
-    @Override
+    /**
+     * Set the given value as the JMSXGroupSequence on the message instance.
+     *
+     * @param msg
+     *      the message to be updated.
+     * @param value
+     *      the value to apply to the message.
+     */
     public void setJMSXGroupSequence(Message msg, int value) {
         ((ActiveMQMessage) msg).setGroupSequence(value);
     }
 
-    @Override
+    /**
+     * Set the given value as the JMSXDeliveryCount on the message instance.
+     *
+     * @param msg
+     *      the message to be updated.
+     * @param value
+     *      the value to apply to the message.
+     */
     public void setJMSXDeliveryCount(Message msg, long value) {
         ((ActiveMQMessage) msg).setRedeliveryCounter((int) value);
     }
 
-    @Override
+    /**
+     * Convert the given JMS Destination into the appropriate AMQP address string
+     * for assignment to the 'to' or 'replyTo' field of an AMQP message.
+     *
+     * @param dest
+     *      the JMS Destination instance to be converted.
+     *
+     * @return the converted string address to assign to the message.
+     */
     public String toAddress(Destination dest) {
         return ((ActiveMQDestination) dest).getQualifiedName();
     }
+
+    /**
+     * Given a Message instance return the original Message ID that was assigned the
+     * Message when it was first processed by the broker.  For an AMQP message this
+     * should be the original value of the message's MessageId field with the correct
+     * type preserved.
+     *
+     * @param message
+     *      the message which is being accessed.
+     *
+     * @return the original MessageId assigned to this Message instance.
+     */
+    public Object getOriginalMessageId(Message message) {
+        Object result;
+        MessageId msgId = ((ActiveMQMessage)message).getMessageId();
+        if (msgId.getTextView() != null) {
+            try {
+                result = AMQPMessageIdHelper.INSTANCE.toIdObject(msgId.getTextView());
+            } catch (AmqpProtocolException e) {
+                result = msgId.getTextView().toString();
+            }
+        } else {
+            result = msgId.toString();
+        }
+
+        return result;
+    }
+
+    /**
+     * Return the encoded form of the BytesMessage as an AMQP Binary instance.
+     *
+     * @param message
+     *      the Message whose binary encoded body is needed.
+     *
+     * @return a Binary instance containing the encoded message body.
+     *
+     * @throws JMSException if an error occurs while fetching the binary payload.
+     */
+    public Binary getBinaryFromMessageBody(BytesMessage message) throws JMSException {
+        ActiveMQBytesMessage bytesMessage = (ActiveMQBytesMessage) message;
+        Binary result = null;
+
+        if (bytesMessage.getContent() != null) {
+            ByteSequence contents = bytesMessage.getContent();
+
+            if (bytesMessage.isCompressed()) {
+                int length = (int) bytesMessage.getBodyLength();
+                byte[] uncompressed = new byte[length];
+                bytesMessage.readBytes(uncompressed);
+
+                result = new Binary(uncompressed);
+            } else {
+                return new Binary(contents.getData(), contents.getOffset(), contents.getLength());
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Return the serialized body of the ObjectMessage, inflated if compressed, as an AMQP Binary.
+     *
+     * @param message
+     *      the ObjectMessage whose binary encoded body is needed.
+     *
+     * @return a Binary instance containing the encoded message body, or null if no content is set.
+     *
+     * @throws JMSException if an error occurs while fetching the binary payload.
+     */
+    public Binary getBinaryFromMessageBody(ObjectMessage message) throws JMSException {
+        ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) message;
+        Binary result = null;
+
+        if (objectMessage.getContent() != null) {
+            ByteSequence contents = objectMessage.getContent();
+
+            if (objectMessage.isCompressed()) {
+                try (ByteArrayOutputStream os = new ByteArrayOutputStream();
+                     ByteArrayInputStream is = new ByteArrayInputStream(contents);
+                     InflaterInputStream iis = new InflaterInputStream(is);) {
+
+                    int value; // must stay an int: narrowed to byte, a 0xFF data byte would look like EOF (-1)
+                    while ((value = iis.read()) != -1) {
+                        os.write(value);
+                    }
+
+                    ByteSequence expanded = os.toByteSequence();
+                    result = new Binary(expanded.getData(), expanded.getOffset(), expanded.getLength());
+                } catch (Exception cause) {
+                    throw JMSExceptionSupport.create(cause);
+                }
+            } else {
+                return new Binary(contents.getData(), contents.getOffset(), contents.getLength());
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Return the encoded form of the Message as an AMQP Binary instance.
+     *
+     * @param message
+     *      the Message whose binary encoded body is needed.
+     *
+     * @return a Binary instance containing the encoded message body.
+     *
+     * @throws JMSException if an error occurs while fetching the binary payload.
+     */
+    public Binary getBinaryFromMessageBody(TextMessage message) throws JMSException {
+        ActiveMQTextMessage textMessage = (ActiveMQTextMessage) message;
+        Binary result = null;
+
+        if (textMessage.getContent() != null) {
+            ByteSequence contents = textMessage.getContent();
+
+            if (textMessage.isCompressed()) {
+                try (ByteArrayInputStream is = new ByteArrayInputStream(contents);
+                     InflaterInputStream iis = new InflaterInputStream(is);
+                     DataInputStream dis = new DataInputStream(iis);) {
+
+                    int size = dis.readInt();
+                    byte[] uncompressed = new byte[size];
+                    dis.readFully(uncompressed);
+
+                    result = new Binary(uncompressed);
+                } catch (Exception cause) {
+                    throw JMSExceptionSupport.create(cause);
+                }
+            } else {
+                // Message includes a size prefix of four bytes for the OpenWire marshaler
+                result = new Binary(contents.getData(), contents.getOffset() + 4, contents.getLength() - 4);
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Return a copy of the Map held by the JMS MapMessage instance.
+     *
+     * @param message
+     *      the MapMessage whose content Map is requested.
+     *
+     * @return a new Map with the entries of the given MapMessage, empty (never null) if it has none.
+     *
+     * @throws JMSException if an error occurs in constructing or fetching the Map.
+     */
+    public Map<String, Object> getMapFromMessageBody(MapMessage message) throws JMSException {
+        final HashMap<String, Object> map = new HashMap<String, Object>();
+        final ActiveMQMapMessage mapMessage = (ActiveMQMapMessage) message;
+
+        final Map<String, Object> contentMap = mapMessage.getContentMap();
+        if (contentMap != null) {
+            map.putAll(contentMap);
+        }
+
+        return map; // hand back the copy so callers cannot alter the message's own map
+    }
+
+    /**
+     * Sets the given Message Property on the given message overriding any read-only
+     * state on the Message long enough for the property to be added.
+     *
+     * @param message
+     *      the message to set the property on.
+     * @param key
+     *      the String key for the new Message property
+     * @param value
+     *      the Object to assign to the new Message property.
+     *
+     * @throws JMSException if an error occurs while setting the property.
+     */
+    public void setMessageProperty(Message message, String key, Object value) throws JMSException {
+        final ActiveMQMessage amqMessage = (ActiveMQMessage) message;
+
+        boolean oldValue = amqMessage.isReadOnlyProperties();
+
+        amqMessage.setReadOnlyProperties(false);
+        amqMessage.setObjectProperty(key, value);
+        amqMessage.setReadOnlyProperties(oldValue);
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpContentTypeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpContentTypeSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpContentTypeSupport.java
new file mode 100644
index 0000000..114ade7
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpContentTypeSupport.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import java.nio.charset.Charset;
+import java.nio.charset.IllegalCharsetNameException;
+import java.nio.charset.StandardCharsets;
+import java.nio.charset.UnsupportedCharsetException;
+import java.util.StringTokenizer;
+
+public final class AmqpContentTypeSupport {
+
+    private static final String UTF_8 = "UTF-8";
+    private static final String CHARSET = "charset";
+    private static final String TEXT = "text";
+    private static final String APPLICATION = "application";
+    private static final String JAVASCRIPT = "javascript";
+    private static final String XML = "xml";
+    private static final String XML_VARIANT = "+xml";
+    private static final String JSON = "json";
+    private static final String JSON_VARIANT = "+json";
+    private static final String XML_DTD = "xml-dtd";
+    private static final String ECMASCRIPT = "ecmascript";
+
+    /**
+     * Determines whether a content-type denotes text and, if so, which charset applies (default UTF-8).
+     *
+     * @param contentType the contentType of the received message
+     * @return the character set to use, or null if not to treat the message as text
+     * @throws InvalidContentTypeException
+     *         if the content-type is null/empty, lacks a '/' or subtype, or names a bad charset.
+     */
+    public static Charset parseContentTypeForTextualCharset(final String contentType) throws InvalidContentTypeException {
+        if (contentType == null || contentType.trim().isEmpty()) {
+            throw new InvalidContentTypeException("Content type can't be null or empty");
+        }
+
+        int subTypeSeparator = contentType.indexOf("/");
+        if (subTypeSeparator == -1) {
+            throw new InvalidContentTypeException("Content type has no '/' separator: " + contentType);
+        }
+        // Lower-case with Locale.ROOT so matching does not depend on the default locale (e.g. Turkish dotless i).
+        final String type = contentType.substring(0, subTypeSeparator).toLowerCase(java.util.Locale.ROOT).trim();
+
+        String subTypePart = contentType.substring(subTypeSeparator + 1).toLowerCase(java.util.Locale.ROOT).trim();
+
+        String parameterPart = null;
+        int parameterSeparator = subTypePart.indexOf(";");
+        if (parameterSeparator != -1) {
+            if (parameterSeparator < subTypePart.length() - 1) {
+                parameterPart = subTypePart.substring(parameterSeparator + 1).trim(); // parameters only, not the subtype
+            }
+            subTypePart = subTypePart.substring(0, parameterSeparator).trim();
+        }
+
+        if (subTypePart.isEmpty()) {
+            throw new InvalidContentTypeException("Content type has no subtype after '/': " + contentType);
+        }
+
+        final String subType = subTypePart;
+
+        if (isTextual(type, subType)) {
+            String charset = findCharset(parameterPart);
+            if (charset == null) {
+                charset = UTF_8;
+            }
+
+            if (UTF_8.equals(charset)) {
+                return StandardCharsets.UTF_8;
+            } else {
+                try {
+                    return Charset.forName(charset);
+                } catch (IllegalCharsetNameException icne) {
+                    throw new InvalidContentTypeException("Illegal charset: " + charset);
+                } catch (UnsupportedCharsetException uce) {
+                    throw new InvalidContentTypeException("Unsupported charset: " + charset);
+                }
+            }
+        }
+
+        return null;
+    }
+
+    //----- Internal Content Type utilities ----------------------------------//
+
+    private static boolean isTextual(String type, String subType) {
+        if (TEXT.equals(type)) {
+            return true;
+        }
+
+        if (APPLICATION.equals(type)) {
+            if (XML.equals(subType) || JSON.equals(subType) || JAVASCRIPT.equals(subType) || subType.endsWith(XML_VARIANT) || subType.endsWith(JSON_VARIANT)
+                || XML_DTD.equals(subType) || ECMASCRIPT.equals(subType)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    // Returns the upper-cased value of the first "charset" parameter in the ';' separated list, or null.
+    private static String findCharset(String parameterPart) {
+        if (parameterPart == null) {
+            return null;
+        }
+        StringTokenizer tokenizer = new StringTokenizer(parameterPart, ";");
+        while (tokenizer.hasMoreTokens()) {
+            String parameter = tokenizer.nextToken().trim();
+            int eqIndex = parameter.indexOf('=');
+            if (eqIndex == -1) {
+                continue; // not a name=value pair
+            }
+            String name = parameter.substring(0, eqIndex).trim();
+            if (CHARSET.equalsIgnoreCase(name)) {
+                // Trim so "charset = utf-8" is accepted; Locale.ROOT keeps the case mapping locale-independent.
+                String value = unquote(parameter.substring(eqIndex + 1).trim());
+                return value.toUpperCase(java.util.Locale.ROOT);
+            }
+        }
+
+        return null;
+    }
+
+    private static String unquote(String s) {
+        if (s.length() > 1 && (s.startsWith("\"") && s.endsWith("\""))) {
+            return s.substring(1, s.length() - 1);
+        } else {
+            return s;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
new file mode 100644
index 0000000..3e7a60e
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.Map;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.message.Message;
+
+/**
+ * Support class containing constant values and static methods that are
+ * used to map to / from AMQP Message types being sent or received.
+ */
+public final class AmqpMessageSupport {
+
+    /**
+     * A Binary instance that wraps a zero length byte array.
+     */
+    public static final Binary EMPTY_BINARY = new Binary(new byte[0]);
+
+    /**
+     * A Data section whose payload is the {@link #EMPTY_BINARY} value.
+     */
+    public static final Data EMPTY_BODY = new Data(EMPTY_BINARY);
+
+    /**
+     * A Data section whose payload is the Java serialized form of a null object reference.
+     */
+    public static final Data NULL_OBJECT_BODY;
+
+    /**
+     * Name of the message property that records which of the AMQP_* encoding
+     * values below described the body of the original AMQP message.
+     */
+    public static final String AMQP_ORIGINAL_ENCODING_KEY = "JMS_AMQP_ORIGINAL_ENCODING";
+
+    public static final short AMQP_UNKNOWN = 0;
+    public static final short AMQP_NULL = 1;
+    public static final short AMQP_DATA = 2;
+    public static final short AMQP_SEQUENCE = 3;
+    public static final short AMQP_VALUE_NULL = 4;
+    public static final short AMQP_VALUE_STRING = 5;
+    public static final short AMQP_VALUE_BINARY = 6;
+    public static final short AMQP_VALUE_MAP = 7;
+    public static final short AMQP_VALUE_LIST = 8;
+
+    static {
+        byte[] bytes;
+        try {
+            bytes = getSerializedBytes(null);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to initialise null object body", e);
+        }
+
+        NULL_OBJECT_BODY = new Data(new Binary(bytes));
+    }
+
+    /**
+     * Content type used to mark Data sections as containing a serialized java object.
+     */
+    public static final String SERIALIZED_JAVA_OBJECT_CONTENT_TYPE = "application/x-java-serialized-object";
+
+    /**
+     * Content type used to mark Data sections as containing arbitrary bytes.
+     */
+    public static final String OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
+
+    /**
+     * Lookup and return the correct Proton Symbol instance based on the given key.
+     *
+     * @param key
+     *        the String value name of the Symbol to locate.
+     *
+     * @return the Symbol value that matches the given key.
+     */
+    public static Symbol getSymbol(String key) {
+        return Symbol.valueOf(key);
+    }
+
+    /**
+     * Safe way to access message annotations which will check internal structure and
+     * either return the annotation if it exists or null if the annotation, or the
+     * message annotations section as a whole, is not present.
+     *
+     * @param key
+     *        the String key to use to lookup an annotation.
+     * @param message
+     *        the AMQP message object that is being examined, may be null.
+     *
+     * @return the given annotation value or null if not present in the message.
+     */
+    public static Object getMessageAnnotation(String key, Message message) {
+        if (message != null && message.getMessageAnnotations() != null) {
+            Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue();
+
+            // The section can be present without carrying a map, treat that as "no annotation".
+            if (annotations != null) {
+                return annotations.get(getSymbol(key));
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Check whether the content-type field of the properties section (if present) in
+     * the given message matches the provided string (where null matches if there is
+     * no content type present).
+     *
+     * @param contentType
+     *        content type string to compare against, or null if none
+     * @param message
+     *        the AMQP message object that is being examined, must not be null.
+     *
+     * @return true if content type matches
+     */
+    public static boolean isContentType(String contentType, Message message) {
+        if (contentType == null) {
+            return message.getContentType() == null;
+        } else {
+            return contentType.equals(message.getContentType());
+        }
+    }
+
+    /**
+     * Determines the character set to use for a message with the given content type.
+     * A content type that is rejected with an {@link InvalidContentTypeException} is
+     * handled the same way as non-textual content.
+     *
+     * @param contentType the contentType of the received message
+     * @return the character set to use, or null if not to treat the message as text
+     */
+    public static Charset getCharsetForTextualContent(String contentType) {
+        try {
+            return AmqpContentTypeSupport.parseContentTypeForTextualCharset(contentType);
+        } catch (InvalidContentTypeException e) {
+            return null;
+        }
+    }
+
+    /**
+     * Writes the given value using Java serialization and returns the resulting bytes.
+     *
+     * @param value
+     *        the object to serialize, may be null.
+     *
+     * @return the serialized form of the given value.
+     *
+     * @throws IOException if the value cannot be written to the object stream.
+     */
+    private static byte[] getSerializedBytes(Serializable value) throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+
+            oos.writeObject(value);
+            // Push buffered data into baos before copying; try-with-resources closes both streams.
+            oos.flush();
+
+            return baos.toByteArray();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java
index f30d4c4..f0f71a8 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -23,18 +23,21 @@ public class AutoOutboundTransformer extends JMSMappingOutboundTransformer {
 
     private final JMSMappingOutboundTransformer transformer;
 
-    public AutoOutboundTransformer(JMSVendor vendor) {
+    public AutoOutboundTransformer(ActiveMQJMSVendor vendor) {
         super(vendor);
+
         transformer = new JMSMappingOutboundTransformer(vendor);
     }
 
     @Override
     public EncodedMessage transform(Message msg) throws Exception {
-        if( msg == null )
+        if (msg == null) {
             return null;
-        if( msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
-            if( msg instanceof BytesMessage ) {
-                return AMQPNativeOutboundTransformer.transform(this, (BytesMessage)msg);
+        }
+
+        if (msg.getBooleanProperty(prefixVendor + "NATIVE")) {
+            if (msg instanceof BytesMessage) {
+                return AMQPNativeOutboundTransformer.transform(this, (BytesMessage) msg);
             } else {
                 return null;
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/EncodedMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/EncodedMessage.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/EncodedMessage.java
index 733c0ec0..0b25cfa 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/EncodedMessage.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/EncodedMessage.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -22,7 +22,7 @@ import org.apache.qpid.proton.message.Message;
 public class EncodedMessage {
 
     private final Binary data;
-    final long messageFormat;
+    private final long messageFormat;
 
     public EncodedMessage(long messageFormat, byte[] data, int offset, int length) {
         this.data = new Binary(data, offset, length);

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
index e883bcf..e6b7a0f 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
@@ -24,6 +24,7 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 
 import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.transport.amqp.AmqpProtocolException;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Decimal128;
 import org.apache.qpid.proton.amqp.Decimal32;
@@ -41,31 +42,51 @@ import org.apache.qpid.proton.amqp.messaging.Properties;
 
 public abstract class InboundTransformer {
 
-    JMSVendor vendor;
+    protected final ActiveMQJMSVendor vendor;
 
     public static final String TRANSFORMER_NATIVE = "native";
     public static final String TRANSFORMER_RAW = "raw";
     public static final String TRANSFORMER_JMS = "jms";
 
-    String prefixVendor = "JMS_AMQP_";
-    String prefixDeliveryAnnotations = "DA_";
-    String prefixMessageAnnotations = "MA_";
-    String prefixFooter = "FT_";
+    protected String prefixVendor = "JMS_AMQP_";
+    protected String prefixDeliveryAnnotations = "DA_";
+    protected String prefixMessageAnnotations = "MA_";
+    protected String prefixFooter = "FT_";
 
-    int defaultDeliveryMode = javax.jms.DeliveryMode.NON_PERSISTENT;
-    int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
-    long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
+    protected int defaultDeliveryMode = javax.jms.DeliveryMode.NON_PERSISTENT;
+    protected int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
+    protected long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
 
-    public InboundTransformer(JMSVendor vendor) {
+    public InboundTransformer(ActiveMQJMSVendor vendor) {
         this.vendor = vendor;
     }
 
-    public abstract Message transform(EncodedMessage amqpMessage) throws Exception;
-
     public abstract String getTransformerName();
 
     public abstract InboundTransformer getFallbackTransformer();
 
+    public final Message transform(EncodedMessage amqpMessage) throws Exception {
+        InboundTransformer transformer = this;
+        Message message = null;
+
+        while (transformer != null) {
+            try {
+                message = transformer.doTransform(amqpMessage);
+                break;
+            } catch (Exception e) {
+                transformer = transformer.getFallbackTransformer();
+            }
+        }
+
+        if (message == null) {
+            throw new AmqpProtocolException("Failed to transform incoming delivery, skipping.", false);
+        }
+
+        return message;
+    }
+
+    protected abstract Message doTransform(EncodedMessage amqpMessage) throws Exception;
+
     public int getDefaultDeliveryMode() {
         return defaultDeliveryMode;
     }
@@ -98,14 +119,10 @@ public abstract class InboundTransformer {
         this.prefixVendor = prefixVendor;
     }
 
-    public JMSVendor getVendor() {
+    public ActiveMQJMSVendor getVendor() {
         return vendor;
     }
 
-    public void setVendor(JMSVendor vendor) {
-        this.vendor = vendor;
-    }
-
     @SuppressWarnings("unchecked")
     protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
         Header header = amqp.getHeader();

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InvalidContentTypeException.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InvalidContentTypeException.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InvalidContentTypeException.java
new file mode 100644
index 0000000..7251b94
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InvalidContentTypeException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+/**
+ * Checked exception that reports a problem with a content type value.
+ */
+public class InvalidContentTypeException extends Exception {
+
+    private static final long serialVersionUID = 1260362376856866687L;
+
+    /**
+     * Creates the exception with the given detail message.
+     *
+     * @param message
+     *        text describing what was wrong with the content type.
+     */
+    public InvalidContentTypeException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
index 55a4db9..707e5da 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,27 +16,41 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
-import java.io.Serializable;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_DATA;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_NULL;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_ORIGINAL_ENCODING_KEY;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_SEQUENCE;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_BINARY;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_LIST;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_MAP;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_NULL;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_STRING;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.getCharsetForTextualContent;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.isContentType;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
-import javax.jms.BytesMessage;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
 import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
 
+import org.apache.activemq.transport.amqp.AmqpProtocolException;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.message.Message;
 
 public class JMSMappingInboundTransformer extends InboundTransformer {
 
-    public JMSMappingInboundTransformer(JMSVendor vendor) {
+    public JMSMappingInboundTransformer(ActiveMQJMSVendor vendor) {
         super(vendor);
     }
 
@@ -50,70 +64,113 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
         return new AMQPNativeInboundTransformer(getVendor());
     }
 
-    @SuppressWarnings({ "unchecked" })
     @Override
-    public Message transform(EncodedMessage amqpMessage) throws Exception {
-        org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
+    protected javax.jms.Message doTransform(EncodedMessage amqpMessage) throws Exception {
+        Message amqp = amqpMessage.decode();
+
+        javax.jms.Message result = createMessage(amqp, amqpMessage);
+
+        result.setJMSDeliveryMode(defaultDeliveryMode);
+        result.setJMSPriority(defaultPriority);
+        result.setJMSExpiration(defaultTtl);
+
+        populateMessage(result, amqp);
+
+        result.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
+        result.setBooleanProperty(prefixVendor + "NATIVE", false);
+
+        return result;
+    }
+
+    @SuppressWarnings({ "unchecked" })
+    private javax.jms.Message createMessage(Message message, EncodedMessage original) throws Exception {
+
+        Section body = message.getBody();
+        javax.jms.Message result;
 
-        Message rc;
-        final Section body = amqp.getBody();
         if (body == null) {
-            rc = vendor.createMessage();
+            if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
+                result = vendor.createObjectMessage();
+            } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message) || isContentType(null, message)) {
+                result = vendor.createBytesMessage();
+            } else {
+                Charset charset = getCharsetForTextualContent(message.getContentType());
+                if (charset != null) {
+                    result = vendor.createTextMessage();
+                } else {
+                    result = vendor.createMessage();
+                }
+            }
+
+            result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_NULL);
         } else if (body instanceof Data) {
-            Binary d = ((Data) body).getValue();
-            BytesMessage m = vendor.createBytesMessage();
-            m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
-            rc = m;
+            Binary payload = ((Data) body).getValue();
+
+            if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
+                result = vendor.createObjectMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+            } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message)) {
+                result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+            } else {
+                Charset charset = getCharsetForTextualContent(message.getContentType());
+                if (StandardCharsets.UTF_8.equals(charset)) {
+                    ByteBuffer buf = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+
+                    try {
+                        CharBuffer chars = charset.newDecoder().decode(buf);
+                        result = vendor.createTextMessage(String.valueOf(chars));
+                    } catch (CharacterCodingException e) {
+                        result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+                    }
+                } else {
+                    result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+                }
+            }
+
+            result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_DATA);
         } else if (body instanceof AmqpSequence) {
             AmqpSequence sequence = (AmqpSequence) body;
             StreamMessage m = vendor.createStreamMessage();
             for (Object item : sequence.getValue()) {
                 m.writeObject(item);
             }
-            rc = m;
+
+            result = m;
+            result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_SEQUENCE);
         } else if (body instanceof AmqpValue) {
             Object value = ((AmqpValue) body).getValue();
-            if (value == null) {
-                rc = vendor.createObjectMessage();
-            }
-            if (value instanceof String) {
-                TextMessage m = vendor.createTextMessage();
-                m.setText((String) value);
-                rc = m;
+            if (value == null || value instanceof String) {
+                result = vendor.createTextMessage((String) value);
+
+                result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING);
             } else if (value instanceof Binary) {
-                Binary d = (Binary) value;
-                BytesMessage m = vendor.createBytesMessage();
-                m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
-                rc = m;
+                Binary payload = (Binary) value;
+
+                if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
+                    result = vendor.createObjectMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+                } else {
+                    result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+                }
+
+                result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY);
             } else if (value instanceof List) {
                 StreamMessage m = vendor.createStreamMessage();
                 for (Object item : (List<Object>) value) {
                     m.writeObject(item);
                 }
-                rc = m;
+                result = m;
+                result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_LIST);
             } else if (value instanceof Map) {
-                MapMessage m = vendor.createMapMessage();
-                final Set<Map.Entry<String, Object>> set = ((Map<String, Object>) value).entrySet();
-                for (Map.Entry<String, Object> entry : set) {
-                    m.setObject(entry.getKey(), entry.getValue());
-                }
-                rc = m;
+                result = vendor.createMapMessage((Map<String, Object>) value);
+                result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_MAP);
             } else {
-                ObjectMessage m = vendor.createObjectMessage();
-                m.setObject((Serializable) value);
-                rc = m;
+                // Trigger fall-back to native encoder which generates BytesMessage with the
+                // original message stored in the message body.
+                throw new AmqpProtocolException("Unable to encode to ActiveMQ JMS Message", false);
             }
         } else {
             throw new RuntimeException("Unexpected body type: " + body.getClass());
         }
-        rc.setJMSDeliveryMode(defaultDeliveryMode);
-        rc.setJMSPriority(defaultPriority);
-        rc.setJMSExpiration(defaultTtl);
-
-        populateMessage(rc, amqp);
 
-        rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
-        rc.setBooleanProperty(prefixVendor + "NATIVE", false);
-        return rc;
+        return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
index c9a94fa..59c306f 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
@@ -16,12 +16,24 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_DATA;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_NULL;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_ORIGINAL_ENCODING_KEY;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_SEQUENCE;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_UNKNOWN;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_BINARY;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_LIST;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_STRING;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.EMPTY_BINARY;
+import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
+
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.Map;
 
 import javax.jms.BytesMessage;
 import javax.jms.DeliveryMode;
@@ -39,8 +51,6 @@ import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.MessageId;
 import org.apache.activemq.transport.amqp.AmqpProtocolException;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
@@ -81,7 +91,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
     public static final String LEGACY_TEMP_QUEUE_TYPE = "temporary,queue";
     public static final String LEGACY_TEMP_TOPIC_TYPE = "temporary,topic";
 
-    public JMSMappingOutboundTransformer(JMSVendor vendor) {
+    public JMSMappingOutboundTransformer(ActiveMQJMSVendor vendor) {
         super(vendor);
     }
 
@@ -121,145 +131,107 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
     /**
      * Perform the conversion between JMS Message and Proton Message without
      * re-encoding it to array. This is needed because some frameworks may elect
-     * to do this on their own way (Netty for instance using Nettybuffers)
+     * to do this in their own way.
+     *
+     * @param message
+     *      The message to transform into an AMQP version for dispatch.
      *
-     * @param msg
-     * @return
-     * @throws Exception
+     * @return an AMQP Message that represents the given JMS Message.
+     *
+     * @throws Exception if an error occurs during the conversion.
      */
-    public ProtonJMessage convert(Message msg) throws JMSException, UnsupportedEncodingException {
+    public ProtonJMessage convert(Message message) throws JMSException, UnsupportedEncodingException {
         Header header = new Header();
         Properties props = new Properties();
-        HashMap<Symbol, Object> daMap = null;
-        HashMap<Symbol, Object> maMap = null;
-        HashMap apMap = null;
+
+        Map<Symbol, Object> daMap = null;
+        Map<Symbol, Object> maMap = null;
+        Map<String,Object> apMap = null;
+        Map<Object, Object> footerMap = null;
         Section body = null;
-        HashMap footerMap = null;
-        if (msg instanceof BytesMessage) {
-            BytesMessage m = (BytesMessage) msg;
-            byte data[] = new byte[(int) m.getBodyLength()];
-            m.readBytes(data);
-            m.reset(); // Need to reset after readBytes or future readBytes
-                       // calls (ex: redeliveries) will fail and return -1
-            body = new Data(new Binary(data));
-        }
-        if (msg instanceof TextMessage) {
-            body = new AmqpValue(((TextMessage) msg).getText());
-        }
-        if (msg instanceof MapMessage) {
-            final HashMap<String, Object> map = new HashMap<String, Object>();
-            final MapMessage m = (MapMessage) msg;
-            final Enumeration<String> names = m.getMapNames();
-            while (names.hasMoreElements()) {
-                String key = names.nextElement();
-                map.put(key, m.getObject(key));
-            }
-            body = new AmqpValue(map);
-        }
-        if (msg instanceof StreamMessage) {
-            ArrayList<Object> list = new ArrayList<Object>();
-            final StreamMessage m = (StreamMessage) msg;
-            try {
-                while (true) {
-                    list.add(m.readObject());
-                }
-            } catch (MessageEOFException e) {
-            }
-            body = new AmqpSequence(list);
-        }
-        if (msg instanceof ObjectMessage) {
-            body = new AmqpValue(((ObjectMessage) msg).getObject());
-        }
 
-        header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
-        header.setPriority(new UnsignedByte((byte) msg.getJMSPriority()));
-        if (msg.getJMSType() != null) {
-            props.setSubject(msg.getJMSType());
+        body = convertBody(message);
+
+        header.setDurable(message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
+        header.setPriority(new UnsignedByte((byte) message.getJMSPriority()));
+        if (message.getJMSType() != null) {
+            props.setSubject(message.getJMSType());
         }
-        if (msg.getJMSMessageID() != null) {
-            ActiveMQMessage amqMsg = (ActiveMQMessage) msg;
-
-            MessageId msgId = amqMsg.getMessageId();
-            if (msgId.getTextView() != null) {
-                try {
-                    props.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(msgId.getTextView()));
-                } catch (AmqpProtocolException e) {
-                    props.setMessageId(msgId.getTextView().toString());
-                }
-            } else {
-                props.setMessageId(msgId.toString());
-            }
+        if (message.getJMSMessageID() != null) {
+            props.setMessageId(vendor.getOriginalMessageId(message));
         }
-        if (msg.getJMSDestination() != null) {
-            props.setTo(vendor.toAddress(msg.getJMSDestination()));
+        if (message.getJMSDestination() != null) {
+            props.setTo(vendor.toAddress(message.getJMSDestination()));
             if (maMap == null) {
                 maMap = new HashMap<Symbol, Object>();
             }
-            maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSDestination()));
+            maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(message.getJMSDestination()));
 
             // Deprecated: used by legacy QPid AMQP 1.0 JMS client
-            maMap.put(LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSDestination()));
+            maMap.put(LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION, destinationAttributes(message.getJMSDestination()));
         }
-        if (msg.getJMSReplyTo() != null) {
-            props.setReplyTo(vendor.toAddress(msg.getJMSReplyTo()));
+        if (message.getJMSReplyTo() != null) {
+            props.setReplyTo(vendor.toAddress(message.getJMSReplyTo()));
             if (maMap == null) {
                 maMap = new HashMap<Symbol, Object>();
             }
-            maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSReplyTo()));
+            maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(message.getJMSReplyTo()));
 
             // Deprecated: used by legacy QPid AMQP 1.0 JMS client
-            maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSReplyTo()));
+            maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(message.getJMSReplyTo()));
         }
-        if (msg.getJMSCorrelationID() != null) {
-            String correlationId = msg.getJMSCorrelationID();
+        if (message.getJMSCorrelationID() != null) {
+            String correlationId = message.getJMSCorrelationID();
             try {
                 props.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
             } catch (AmqpProtocolException e) {
                 props.setCorrelationId(correlationId);
             }
         }
-        if (msg.getJMSExpiration() != 0) {
-            long ttl = msg.getJMSExpiration() - System.currentTimeMillis();
+        if (message.getJMSExpiration() != 0) {
+            long ttl = message.getJMSExpiration() - System.currentTimeMillis();
             if (ttl < 0) {
                 ttl = 1;
             }
             header.setTtl(new UnsignedInteger((int) ttl));
 
-            props.setAbsoluteExpiryTime(new Date(msg.getJMSExpiration()));
+            props.setAbsoluteExpiryTime(new Date(message.getJMSExpiration()));
         }
-        if (msg.getJMSTimestamp() != 0) {
-            props.setCreationTime(new Date(msg.getJMSTimestamp()));
+        if (message.getJMSTimestamp() != 0) {
+            props.setCreationTime(new Date(message.getJMSTimestamp()));
         }
 
-        final Enumeration<String> keys = msg.getPropertyNames();
+        @SuppressWarnings("unchecked")
+        final Enumeration<String> keys = message.getPropertyNames();
+
         while (keys.hasMoreElements()) {
             String key = keys.nextElement();
-            if (key.equals(messageFormatKey) || key.equals(nativeKey)) {
-                // skip..
+            if (key.equals(messageFormatKey) || key.equals(nativeKey) || key.equals(AMQP_ORIGINAL_ENCODING_KEY)) {
+                // skip transformer appended properties
             } else if (key.equals(firstAcquirerKey)) {
-                header.setFirstAcquirer(msg.getBooleanProperty(key));
+                header.setFirstAcquirer(message.getBooleanProperty(key));
             } else if (key.startsWith("JMSXDeliveryCount")) {
                 // The AMQP delivery-count field only includes prior failed delivery attempts,
                 // whereas JMSXDeliveryCount includes the first/current delivery attempt.
-                int amqpDeliveryCount = msg.getIntProperty(key) - 1;
+                int amqpDeliveryCount = message.getIntProperty(key) - 1;
                 if (amqpDeliveryCount > 0) {
                     header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
                 }
             } else if (key.startsWith("JMSXUserID")) {
-                String value = msg.getStringProperty(key);
+                String value = message.getStringProperty(key);
                 props.setUserId(new Binary(value.getBytes("UTF-8")));
             } else if (key.startsWith("JMSXGroupID")) {
-                String value = msg.getStringProperty(key);
+                String value = message.getStringProperty(key);
                 props.setGroupId(value);
                 if (apMap == null) {
-                    apMap = new HashMap();
+                    apMap = new HashMap<String, Object>();
                 }
                 apMap.put(key, value);
             } else if (key.startsWith("JMSXGroupSeq")) {
-                UnsignedInteger value = new UnsignedInteger(msg.getIntProperty(key));
+                UnsignedInteger value = new UnsignedInteger(message.getIntProperty(key));
                 props.setGroupSequence(value);
                 if (apMap == null) {
-                    apMap = new HashMap();
+                    apMap = new HashMap<String, Object>();
                 }
                 apMap.put(key, value);
             } else if (key.startsWith(prefixDeliveryAnnotationsKey)) {
@@ -267,30 +239,30 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
                     daMap = new HashMap<Symbol, Object>();
                 }
                 String name = key.substring(prefixDeliveryAnnotationsKey.length());
-                daMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
+                daMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
             } else if (key.startsWith(prefixMessageAnnotationsKey)) {
                 if (maMap == null) {
                     maMap = new HashMap<Symbol, Object>();
                 }
                 String name = key.substring(prefixMessageAnnotationsKey.length());
-                maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
+                maMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
             } else if (key.equals(contentTypeKey)) {
-                props.setContentType(Symbol.getSymbol(msg.getStringProperty(key)));
+                props.setContentType(Symbol.getSymbol(message.getStringProperty(key)));
             } else if (key.equals(contentEncodingKey)) {
-                props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key)));
+                props.setContentEncoding(Symbol.getSymbol(message.getStringProperty(key)));
             } else if (key.equals(replyToGroupIDKey)) {
-                props.setReplyToGroupId(msg.getStringProperty(key));
+                props.setReplyToGroupId(message.getStringProperty(key));
             } else if (key.startsWith(prefixFooterKey)) {
                 if (footerMap == null) {
-                    footerMap = new HashMap();
+                    footerMap = new HashMap<Object, Object>();
                 }
                 String name = key.substring(prefixFooterKey.length());
-                footerMap.put(name, msg.getObjectProperty(key));
+                footerMap.put(name, message.getObjectProperty(key));
             } else {
                 if (apMap == null) {
-                    apMap = new HashMap();
+                    apMap = new HashMap<String, Object>();
                 }
-                apMap.put(key, msg.getObjectProperty(key));
+                apMap.put(key, message.getObjectProperty(key));
             }
         }
 
@@ -314,6 +286,101 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
         return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer);
     }
 
+    private Section convertBody(Message message) throws JMSException {
+
+        Section body = null;
+        short orignalEncoding = AMQP_UNKNOWN;
+
+        if (message.propertyExists(AMQP_ORIGINAL_ENCODING_KEY)) {
+            try {
+                orignalEncoding = message.getShortProperty(AMQP_ORIGINAL_ENCODING_KEY);
+            } catch (Exception ex) {
+            }
+        }
+
+        if (message instanceof BytesMessage) {
+            Binary payload = vendor.getBinaryFromMessageBody((BytesMessage) message);
+
+            if (payload == null) {
+                payload = EMPTY_BINARY;
+            }
+
+            switch (orignalEncoding) {
+                case AMQP_NULL:
+                    break;
+                case AMQP_VALUE_BINARY:
+                    body = new AmqpValue(payload);
+                    break;
+                case AMQP_DATA:
+                case AMQP_UNKNOWN:
+                default:
+                    body = new Data(payload);
+                    break;
+            }
+        } else if (message instanceof TextMessage) {
+            switch (orignalEncoding) {
+                case AMQP_NULL:
+                    break;
+                case AMQP_DATA:
+                    body = new Data(vendor.getBinaryFromMessageBody((TextMessage) message));
+                    break;
+                case AMQP_VALUE_STRING:
+                case AMQP_UNKNOWN:
+                default:
+                    body = new AmqpValue(((TextMessage) message).getText());
+                    break;
+            }
+        } else if (message instanceof MapMessage) {
+            body = new AmqpValue(vendor.getMapFromMessageBody((MapMessage) message));
+        } else if (message instanceof StreamMessage) {
+            ArrayList<Object> list = new ArrayList<Object>();
+            final StreamMessage m = (StreamMessage) message;
+            try {
+                while (true) {
+                    list.add(m.readObject());
+                }
+            } catch (MessageEOFException e) {
+            }
+
+            switch (orignalEncoding) {
+                case AMQP_SEQUENCE:
+                    body = new AmqpSequence(list);
+                    break;
+                case AMQP_VALUE_LIST:
+                case AMQP_UNKNOWN:
+                default:
+                    body = new AmqpValue(list);
+                    break;
+            }
+        } else if (message instanceof ObjectMessage) {
+            Binary payload = vendor.getBinaryFromMessageBody((ObjectMessage) message);
+
+            if (payload == null) {
+                payload = EMPTY_BINARY;
+            }
+
+            switch (orignalEncoding) {
+                case AMQP_VALUE_BINARY:
+                    body = new AmqpValue(payload);
+                    break;
+                case AMQP_DATA:
+                case AMQP_UNKNOWN:
+                default:
+                    body = new Data(payload);
+                    break;
+            }
+
+            // For a non-AMQP message we tag the outbound content type as containing
+            // a serialized Java object so that an AMQP client has a hint as to what
+            // we are sending it.
+            if (!message.propertyExists(contentTypeKey)) {
+                vendor.setMessageProperty(message, contentTypeKey, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+            }
+        }
+
+        return body;
+    }
+
     private static byte destinationType(Destination destination) {
         if (destination instanceof Queue) {
             if (destination instanceof TemporaryQueue) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSVendor.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSVendor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSVendor.java
deleted file mode 100644
index f9169ec..0000000
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSVendor.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.message;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
-
-public interface JMSVendor {
-
-    public abstract BytesMessage createBytesMessage();
-
-    public abstract StreamMessage createStreamMessage();
-
-    public abstract Message createMessage();
-
-    public abstract TextMessage createTextMessage();
-
-    public abstract ObjectMessage createObjectMessage();
-
-    public abstract MapMessage createMapMessage();
-
-    public abstract void setJMSXUserID(Message message, String value);
-
-    public Destination createDestination(String name);
-
-    public abstract void setJMSXGroupID(Message message, String groupId);
-
-    public abstract void setJMSXGroupSequence(Message message, int value);
-
-    public abstract void setJMSXDeliveryCount(Message message, long value);
-
-    public abstract String toAddress(Destination destination);
-
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
index 1d28a07..2eefa50 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -20,24 +20,25 @@ import javax.jms.Message;
 
 public abstract class OutboundTransformer {
 
-    JMSVendor vendor;
-    String prefixVendor;
+    protected final ActiveMQJMSVendor vendor;
 
-    String prefixDeliveryAnnotations = "DA_";
-    String prefixMessageAnnotations= "MA_";
-    String prefixFooter = "FT_";
+    protected String prefixVendor;
 
-    String messageFormatKey;
-    String nativeKey;
-    String firstAcquirerKey;
-    String prefixDeliveryAnnotationsKey;
-    String prefixMessageAnnotationsKey;
-    String contentTypeKey;
-    String contentEncodingKey;
-    String replyToGroupIDKey;
-    String prefixFooterKey;
+    protected String prefixDeliveryAnnotations = "DA_";
+    protected String prefixMessageAnnotations= "MA_";
+    protected String prefixFooter = "FT_";
 
-    public OutboundTransformer(JMSVendor vendor) {
+    protected String messageFormatKey;
+    protected String nativeKey;
+    protected String firstAcquirerKey;
+    protected String prefixDeliveryAnnotationsKey;
+    protected String prefixMessageAnnotationsKey;
+    protected String contentTypeKey;
+    protected String contentEncodingKey;
+    protected String replyToGroupIDKey;
+    protected String prefixFooterKey;
+
+    public OutboundTransformer(ActiveMQJMSVendor vendor) {
         this.vendor = vendor;
         this.setPrefixVendor("JMS_AMQP_");
     }
@@ -56,18 +57,13 @@ public abstract class OutboundTransformer {
         firstAcquirerKey = prefixVendor + "FirstAcquirer";
         prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations;
         prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations;
-        contentTypeKey = prefixVendor +"ContentType";
-        contentEncodingKey = prefixVendor +"ContentEncoding";
-        replyToGroupIDKey = prefixVendor +"ReplyToGroupID";
+        contentTypeKey = prefixVendor + "ContentType";
+        contentEncodingKey = prefixVendor + "ContentEncoding";
+        replyToGroupIDKey = prefixVendor + "ReplyToGroupID";
         prefixFooterKey = prefixVendor + prefixFooter;
-
     }
 
-    public JMSVendor getVendor() {
+    public ActiveMQJMSVendor getVendor() {
         return vendor;
     }
-
-    public void setVendor(JMSVendor vendor) {
-        this.vendor = vendor;
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
index 3ae018e..503a05e 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
@@ -157,23 +157,7 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
             EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), deliveryBytes.data, deliveryBytes.offset, deliveryBytes.length);
 
             InboundTransformer transformer = getTransformer();
-            ActiveMQMessage message = null;
-
-            while (transformer != null) {
-                try {
-                    message = (ActiveMQMessage) transformer.transform(em);
-                    break;
-                } catch (Exception e) {
-                    LOG.debug("Transform of message using [{}] transformer, failed", getTransformer().getTransformerName());
-                    LOG.trace("Transformation error:", e);
-
-                    transformer = transformer.getFallbackTransformer();
-                }
-            }
-
-            if (message == null) {
-                throw new IOException("Failed to transform incoming delivery, skipping.");
-            }
+            ActiveMQMessage message = (ActiveMQMessage) transformer.transform(em);
 
             current = null;
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/d54e21b2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
index 3ce5ef6..84d5864 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
@@ -29,6 +29,7 @@ import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -39,6 +40,7 @@ import javax.jms.ObjectMessage;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.ActiveMQConnection;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -166,7 +168,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
     @Test(timeout = 60000)
     public void testMessagePropertiesArePreservedAMQPToOpenWire() throws Exception {
 
-        // Raw Transformer doesn't expand message propeties.
+        // Raw Transformer doesn't expand message properties.
         assumeFalse(transformer.equals("raw"));
 
         boolean bool = true;
@@ -284,7 +286,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
         // Now consumer the ObjectMessage
         Message received = amqpConsumer.receive(2000);
         assertNotNull(received);
-        assertTrue(received instanceof ObjectMessage);
+        assertTrue("Expected ObjectMessage but got " + received, received instanceof ObjectMessage);
         ObjectMessage incoming = (ObjectMessage) received;
 
         Object incomingObject = incoming.getObject();
@@ -297,7 +299,126 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
         openwire.close();
     }
 
-    //----- Tests for OpenWire to Qpid JMS using ObjectMessage ---------------//
+    //----- Tests for OpenWire <-> Qpid JMS using ObjectMessage --------------//
+
+    @Test
+    public void testQpidToOpenWireObjectMessage() throws Exception {
+
+        // This test only runs when the "jms" transformer is in use.
+        assumeFalse(!transformer.equals("jms"));
+
+        Connection openwire = createJMSConnection();
+        Connection amqp = createConnection();
+
+        openwire.start();
+        amqp.start();
+
+        Session openwireSession = openwire.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session amqpSession = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Destination queue = openwireSession.createQueue(getDestinationName());
+
+        MessageProducer amqpProducer = amqpSession.createProducer(queue);
+        MessageConsumer openwireConsumer = openwireSession.createConsumer(queue);
+
+        // Create and send the Message
+        ObjectMessage outgoing = amqpSession.createObjectMessage();
+        outgoing.setObject(UUID.randomUUID());
+        amqpProducer.send(outgoing);
+
+        // Now consume the ObjectMessage
+        Message received = openwireConsumer.receive(2000);
+        assertNotNull(received);
+        LOG.info("Read new message: {}", received);
+        assertTrue(received instanceof ObjectMessage);
+        ObjectMessage incoming = (ObjectMessage) received;
+        Object payload = incoming.getObject();
+        assertNotNull(payload);
+        assertTrue(payload instanceof UUID);
+
+        amqp.close();
+        openwire.close();
+    }
+
+    @Test
+    public void testOpenWireToQpidObjectMessage() throws Exception {
+
+        // This test only runs when the "jms" transformer is in use.
+        assumeFalse(!transformer.equals("jms"));
+
+        Connection openwire = createJMSConnection();
+        Connection amqp = createConnection();
+
+        openwire.start();
+        amqp.start();
+
+        Session openwireSession = openwire.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session amqpSession = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Destination queue = openwireSession.createQueue(getDestinationName());
+
+        MessageProducer openwireProducer = openwireSession.createProducer(queue);
+        MessageConsumer amqpConsumer = amqpSession.createConsumer(queue);
+
+        // Create and send the Message
+        ObjectMessage outgoing = amqpSession.createObjectMessage();
+        outgoing.setObject(UUID.randomUUID());
+        openwireProducer.send(outgoing);
+
+        // Now consume the ObjectMessage
+        Message received = amqpConsumer.receive(2000);
+        assertNotNull(received);
+        LOG.info("Read new message: {}", received);
+        assertTrue(received instanceof ObjectMessage);
+        ObjectMessage incoming = (ObjectMessage) received;
+        Object payload = incoming.getObject();
+        assertNotNull(payload);
+        assertTrue(payload instanceof UUID);
+
+        amqp.close();
+        openwire.close();
+    }
+
+    @Test
+    public void testOpenWireToQpidObjectMessageWithOpenWireCompression() throws Exception {
+
+        // This test only runs when the "jms" transformer is in use.
+        assumeFalse(!transformer.equals("jms"));
+
+        Connection openwire = createJMSConnection();
+        ((ActiveMQConnection) openwire).setUseCompression(true);
+
+        Connection amqp = createConnection();
+
+        openwire.start();
+        amqp.start();
+
+        Session openwireSession = openwire.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session amqpSession = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Destination queue = openwireSession.createQueue(getDestinationName());
+
+        MessageProducer openwireProducer = openwireSession.createProducer(queue);
+        MessageConsumer amqpConsumer = amqpSession.createConsumer(queue);
+
+        // Create and send the Message
+        ObjectMessage outgoing = amqpSession.createObjectMessage();
+        outgoing.setObject(UUID.randomUUID());
+        openwireProducer.send(outgoing);
+
+        // Now consume the ObjectMessage
+        Message received = amqpConsumer.receive(2000);
+        assertNotNull(received);
+        LOG.info("Read new message: {}", received);
+        assertTrue(received instanceof ObjectMessage);
+        ObjectMessage incoming = (ObjectMessage) received;
+        Object payload = incoming.getObject();
+        assertNotNull(payload);
+        assertTrue(payload instanceof UUID);
+
+        amqp.close();
+        openwire.close();
+    }
 
     @SuppressWarnings("unchecked")
     @Test


Mime
View raw message