activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [4/5] activemq-artemis git commit: ARTEMIS-770 AMQP Message Transformer refactor
Date Fri, 07 Oct 2016 14:51:05 GMT
ARTEMIS-770 AMQP Message Transformer refactor

Refactor the AMQP Message transformers both for better performance and
also to fix a number of issues with the transformers creating inbound
and outbound messages with incorrectly mapped values or extra data
appended where it should not be.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/62627bf2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/62627bf2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/62627bf2

Branch: refs/heads/master
Commit: 62627bf2ee4789d8c019170fba0fcbf64a697a38
Parents: 6f6d984
Author: Timothy Bish <tabish121@gmail.com>
Authored: Mon Oct 3 14:23:50 2016 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Fri Oct 7 10:42:52 2016 -0400

----------------------------------------------------------------------
 .../amqp/broker/AMQPSessionCallback.java        |  12 +-
 .../amqp/converter/ActiveMQJMSVendor.java       | 148 ---
 .../amqp/converter/ProtonMessageConverter.java  |  83 +-
 .../amqp/converter/jms/ServerJMSMessage.java    |   9 +-
 .../converter/jms/ServerJMSObjectMessage.java   |  47 +-
 .../message/AMQPContentTypeSupport.java         | 146 +++
 .../converter/message/AMQPMessageIdHelper.java  |  59 +-
 .../converter/message/AMQPMessageSupport.java   | 272 ++++++
 .../converter/message/AMQPMessageTypes.java     |   4 +
 .../message/AMQPNativeInboundTransformer.java   |  28 +-
 .../message/AMQPNativeOutboundTransformer.java  |  82 +-
 .../message/AMQPRawInboundTransformer.java      |  48 +-
 .../amqp/converter/message/EncodedMessage.java  |  12 +-
 .../converter/message/InboundTransformer.java   | 182 ++--
 .../message/JMSMappingInboundTransformer.java   | 198 ++--
 .../message/JMSMappingOutboundTransformer.java  | 650 +++++++++----
 .../amqp/converter/message/JMSVendor.java       |  53 --
 .../converter/message/OutboundTransformer.java  |  78 +-
 ...ActiveMQAMQPInvalidContentTypeException.java |  27 +
 .../amqp/proton/ProtonServerSenderContext.java  |  64 +-
 .../amqp/converter/TestConversions.java         | 130 ++-
 .../message/AMQPContentTypeSupportTest.java     | 230 +++++
 .../message/AMQPMessageIdHelperTest.java        | 391 ++++++++
 .../message/AMQPMessageSupportTest.java         | 108 +++
 .../JMSMappingInboundTransformerTest.java       | 718 ++++++++++++++
 .../JMSMappingOutboundTransformerTest.java      | 952 +++++++++++++++++++
 .../JMSTransformationSpeedComparisonTest.java   | 300 ++++++
 .../message/MessageTransformationTest.java      | 264 +++++
 .../transport/amqp/client/AmqpMessage.java      |  24 +
 .../tests/integration/amqp/ProtonTest.java      |  71 +-
 30 files changed, 4458 insertions(+), 932 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index c7ca446..66c7b4b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -21,7 +21,6 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -37,6 +36,7 @@ import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
+import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter;
 import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
@@ -59,9 +59,10 @@ import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.message.ProtonJMessage;
+import io.netty.buffer.ByteBuf;
 import org.jboss.logging.Logger;
 
 public class AMQPSessionCallback implements SessionCallback {
@@ -259,8 +260,11 @@ public class AMQPSessionCallback implements SessionCallback {
       }
    }
 
-   public ProtonJMessage encodeMessage(Object message, int deliveryCount) throws Exception {
-      return (ProtonJMessage) manager.getConverter().outbound((ServerMessage) message, deliveryCount);
+   public long encodeMessage(Object message, int deliveryCount, WritableBuffer buffer) throws Exception {
+      ProtonMessageConverter converter = (ProtonMessageConverter) manager.getConverter();
+
+      // The Proton variant accepts a WritableBuffer to allow for a faster more direct encode.
+      return (long) converter.outbound((ServerMessage) message, deliveryCount, buffer);
    }
 
    public String tempQueueName() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ActiveMQJMSVendor.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ActiveMQJMSVendor.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ActiveMQJMSVendor.java
deleted file mode 100644
index 0b28660..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ActiveMQJMSVendor.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.converter;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
-import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSVendor;
-import org.apache.activemq.artemis.utils.IDGenerator;
-
-public class ActiveMQJMSVendor implements JMSVendor {
-
-   private final IDGenerator serverGenerator;
-
-   ActiveMQJMSVendor(IDGenerator idGenerator) {
-      this.serverGenerator = idGenerator;
-   }
-
-   @Override
-   public BytesMessage createBytesMessage() {
-      return new ServerJMSBytesMessage(newMessage(org.apache.activemq.artemis.api.core.Message.BYTES_TYPE), 0);
-   }
-
-   @Override
-   public StreamMessage createStreamMessage() {
-      return new ServerJMSStreamMessage(newMessage(org.apache.activemq.artemis.api.core.Message.STREAM_TYPE), 0);
-   }
-
-   @Override
-   public Message createMessage() {
-      return new ServerJMSMessage(newMessage(org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE), 0);
-   }
-
-   @Override
-   public TextMessage createTextMessage() {
-      return new ServerJMSTextMessage(newMessage(org.apache.activemq.artemis.api.core.Message.TEXT_TYPE), 0);
-   }
-
-   @Override
-   public ObjectMessage createObjectMessage() {
-      return new ServerJMSObjectMessage(newMessage(org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE), 0);
-   }
-
-   @Override
-   public MapMessage createMapMessage() {
-      return new ServerJMSMapMessage(newMessage(org.apache.activemq.artemis.api.core.Message.MAP_TYPE), 0);
-   }
-
-   @Override
-   public void setJMSXUserID(Message message, String s) {
-   }
-
-   @Override
-   public Destination createDestination(String name) {
-      return new ServerDestination(name);
-   }
-
-   @Override
-   public void setJMSXGroupID(Message message, String s) {
-      try {
-         message.setStringProperty("_AMQ_GROUP_ID", s);
-      } catch (JMSException e) {
-         throw new RuntimeException(e);
-      }
-   }
-
-   @Override
-   public void setJMSXGroupSequence(Message message, int i) {
-      try {
-         message.setIntProperty("JMSXGroupSeq", i);
-      } catch (JMSException e) {
-         throw new RuntimeException(e);
-      }
-   }
-
-   @Override
-   public void setJMSXDeliveryCount(Message message, long l) {
-      try {
-         message.setLongProperty("JMSXDeliveryCount", l);
-      } catch (JMSException e) {
-         throw new RuntimeException(e);
-      }
-   }
-
-   public ServerJMSMessage wrapMessage(int messageType, ServerMessage wrapped, int deliveryCount) {
-      switch (messageType) {
-         case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE:
-            return new ServerJMSStreamMessage(wrapped, deliveryCount);
-         case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE:
-            return new ServerJMSBytesMessage(wrapped, deliveryCount);
-         case org.apache.activemq.artemis.api.core.Message.MAP_TYPE:
-            return new ServerJMSMapMessage(wrapped, deliveryCount);
-         case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE:
-            return new ServerJMSTextMessage(wrapped, deliveryCount);
-         case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
-            return new ServerJMSObjectMessage(wrapped, deliveryCount);
-         default:
-            return new ServerJMSMessage(wrapped, deliveryCount);
-      }
-   }
-
-   @Override
-   public String toAddress(Destination destination) {
-      if (destination instanceof ActiveMQDestination) {
-         return ((ActiveMQDestination) destination).getAddress();
-      }
-      return null;
-   }
-
-   private ServerMessageImpl newMessage(byte messageType) {
-      ServerMessageImpl message = new ServerMessageImpl(serverGenerator.generateID(), 512);
-      message.setType(messageType);
-      ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null);
-      return message;
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java
index 6eb78d0..6aa44a4 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java
@@ -16,91 +16,86 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter;
 
-import javax.jms.BytesMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE;
+
 import java.io.IOException;
 
+import javax.jms.BytesMessage;
+
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
 import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport;
 import org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPNativeOutboundTransformer;
 import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.message.InboundTransformer;
 import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSMappingInboundTransformer;
 import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSMappingOutboundTransformer;
+import org.apache.activemq.artemis.protocol.amqp.converter.message.OutboundTransformer;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
 import org.apache.activemq.artemis.utils.IDGenerator;
+import org.apache.qpid.proton.codec.WritableBuffer;
 
-public class ProtonMessageConverter implements MessageConverter {
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 
-   ActiveMQJMSVendor activeMQJMSVendor;
-
-   private final String prefixVendor;
+public class ProtonMessageConverter implements MessageConverter {
 
    public ProtonMessageConverter(IDGenerator idGenerator) {
-      activeMQJMSVendor = new ActiveMQJMSVendor(idGenerator);
-      inboundTransformer = new JMSMappingInboundTransformer(activeMQJMSVendor);
-      outboundTransformer = new JMSMappingOutboundTransformer(activeMQJMSVendor);
-      prefixVendor = outboundTransformer.getPrefixVendor();
+      inboundTransformer = new JMSMappingInboundTransformer(idGenerator);
+      outboundTransformer = new JMSMappingOutboundTransformer(idGenerator);
    }
 
    private final InboundTransformer inboundTransformer;
-   private final JMSMappingOutboundTransformer outboundTransformer;
+   private final OutboundTransformer outboundTransformer;
 
    @Override
    public ServerMessage inbound(Object messageSource) throws Exception {
-      ServerJMSMessage jmsMessage = inboundJMSType((EncodedMessage) messageSource);
-
-      return (ServerMessage) jmsMessage.getInnerMessage();
-   }
-
-   /**
-    * Just create the JMS Part of the inbound (for testing)
-    *
-    * @param messageSource
-    * @return
-    * @throws Exception https://issues.jboss.org/browse/ENTMQ-1560
-    */
-   public ServerJMSMessage inboundJMSType(EncodedMessage messageSource) throws Exception {
-      EncodedMessage encodedMessageSource = messageSource;
+      EncodedMessage encodedMessageSource = (EncodedMessage) messageSource;
       ServerJMSMessage transformedMessage = null;
 
-      InboundTransformer transformer = inboundTransformer;
-
-      while (transformer != null) {
-         try {
-            transformedMessage = (ServerJMSMessage) transformer.transform(encodedMessageSource);
-            break;
-         } catch (Exception e) {
-            ActiveMQClientLogger.LOGGER.debug("Transform of message using [{}] transformer, failed" + inboundTransformer.getTransformerName());
-            ActiveMQClientLogger.LOGGER.trace("Transformation error:", e);
-
-            transformer = transformer.getFallbackTransformer();
-         }
-      }
+      try {
+         transformedMessage = inboundTransformer.transform(encodedMessageSource);
+      } catch (Exception e) {
+         ActiveMQClientLogger.LOGGER.debug("Transform of message using [{}] transformer, failed" + inboundTransformer.getTransformerName());
+         ActiveMQClientLogger.LOGGER.trace("Transformation error:", e);
 
-      if (transformedMessage == null) {
          throw new IOException("Failed to transform incoming delivery, skipping.");
       }
 
       transformedMessage.encode();
 
-      return transformedMessage;
+      return (ServerMessage) transformedMessage.getInnerMessage();
    }
 
    @Override
    public Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception {
-      ServerJMSMessage jmsMessage = activeMQJMSVendor.wrapMessage(messageOutbound.getType(), messageOutbound, deliveryCount);
+      // Useful for testing but not recommended for real life use.
+      ByteBuf nettyBuffer = Unpooled.buffer(1024);
+      NettyWritable buffer = new NettyWritable(nettyBuffer);
+      long messageFormat = (long) outbound(messageOutbound, deliveryCount, buffer);
+
+      EncodedMessage encoded = new EncodedMessage(messageFormat, nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(),
+         nettyBuffer.readableBytes());
+
+      return encoded;
+   }
+
+   public Object outbound(ServerMessage messageOutbound, int deliveryCount, WritableBuffer buffer) throws Exception {
+      ServerJMSMessage jmsMessage = AMQPMessageSupport.wrapMessage(messageOutbound.getType(), messageOutbound, deliveryCount);
 
       jmsMessage.decode();
 
-      if (jmsMessage.getBooleanProperty(prefixVendor + "NATIVE")) {
+      if (jmsMessage.getBooleanProperty(JMS_AMQP_NATIVE)) {
          if (jmsMessage instanceof BytesMessage) {
-            return AMQPNativeOutboundTransformer.transform(outboundTransformer, (BytesMessage) jmsMessage);
+            return AMQPNativeOutboundTransformer.transform(outboundTransformer, (ServerJMSBytesMessage) jmsMessage, buffer);
          } else {
-            return null;
+            return 0;
          }
       } else {
-         return outboundTransformer.convert(jmsMessage);
+         return outboundTransformer.transform(jmsMessage, buffer);
       }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
index a6eac1d..c7900e4 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
@@ -16,12 +16,13 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.jms;
 
+import java.util.Collections;
+import java.util.Enumeration;
+
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
-import java.util.Collections;
-import java.util.Enumeration;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -47,6 +48,10 @@ public class ServerJMSMessage implements Message {
       this.deliveryCount = deliveryCount;
    }
 
+   public int getDeliveryCount() {
+      return deliveryCount;
+   }
+
    private ActiveMQBuffer readBodyBuffer;
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
index 7f0906e..d1eaac6 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
@@ -16,31 +16,20 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.jms;
 
+import java.io.Serializable;
+
 import javax.jms.JMSException;
 import javax.jms.ObjectMessage;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.message.impl.MessageInternal;
-import org.apache.activemq.artemis.utils.ObjectInputStreamWithClassLoader;
+import org.apache.qpid.proton.amqp.Binary;
 
 public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectMessage {
 
-   private static final String DEFAULT_WHITELIST;
-   private static final String DEFAULT_BLACKLIST;
-
-   static {
-      DEFAULT_WHITELIST = System.getProperty(ObjectInputStreamWithClassLoader.WHITELIST_PROPERTY, "java.lang,java.math,javax.security,java.util,org.apache.activemq,org.apache.qpid.proton.amqp");
-
-      DEFAULT_BLACKLIST = System.getProperty(ObjectInputStreamWithClassLoader.BLACKLIST_PROPERTY, null);
-   }
+   public static final byte TYPE = Message.OBJECT_TYPE;
 
-   public static final byte TYPE = Message.STREAM_TYPE;
-
-   private Serializable object;
+   private Binary payload;
 
    public ServerJMSObjectMessage(MessageInternal message, int deliveryCount) {
       super(message, deliveryCount);
@@ -48,23 +37,27 @@ public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectMe
 
    @Override
    public void setObject(Serializable object) throws JMSException {
-      this.object = object;
+      throw new UnsupportedOperationException("Cannot set Object on this internal message");
    }
 
    @Override
    public Serializable getObject() throws JMSException {
-      return object;
+      throw new UnsupportedOperationException("Cannot set Object on this internal message");
+   }
+
+   public void setSerializedForm(Binary payload) {
+      this.payload = payload;
+   }
+
+   public Binary getSerializedForm() {
+      return payload;
    }
 
    @Override
    public void encode() throws Exception {
       super.encode();
-      ByteArrayOutputStream out = new ByteArrayOutputStream();
-      ObjectOutputStream ous = new ObjectOutputStream(out);
-      ous.writeObject(object);
-      byte[] src = out.toByteArray();
-      getInnerMessage().getBodyBuffer().writeInt(src.length);
-      getInnerMessage().getBodyBuffer().writeBytes(src);
+      getInnerMessage().getBodyBuffer().writeInt(payload.getLength());
+      getInnerMessage().getBodyBuffer().writeBytes(payload.getArray(), payload.getArrayOffset(), payload.getLength());
    }
 
    @Override
@@ -73,10 +66,6 @@ public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectMe
       int size = getInnerMessage().getBodyBuffer().readInt();
       byte[] bytes = new byte[size];
       getInnerMessage().getBodyBuffer().readBytes(bytes);
-      try (ObjectInputStreamWithClassLoader ois = new ObjectInputStreamWithClassLoader(new ByteArrayInputStream(bytes))) {
-         ois.setWhiteList(DEFAULT_WHITELIST);
-         ois.setBlackList(DEFAULT_BLACKLIST);
-         object = (Serializable) ois.readObject();
-      }
+      payload = new Binary(bytes);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupport.java
new file mode 100644
index 0000000..01d72c8
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupport.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+import java.nio.charset.Charset;
+import java.nio.charset.IllegalCharsetNameException;
+import java.nio.charset.StandardCharsets;
+import java.nio.charset.UnsupportedCharsetException;
+import java.util.StringTokenizer;
+
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException;
+
+public final class AMQPContentTypeSupport {
+
+   private static final String UTF_8 = "UTF-8";
+   private static final String CHARSET = "charset";
+   private static final String TEXT = "text";
+   private static final String APPLICATION = "application";
+   private static final String JAVASCRIPT = "javascript";
+   private static final String XML = "xml";
+   private static final String XML_VARIANT = "+xml";
+   private static final String JSON = "json";
+   private static final String JSON_VARIANT = "+json";
+   private static final String XML_DTD = "xml-dtd";
+   private static final String ECMASCRIPT = "ecmascript";
+
+   /**
+    * @param contentType
+    *        the contentType of the received message
+    * @return the character set to use, or null if not to treat the message as text
+    * @throws ActiveMQAMQPInvalidContentTypeException
+    *         if the content-type is invalid in some way.
+    */
+   public static Charset parseContentTypeForTextualCharset(final String contentType) throws ActiveMQAMQPInvalidContentTypeException {
+      if (contentType == null || contentType.trim().isEmpty()) {
+         throw new ActiveMQAMQPInvalidContentTypeException("Content type can't be null or empty");
+      }
+
+      int subTypeSeparator = contentType.indexOf("/");
+      if (subTypeSeparator == -1) {
+         throw new ActiveMQAMQPInvalidContentTypeException("Content type has no '/' separator: " + contentType);
+      }
+
+      final String type = contentType.substring(0, subTypeSeparator).toLowerCase().trim();
+
+      String subTypePart = contentType.substring(subTypeSeparator + 1).toLowerCase().trim();
+
+      String parameterPart = null;
+      int parameterSeparator = subTypePart.indexOf(";");
+      if (parameterSeparator != -1) {
+         if (parameterSeparator < subTypePart.length() - 1) {
+            parameterPart = contentType.substring(subTypeSeparator + 1).toLowerCase().trim();
+         }
+         subTypePart = subTypePart.substring(0, parameterSeparator).trim();
+      }
+
+      if (subTypePart.isEmpty()) {
+         throw new ActiveMQAMQPInvalidContentTypeException("Content type has no subtype after '/'" + contentType);
+      }
+
+      final String subType = subTypePart;
+
+      if (isTextual(type, subType)) {
+         String charset = findCharset(parameterPart);
+         if (charset == null) {
+            charset = UTF_8;
+         }
+
+         if (UTF_8.equals(charset)) {
+            return StandardCharsets.UTF_8;
+         } else {
+            try {
+               return Charset.forName(charset);
+            } catch (IllegalCharsetNameException icne) {
+               throw new ActiveMQAMQPInvalidContentTypeException("Illegal charset: " + charset);
+            } catch (UnsupportedCharsetException uce) {
+               throw new ActiveMQAMQPInvalidContentTypeException("Unsupported charset: " + charset);
+            }
+         }
+      }
+
+      return null;
+   }
+
+   // ----- Internal Content Type utilities ----------------------------------//
+
+   private static boolean isTextual(String type, String subType) {
+      if (TEXT.equals(type)) {
+         return true;
+      }
+
+      if (APPLICATION.equals(type)) {
+         if (XML.equals(subType) || JSON.equals(subType) || JAVASCRIPT.equals(subType) || subType.endsWith(XML_VARIANT) || subType.endsWith(JSON_VARIANT)
+            || XML_DTD.equals(subType) || ECMASCRIPT.equals(subType)) {
+            return true;
+         }
+      }
+
+      return false;
+   }
+
+   private static String findCharset(String paramaterPart) {
+      String charset = null;
+
+      if (paramaterPart != null) {
+         StringTokenizer tokenizer = new StringTokenizer(paramaterPart, ";");
+         while (tokenizer.hasMoreTokens()) {
+            String parameter = tokenizer.nextToken().trim();
+            int eqIndex = parameter.indexOf('=');
+            if (eqIndex != -1) {
+               String name = parameter.substring(0, eqIndex);
+               if (CHARSET.equalsIgnoreCase(name.trim())) {
+                  String value = unquote(parameter.substring(eqIndex + 1));
+
+                  charset = value.toUpperCase();
+                  break;
+               }
+            }
+         }
+      }
+
+      return charset;
+   }
+
+   private static String unquote(String s) {
+      if (s.length() > 1 && (s.startsWith("\"") && s.endsWith("\""))) {
+         return s.substring(1, s.length() - 1);
+      } else {
+         return s;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java
index dc7891c..e9a9969 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java
@@ -28,24 +28,29 @@ import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.UnsignedLong;
 
 /**
- * Helper class for identifying and converting message-id and correlation-id values between
- * the AMQP types and the Strings values used by JMS.
+ * Helper class for identifying and converting message-id and correlation-id values between the
+ * AMQP types and the Strings values used by JMS.
  * <p>
- * <p>AMQP messages allow for 4 types of message-id/correlation-id: message-id-string, message-id-binary,
- * message-id-uuid, or message-id-ulong. In order to accept or return a string representation of these
- * for interoperability with other AMQP clients, the following encoding can be used after removing or
- * before adding the "ID:" prefix used for a JMSMessageID value:<br>
+ * <p>
+ * AMQP messages allow for 4 types of message-id/correlation-id: message-id-string,
+ * message-id-binary, message-id-uuid, or message-id-ulong. In order to accept or return a
+ * string representation of these for interoperability with other AMQP clients, the following
+ * encoding can be used after removing or before adding the "ID:" prefix used for a JMSMessageID
+ * value:<br>
  * <p>
  * {@literal "AMQP_BINARY:<hex representation of binary content>"}<br>
  * {@literal "AMQP_UUID:<string representation of uuid>"}<br>
  * {@literal "AMQP_ULONG:<string representation of ulong>"}<br>
  * {@literal "AMQP_STRING:<string>"}<br>
  * <p>
- * <p>The AMQP_STRING encoding exists only for escaping message-id-string values that happen to begin
- * with one of the encoding prefixes (including AMQP_STRING itself). It MUST NOT be used otherwise.
  * <p>
- * <p>When provided a string for conversion which attempts to identify itself as an encoded binary, uuid, or
- * ulong but can't be converted into the indicated format, an exception will be thrown.
+ * The AMQP_STRING encoding exists only for escaping message-id-string values that happen to
+ * begin with one of the encoding prefixes (including AMQP_STRING itself). It MUST NOT be used
+ * otherwise.
+ * <p>
+ * <p>
+ * When provided a string for conversion which attempts to identify itself as an encoded binary,
+ * uuid, or ulong but can't be converted into the indicated format, an exception will be thrown.
  */
 public class AMQPMessageIdHelper {
 
@@ -63,11 +68,12 @@ public class AMQPMessageIdHelper {
    private static final char[] HEX_CHARS = "0123456789ABCDEF".toCharArray();
 
    /**
-    * Takes the provided AMQP messageId style object, and convert it to a base string.
-    * Encodes type information as a prefix where necessary to convey or escape the type
-    * of the provided object.
+    * Takes the provided AMQP messageId style object, and convert it to a base string. Encodes
+    * type information as a prefix where necessary to convey or escape the type of the provided
+    * object.
     *
-    * @param messageId the raw messageId object to process
+    * @param messageId
+    *        the raw messageId object to process
     * @return the base string to be used in creating the actual id.
     */
    public String toBaseMessageIdString(Object messageId) {
@@ -106,9 +112,12 @@ public class AMQPMessageIdHelper {
     * Takes the provided base id string and return the appropriate amqp messageId style object.
     * Converts the type based on any relevant encoding information found as a prefix.
     *
-    * @param baseId the object to be converted to an AMQP MessageId value.
+    * @param baseId
+    *        the object to be converted to an AMQP MessageId value.
     * @return the AMQP messageId style object
-    * @throws ActiveMQAMQPIllegalStateException if the provided baseId String indicates an encoded type but can't be converted to that type.
+    * @throws ActiveMQAMQPIllegalStateException
+    *         if the provided baseId String indicates an encoded type but can't be converted to
+    *         that type.
     */
    public Object toIdObject(String baseId) throws ActiveMQAMQPIllegalStateException {
       if (baseId == null) {
@@ -143,15 +152,17 @@ public class AMQPMessageIdHelper {
     * <p>
     * The hex characters may be upper or lower case.
     *
-    * @param hexString string to convert to a binary value.
+    * @param hexString
+    *        string to convert to a binary value.
     * @return a byte array containing the binary representation
-    * @throws IllegalArgumentException if the provided String is a non-even length or contains
-    *                                  non-hex characters
+    * @throws IllegalArgumentException
+    *         if the provided String is a non-even length or contains non-hex characters
     */
    public byte[] convertHexStringToBinary(String hexString) throws IllegalArgumentException {
       int length = hexString.length();
 
-      // As each byte needs two characters in the hex encoding, the string must be an even length.
+      // As each byte needs two characters in the hex encoding, the string must be an even
+      // length.
       if (length % 2 != 0) {
          throw new IllegalArgumentException("The provided hex String must be an even length, but was of length " + length + ": " + hexString);
       }
@@ -177,7 +188,8 @@ public class AMQPMessageIdHelper {
     * <p>
     * The returned hex characters are upper-case.
     *
-    * @param bytes the binary value to convert to a hex String instance.
+    * @param bytes
+    *        the binary value to convert to a hex String instance.
     * @return a String containing a hex representation of the bytes
     */
    public String convertBinaryToHexString(byte[] bytes) {
@@ -198,11 +210,10 @@ public class AMQPMessageIdHelper {
       return builder.toString();
    }
 
-   //----- Internal implementation ------------------------------------------//
+   // ----- Internal implementation ------------------------------------------//
 
    private boolean hasTypeEncodingPrefix(String stringId) {
-      return hasAmqpBinaryPrefix(stringId) || hasAmqpUuidPrefix(stringId) ||
-         hasAmqpUlongPrefix(stringId) || hasAmqpStringPrefix(stringId);
+      return hasAmqpBinaryPrefix(stringId) || hasAmqpUuidPrefix(stringId) || hasAmqpUlongPrefix(stringId) || hasAmqpStringPrefix(stringId);
    }
 
    private boolean hasAmqpStringPrefix(String stringId) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java
new file mode 100644
index 0000000..9eab737
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
+import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException;
+import org.apache.activemq.artemis.utils.IDGenerator;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.message.Message;
+
+/**
+ * Support class containing constant values and static methods that are used to map to / from
+ * AMQP Message types being sent or received.
+ */
+public final class AMQPMessageSupport {
+
+   // Message Properties used to map AMQP to JMS and back
+
+   public static final String JMS_AMQP_PREFIX = "JMS_AMQP_";
+   public static final int JMS_AMQP_PREFIX_LENGTH = JMS_AMQP_PREFIX.length();
+
+   public static final String MESSAGE_FORMAT = "MESSAGE_FORMAT";
+   public static final String ORIGINAL_ENCODING = "ORIGINAL_ENCODING";
+   public static final String NATIVE = "NATIVE";
+   public static final String HEADER = "HEADER";
+   public static final String PROPERTIES = "PROPERTIES";
+
+   public static final String FIRST_ACQUIRER = "FirstAcquirer";
+   public static final String CONTENT_TYPE = "ContentType";
+   public static final String CONTENT_ENCODING = "ContentEncoding";
+   public static final String REPLYTO_GROUP_ID = "ReplyToGroupID";
+
+   public static final String DELIVERY_ANNOTATION_PREFIX = "DA_";
+   public static final String MESSAGE_ANNOTATION_PREFIX = "MA_";
+   public static final String FOOTER_PREFIX = "FT_";
+
+   public static final String JMS_AMQP_HEADER = JMS_AMQP_PREFIX + HEADER;
+   public static final String JMS_AMQP_PROPERTIES = JMS_AMQP_PREFIX + PROPERTIES;
+   public static final String JMS_AMQP_ORIGINAL_ENCODING = JMS_AMQP_PREFIX + ORIGINAL_ENCODING;
+   public static final String JMS_AMQP_MESSAGE_FORMAT = JMS_AMQP_PREFIX + MESSAGE_FORMAT;
+   public static final String JMS_AMQP_NATIVE = JMS_AMQP_PREFIX + NATIVE;
+   public static final String JMS_AMQP_FIRST_ACQUIRER = JMS_AMQP_PREFIX + FIRST_ACQUIRER;
+   public static final String JMS_AMQP_CONTENT_TYPE = JMS_AMQP_PREFIX + CONTENT_TYPE;
+   public static final String JMS_AMQP_CONTENT_ENCODING = JMS_AMQP_PREFIX + CONTENT_ENCODING;
+   public static final String JMS_AMQP_REPLYTO_GROUP_ID = JMS_AMQP_PREFIX + REPLYTO_GROUP_ID;
+   public static final String JMS_AMQP_DELIVERY_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + DELIVERY_ANNOTATION_PREFIX;
+   public static final String JMS_AMQP_MESSAGE_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + MESSAGE_ANNOTATION_PREFIX;
+   public static final String JMS_AMQP_FOOTER_PREFIX = JMS_AMQP_PREFIX + FOOTER_PREFIX;
+
+   // Message body type definitions
+   public static final Binary EMPTY_BINARY = new Binary(new byte[0]);
+   public static final Data EMPTY_BODY = new Data(EMPTY_BINARY);
+
+   public static final short AMQP_UNKNOWN = 0;
+   public static final short AMQP_NULL = 1;
+   public static final short AMQP_DATA = 2;
+   public static final short AMQP_SEQUENCE = 3;
+   public static final short AMQP_VALUE_NULL = 4;
+   public static final short AMQP_VALUE_STRING = 5;
+   public static final short AMQP_VALUE_BINARY = 6;
+   public static final short AMQP_VALUE_MAP = 7;
+   public static final short AMQP_VALUE_LIST = 8;
+
+   /**
+    * Content type used to mark Data sections as containing a serialized java object.
+    */
+   public static final String SERIALIZED_JAVA_OBJECT_CONTENT_TYPE = "application/x-java-serialized-object";
+
+   /**
+    * Content type used to mark Data sections as containing arbitrary bytes.
+    */
+   public static final String OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
+
+   /**
+    * Lookup and return the correct Proton Symbol instance based on the given key.
+    *
+    * @param key
+    *        the String value name of the Symbol to locate.
+    *
+    * @return the Symbol value that matches the given key.
+    */
+   public static Symbol getSymbol(String key) {
+      return Symbol.valueOf(key);
+   }
+
+   /**
+    * Safe way to access message annotations which will check internal structure and either
+    * return the annotation if it exists or null if the annotation or any annotations are
+    * present.
+    *
+    * @param key
+    *        the String key to use to lookup an annotation.
+    * @param message
+    *        the AMQP message object that is being examined.
+    *
+    * @return the given annotation value or null if not present in the message.
+    */
+   public static Object getMessageAnnotation(String key, Message message) {
+      if (message != null && message.getMessageAnnotations() != null) {
+         Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue();
+         return annotations.get(AMQPMessageSupport.getSymbol(key));
+      }
+
+      return null;
+   }
+
+   /**
+    * Check whether the content-type field of the properties section (if present) in the given
+    * message matches the provided string (where null matches if there is no content type
+    * present.
+    *
+    * @param contentType
+    *        content type string to compare against, or null if none
+    * @param message
+    *        the AMQP message object that is being examined.
+    *
+    * @return true if content type matches
+    */
+   public static boolean isContentType(String contentType, Message message) {
+      if (contentType == null) {
+         return message.getContentType() == null;
+      } else {
+         return contentType.equals(message.getContentType());
+      }
+   }
+
+   /**
+    * @param contentType
+    *        the contentType of the received message
+    * @return the character set to use, or null if not to treat the message as text
+    */
+   public static Charset getCharsetForTextualContent(String contentType) {
+      try {
+         return AMQPContentTypeSupport.parseContentTypeForTextualCharset(contentType);
+      } catch (ActiveMQAMQPInvalidContentTypeException e) {
+         return null;
+      }
+   }
+
+   public static ServerJMSMessage wrapMessage(int messageType, ServerMessage wrapped, int deliveryCount) {
+      switch (messageType) {
+         case STREAM_TYPE:
+            return new ServerJMSStreamMessage(wrapped, deliveryCount);
+         case BYTES_TYPE:
+            return new ServerJMSBytesMessage(wrapped, deliveryCount);
+         case MAP_TYPE:
+            return new ServerJMSMapMessage(wrapped, deliveryCount);
+         case TEXT_TYPE:
+            return new ServerJMSTextMessage(wrapped, deliveryCount);
+         case OBJECT_TYPE:
+            return new ServerJMSObjectMessage(wrapped, deliveryCount);
+         default:
+            return new ServerJMSMessage(wrapped, deliveryCount);
+      }
+   }
+
+   public static String toAddress(Destination destination) {
+      if (destination instanceof ActiveMQDestination) {
+         return ((ActiveMQDestination) destination).getAddress();
+      }
+      return null;
+   }
+
+   public static ServerJMSBytesMessage createBytesMessage(IDGenerator idGenerator) {
+      return new ServerJMSBytesMessage(newMessage(idGenerator, BYTES_TYPE), 0);
+   }
+
+   public static ServerJMSMessage createBytesMessage(IDGenerator idGenerator, byte[] array, int arrayOffset, int length) throws JMSException {
+      ServerJMSBytesMessage message = createBytesMessage(idGenerator);
+      message.writeBytes(array, arrayOffset, length);
+      return message;
+   }
+
+   public static ServerJMSStreamMessage createStreamMessage(IDGenerator idGenerator) {
+      return new ServerJMSStreamMessage(newMessage(idGenerator, STREAM_TYPE), 0);
+   }
+
+   public static ServerJMSMessage createMessage(IDGenerator idGenerator) {
+      return new ServerJMSMessage(newMessage(idGenerator, DEFAULT_TYPE), 0);
+   }
+
+   public static ServerJMSTextMessage createTextMessage(IDGenerator idGenerator) {
+      return new ServerJMSTextMessage(newMessage(idGenerator, TEXT_TYPE), 0);
+   }
+
+   public static ServerJMSTextMessage createTextMessage(IDGenerator idGenerator, String text) throws JMSException {
+      ServerJMSTextMessage message = createTextMessage(idGenerator);
+      message.setText(text);
+      return message;
+   }
+
+   public static ServerJMSObjectMessage createObjectMessage(IDGenerator idGenerator) {
+      return new ServerJMSObjectMessage(newMessage(idGenerator, OBJECT_TYPE), 0);
+   }
+
+   public static ServerJMSMessage createObjectMessage(IDGenerator idGenerator, Binary serializedForm) throws JMSException {
+      ServerJMSObjectMessage message = createObjectMessage(idGenerator);
+      message.setSerializedForm(serializedForm);
+      return message;
+   }
+
+   public static ServerJMSMessage createObjectMessage(IDGenerator idGenerator, byte[] array, int offset, int length) throws JMSException {
+      ServerJMSObjectMessage message = createObjectMessage(idGenerator);
+      message.setSerializedForm(new Binary(array, offset, length));
+      return message;
+   }
+
+   public static ServerJMSMapMessage createMapMessage(IDGenerator idGenerator) {
+      return new ServerJMSMapMessage(newMessage(idGenerator, MAP_TYPE), 0);
+   }
+
+   public static ServerJMSMapMessage createMapMessage(IDGenerator idGenerator, Map<String, Object> content) throws JMSException {
+      ServerJMSMapMessage message = createMapMessage(idGenerator);
+      final Set<Map.Entry<String, Object>> set = content.entrySet();
+      for (Map.Entry<String, Object> entry : set) {
+         Object value = entry.getValue();
+         if (value instanceof Binary) {
+            Binary binary = (Binary) value;
+            value = Arrays.copyOfRange(binary.getArray(), binary.getArrayOffset(), binary.getLength());
+         }
+         message.setObject(entry.getKey(), value);
+      }
+      return message;
+   }
+
+   private static ServerMessageImpl newMessage(IDGenerator idGenerator, byte messageType) {
+      ServerMessageImpl message = new ServerMessageImpl(idGenerator.generateID(), 512);
+      message.setType(messageType);
+      ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null);
+      return message;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java
index 9b0635a..70c755a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java
@@ -16,8 +16,12 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.message;
 
+@Deprecated
 public class AMQPMessageTypes {
 
+   // TODO - Remove in future release as these are no longer used by the
+   //        inbound JMS Transformer.
+
    public static final String AMQP_TYPE_KEY = "amqp:type";
 
    public static final String AMQP_SEQUENCE = "amqp:sequence";

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java
index 8a5d17c..7028547 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java
@@ -1,13 +1,13 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,12 +16,13 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.message;
 
-import javax.jms.Message;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.utils.IDGenerator;
 
 public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
 
-   public AMQPNativeInboundTransformer(JMSVendor vendor) {
-      super(vendor);
+   public AMQPNativeInboundTransformer(IDGenerator idGenerator) {
+      super(idGenerator);
    }
 
    @Override
@@ -31,16 +32,13 @@ public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
 
    @Override
    public InboundTransformer getFallbackTransformer() {
-      return new AMQPRawInboundTransformer(getVendor());
+      return new AMQPRawInboundTransformer(idGenerator);
    }
 
    @Override
-   public Message transform(EncodedMessage amqpMessage) throws Exception {
+   public ServerJMSMessage transform(EncodedMessage amqpMessage) throws Exception {
       org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
 
-      Message rc = super.transform(amqpMessage);
-
-      populateMessage(rc, amqp);
-      return rc;
+      return populateMessage(super.transform(amqpMessage), amqp);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java
index ac18a94..8e89bb3 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java
@@ -1,13 +1,13 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,45 +16,65 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.message;
 
-import javax.jms.BytesMessage;
+import java.io.UnsupportedEncodingException;
+
 import javax.jms.JMSException;
 
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.message.ProtonJMessage;
 
 public class AMQPNativeOutboundTransformer extends OutboundTransformer {
 
-   public AMQPNativeOutboundTransformer(JMSVendor vendor) {
-      super(vendor);
+   public AMQPNativeOutboundTransformer(IDGenerator idGenerator) {
+      super(idGenerator);
    }
 
-   public static ProtonJMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException {
-      byte[] data = new byte[(int) msg.getBodyLength()];
-      msg.readBytes(data);
-      msg.reset();
-      int count = msg.getIntProperty("JMSXDeliveryCount");
-
-      // decode...
-      ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create();
-      int offset = 0;
-      int len = data.length;
-      while (len > 0) {
-         final int decoded = amqp.decode(data, offset, len);
-         assert decoded > 0 : "Make progress decoding the message";
-         offset += decoded;
-         len -= decoded;
+   @Override
+   public long transform(ServerJMSMessage message, WritableBuffer buffer) throws JMSException, UnsupportedEncodingException {
+      if (message == null || !(message instanceof ServerJMSBytesMessage)) {
+         return 0;
       }
 
-      // Update the DeliveryCount header...
+      return transform(this, (ServerJMSBytesMessage) message, buffer);
+   }
+
+   public static long transform(OutboundTransformer options, ServerJMSBytesMessage message, WritableBuffer buffer) throws JMSException {
+      byte[] data = new byte[(int) message.getBodyLength()];
+      message.readBytes(data);
+      message.reset();
+
       // The AMQP delivery-count field only includes prior failed delivery attempts,
-      // whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1.
-      if (amqp.getHeader() == null) {
-         amqp.setHeader(new Header());
-      }
+      int amqpDeliveryCount = message.getDeliveryCount() - 1;
+      if (amqpDeliveryCount >= 1) {
 
-      amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1));
+         // decode...
+         ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create();
+         int offset = 0;
+         int len = data.length;
+         while (len > 0) {
+            final int decoded = amqp.decode(data, offset, len);
+            assert decoded > 0 : "Make progress decoding the message";
+            offset += decoded;
+            len -= decoded;
+         }
+
+         // Update the DeliveryCount header which might require adding a Header
+         if (amqp.getHeader() == null && amqpDeliveryCount > 0) {
+            amqp.setHeader(new Header());
+         }
+
+         amqp.getHeader().setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
+
+         amqp.encode(buffer);
+      } else {
+         buffer.put(data, 0, data.length);
+      }
 
-      return amqp;
+      return 0;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java
index e6bf171..445eaca 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java
@@ -1,13 +1,13 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,14 +16,21 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.message;
 
-import javax.jms.BytesMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createBytesMessage;
+
 import javax.jms.DeliveryMode;
 import javax.jms.Message;
 
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.utils.IDGenerator;
+
 public class AMQPRawInboundTransformer extends InboundTransformer {
 
-   public AMQPRawInboundTransformer(JMSVendor vendor) {
-      super(vendor);
+   public AMQPRawInboundTransformer(IDGenerator idGenerator) {
+      super(idGenerator);
    }
 
    @Override
@@ -37,24 +44,19 @@ public class AMQPRawInboundTransformer extends InboundTransformer {
    }
 
    @Override
-   public Message transform(EncodedMessage amqpMessage) throws Exception {
-      BytesMessage rc = vendor.createBytesMessage();
-      rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
+   public ServerJMSMessage transform(EncodedMessage amqpMessage) throws Exception {
+      ServerJMSBytesMessage message = createBytesMessage(idGenerator);
+      message.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
 
       // We cannot decode the message headers to check so err on the side of caution
       // and mark all messages as persistent.
-      rc.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
-      rc.setJMSPriority(defaultPriority);
-
-      final long now = System.currentTimeMillis();
-      rc.setJMSTimestamp(now);
-      if (defaultTtl > 0) {
-         rc.setJMSExpiration(now + defaultTtl);
-      }
+      message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+      message.setJMSPriority(Message.DEFAULT_PRIORITY);
+      message.setJMSTimestamp(System.currentTimeMillis());
 
-      rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
-      rc.setBooleanProperty(prefixVendor + "NATIVE", true);
+      message.setLongProperty(JMS_AMQP_MESSAGE_FORMAT, amqpMessage.getMessageFormat());
+      message.setBooleanProperty(JMS_AMQP_NATIVE, true);
 
-      return rc;
+      return message;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java
index 4a80ea6..22042da 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java
@@ -1,13 +1,13 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java
index ff0c035..5094af5 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java
@@ -16,13 +16,26 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.message;
 
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
+import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
+
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.Set;
 
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Decimal128;
 import org.apache.qpid.proton.amqp.Decimal32;
@@ -38,109 +51,61 @@ import org.apache.qpid.proton.amqp.messaging.Header;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
 
-import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
-
 public abstract class InboundTransformer {
 
-   JMSVendor vendor;
+   protected IDGenerator idGenerator;
 
    public static final String TRANSFORMER_NATIVE = "native";
    public static final String TRANSFORMER_RAW = "raw";
    public static final String TRANSFORMER_JMS = "jms";
 
-   String prefixVendor = "JMS_AMQP_";
-   String prefixDeliveryAnnotations = "DA_";
-   String prefixMessageAnnotations = "MA_";
-   String prefixFooter = "FT_";
-
-   int defaultDeliveryMode = DeliveryMode.NON_PERSISTENT;
-   int defaultPriority = Message.DEFAULT_PRIORITY;
-   long defaultTtl = Message.DEFAULT_TIME_TO_LIVE;
-
-   public InboundTransformer(JMSVendor vendor) {
-      this.vendor = vendor;
+   public InboundTransformer(IDGenerator idGenerator) {
+      this.idGenerator = idGenerator;
    }
 
-   public abstract Message transform(EncodedMessage amqpMessage) throws Exception;
+   public abstract ServerJMSMessage transform(EncodedMessage amqpMessage) throws Exception;
 
    public abstract String getTransformerName();
 
    public abstract InboundTransformer getFallbackTransformer();
 
-   public int getDefaultDeliveryMode() {
-      return defaultDeliveryMode;
-   }
-
-   public void setDefaultDeliveryMode(int defaultDeliveryMode) {
-      this.defaultDeliveryMode = defaultDeliveryMode;
-   }
-
-   public int getDefaultPriority() {
-      return defaultPriority;
-   }
-
-   public void setDefaultPriority(int defaultPriority) {
-      this.defaultPriority = defaultPriority;
-   }
-
-   public long getDefaultTtl() {
-      return defaultTtl;
-   }
-
-   public void setDefaultTtl(long defaultTtl) {
-      this.defaultTtl = defaultTtl;
-   }
-
-   public String getPrefixVendor() {
-      return prefixVendor;
-   }
-
-   public void setPrefixVendor(String prefixVendor) {
-      this.prefixVendor = prefixVendor;
-   }
-
-   public JMSVendor getVendor() {
-      return vendor;
-   }
-
-   public void setVendor(JMSVendor vendor) {
-      this.vendor = vendor;
-   }
-
-   protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
+   @SuppressWarnings("unchecked")
+   protected ServerJMSMessage populateMessage(ServerJMSMessage jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
       Header header = amqp.getHeader();
-      if (header == null) {
-         header = new Header();
-      }
+      if (header != null) {
+         jms.setBooleanProperty(JMS_AMQP_HEADER, true);
 
-      if (header.getDurable() != null) {
-         jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-      } else {
-         jms.setJMSDeliveryMode(defaultDeliveryMode);
-      }
+         if (header.getDurable() != null) {
+            jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+         } else {
+            jms.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
+         }
 
-      if (header.getPriority() != null) {
-         jms.setJMSPriority(header.getPriority().intValue());
-      } else {
-         jms.setJMSPriority(defaultPriority);
-      }
+         if (header.getPriority() != null) {
+            jms.setJMSPriority(header.getPriority().intValue());
+         } else {
+            jms.setJMSPriority(Message.DEFAULT_PRIORITY);
+         }
 
-      if (header.getFirstAcquirer() != null) {
-         jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
-      }
+         if (header.getFirstAcquirer() != null) {
+            jms.setBooleanProperty(JMS_AMQP_FIRST_ACQUIRER, header.getFirstAcquirer());
+         }
 
-      if (header.getDeliveryCount() != null) {
-         vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue());
+         if (header.getDeliveryCount() != null) {
+            // AMQP Delivery Count counts only failed delivers where JMS
+            // Delivery Count should include the original delivery in the count.
+            jms.setLongProperty("JMSXDeliveryCount", header.getDeliveryCount().longValue() + 1);
+         }
+      } else {
+         jms.setJMSPriority((byte) Message.DEFAULT_PRIORITY);
+         jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
       }
 
       final MessageAnnotations ma = amqp.getMessageAnnotations();
       if (ma != null) {
          for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
             String key = entry.getKey().toString();
-            if ("x-opt-jms-type".equals(key) && entry.getValue() != null) {
-               // Legacy annotation, JMSType value will be replaced by Subject further down if also present.
-               jms.setJMSType(entry.getValue().toString());
-            } else if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
+            if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
                long deliveryTime = ((Number) entry.getValue()).longValue();
                jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), deliveryTime);
             } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) {
@@ -149,41 +114,15 @@ public abstract class InboundTransformer {
                   jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), System.currentTimeMillis() + delay);
                }
             }
-            //todo
-               /*else if ("x-opt-delivery-repeat".equals(key) && entry.getValue() != null) {
-                    int repeat = ((Number) entry.getValue()).intValue();
-                    if (repeat > 0) {
-                        jms.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
-                    }
-                } else if ("x-opt-delivery-period".equals(key) && entry.getValue() != null) {
-                    long period = ((Number) entry.getValue()).longValue();
-                    if (period > 0) {
-                        jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
-                    }
-                } else if ("x-opt-delivery-cron".equals(key) && entry.getValue() != null) {
-                    String cronEntry = (String) entry.getValue();
-                    if (cronEntry != null) {
-                        jms.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, cronEntry);
-                    }
-                }*/
 
-            setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
+            setProperty(jms, JMS_AMQP_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue());
          }
       }
 
       final ApplicationProperties ap = amqp.getApplicationProperties();
       if (ap != null) {
          for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) ap.getValue().entrySet()) {
-            String key = entry.getKey().toString();
-            if ("JMSXGroupID".equals(key)) {
-               vendor.setJMSXGroupID(jms, entry.getValue().toString());
-            } else if ("JMSXGroupSequence".equals(key)) {
-               vendor.setJMSXGroupSequence(jms, ((Number) entry.getValue()).intValue());
-            } else if ("JMSXUserID".equals(key)) {
-               vendor.setJMSXUserID(jms, entry.getValue().toString());
-            } else {
-               setProperty(jms, key, entry.getValue());
-            }
+            setProperty(jms, entry.getKey().toString(), entry.getValue());
          }
       }
 
@@ -194,37 +133,38 @@ public abstract class InboundTransformer {
          }
          Binary userId = properties.getUserId();
          if (userId != null) {
-            vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8));
+            // TODO - Better Way to set this?
+            jms.setStringProperty("JMSXUserID", new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8));
          }
          if (properties.getTo() != null) {
-            jms.setJMSDestination(vendor.createDestination(properties.getTo()));
+            jms.setJMSDestination(new ServerDestination(properties.getTo()));
          }
          if (properties.getSubject() != null) {
             jms.setJMSType(properties.getSubject());
          }
          if (properties.getReplyTo() != null) {
-            jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));
+            jms.setJMSReplyTo(new ServerDestination(properties.getReplyTo()));
          }
          if (properties.getCorrelationId() != null) {
             jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getCorrelationId()));
          }
          if (properties.getContentType() != null) {
-            jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
+            jms.setStringProperty(JMS_AMQP_CONTENT_TYPE, properties.getContentType().toString());
          }
          if (properties.getContentEncoding() != null) {
-            jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString());
+            jms.setStringProperty(JMS_AMQP_CONTENT_ENCODING, properties.getContentEncoding().toString());
          }
          if (properties.getCreationTime() != null) {
             jms.setJMSTimestamp(properties.getCreationTime().getTime());
          }
          if (properties.getGroupId() != null) {
-            vendor.setJMSXGroupID(jms, properties.getGroupId());
+            jms.setStringProperty("_AMQ_GROUP_ID", properties.getGroupId());
          }
          if (properties.getGroupSequence() != null) {
-            vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue());
+            jms.setIntProperty("JMSXGroupSeq", properties.getGroupSequence().intValue());
          }
          if (properties.getReplyToGroupId() != null) {
-            jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId());
+            jms.setStringProperty(JMS_AMQP_REPLYTO_GROUP_ID, properties.getReplyToGroupId());
          }
          if (properties.getAbsoluteExpiryTime() != null) {
             jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime());
@@ -232,9 +172,9 @@ public abstract class InboundTransformer {
       }
 
       // If the jms expiration has not yet been set...
-      if (jms.getJMSExpiration() == 0) {
+      if (header != null && jms.getJMSExpiration() == 0) {
          // Then lets try to set it based on the message ttl.
-         long ttl = defaultTtl;
+         long ttl = Message.DEFAULT_TIME_TO_LIVE;
          if (header.getTtl() != null) {
             ttl = header.getTtl().longValue();
          }
@@ -250,9 +190,11 @@ public abstract class InboundTransformer {
       if (fp != null) {
          for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) {
             String key = entry.getKey().toString();
-            setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue());
+            setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue());
          }
       }
+
+      return jms;
    }
 
    private void setProperty(Message msg, String key, Object value) throws JMSException {


Mime
View raw message