activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [4/5] activemq-artemis git commit: Fixing converters part I
Date Fri, 03 Mar 2017 00:55:25 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
new file mode 100644
index 0000000..44aff5b
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.converter;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageIdHelper;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Decimal128;
+import org.apache.qpid.proton.amqp.Decimal32;
+import org.apache.qpid.proton.amqp.Decimal64;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedByte;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+import org.apache.qpid.proton.amqp.UnsignedShort;
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Footer;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_LIST;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_MAP;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_NULL;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_STRING;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createBytesMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createMapMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createObjectMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createStreamMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createTextMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.getCharsetForTextualContent;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.isContentType;
+
+/**
+ *  This class was created just to separate concerns on AMQPConverter.
+ *  For better organization of the code.
+ * */
+public class AmqpCoreConverter {
+
+   public static ICoreMessage toCore(AMQPMessage message) throws Exception {
+
+      Section body = message.getProtonMessage().getBody();
+      ServerJMSMessage result;
+
+      if (body == null) {
+         if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message.getProtonMessage())) {
+            result = createObjectMessage(message.getMessageID());
+         } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage()) || isContentType(null, message.getProtonMessage())) {
+            result = createBytesMessage(message.getMessageID());
+         } else {
+            Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
+            if (charset != null) {
+               result = createTextMessage(message.getMessageID());
+            } else {
+               result = createMessage(message.getMessageID());
+            }
+         }
+
+         result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL);
+      } else if (body instanceof Data) {
+         Binary payload = ((Data) body).getValue();
+
+         if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message.getProtonMessage())) {
+            result = createObjectMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+         } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage())) {
+            result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+         } else {
+            Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
+            if (StandardCharsets.UTF_8.equals(charset)) {
+               ByteBuffer buf = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+
+               try {
+                  CharBuffer chars = charset.newDecoder().decode(buf);
+                  result = createTextMessage(message.getMessageID(), String.valueOf(chars));
+               } catch (CharacterCodingException e) {
+                  result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+               }
+            } else {
+               result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+            }
+         }
+
+         result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
+      } else if (body instanceof AmqpSequence) {
+         AmqpSequence sequence = (AmqpSequence) body;
+         ServerJMSStreamMessage m = createStreamMessage(message.getMessageID());
+         for (Object item : sequence.getValue()) {
+            m.writeObject(item);
+         }
+
+         result = m;
+         result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE);
+      } else if (body instanceof AmqpValue) {
+         Object value = ((AmqpValue) body).getValue();
+         if (value == null || value instanceof String) {
+            result = createTextMessage(message.getMessageID(), (String) value);
+
+            result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING);
+         } else if (value instanceof Binary) {
+            Binary payload = (Binary) value;
+
+            if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message.getProtonMessage())) {
+               result = createObjectMessage(message.getMessageID(), payload);
+            } else {
+               result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+            }
+
+            result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
+         } else if (value instanceof List) {
+            ServerJMSStreamMessage m = createStreamMessage(message.getMessageID());
+            for (Object item : (List<Object>) value) {
+               m.writeObject(item);
+            }
+            result = m;
+            result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_LIST);
+         } else if (value instanceof Map) {
+            result = createMapMessage(message.getMessageID(), (Map<String, Object>) value);
+            result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_MAP);
+         } else {
+            ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+            try {
+               TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf));
+               TLSEncode.getEncoder().writeObject(body);
+               result = createBytesMessage(message.getMessageID(), buf.array(), 0, buf.writerIndex());
+            } finally {
+               buf.release();
+               TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null);
+            }
+         }
+      } else {
+         throw new RuntimeException("Unexpected body type: " + body.getClass());
+      }
+
+      populateMessage(result, message.getProtonMessage());
+
+      return result != null ? result.getInnerMessage() : null;
+   }
+
+   protected static ServerJMSMessage populateMessage(ServerJMSMessage jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
+      Header header = amqp.getHeader();
+      if (header != null) {
+         jms.setBooleanProperty(JMS_AMQP_HEADER, true);
+
+         if (header.getDurable() != null) {
+            jms.setBooleanProperty(JMS_AMQP_HEADER_DURABLE, true);
+            jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+         } else {
+            jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         }
+
+         if (header.getPriority() != null) {
+            jms.setBooleanProperty(JMS_AMQP_HEADER_PRIORITY, true);
+            jms.setJMSPriority(header.getPriority().intValue());
+         } else {
+            jms.setJMSPriority(javax.jms.Message.DEFAULT_PRIORITY);
+         }
+
+         if (header.getFirstAcquirer() != null) {
+            jms.setBooleanProperty(JMS_AMQP_FIRST_ACQUIRER, header.getFirstAcquirer());
+         }
+
+         if (header.getDeliveryCount() != null) {
+            // AMQP Delivery Count counts only failed delivers where JMS
+            // Delivery Count should include the original delivery in the count.
+            jms.setLongProperty("JMSXDeliveryCount", header.getDeliveryCount().longValue() + 1);
+         }
+      } else {
+         jms.setJMSPriority((byte) javax.jms.Message.DEFAULT_PRIORITY);
+         jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
+      }
+
+      final MessageAnnotations ma = amqp.getMessageAnnotations();
+      if (ma != null) {
+         for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
+            String key = entry.getKey().toString();
+            if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
+               long deliveryTime = ((Number) entry.getValue()).longValue();
+               jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), deliveryTime);
+            } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) {
+               long delay = ((Number) entry.getValue()).longValue();
+               if (delay > 0) {
+                  jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), System.currentTimeMillis() + delay);
+               }
+            }
+
+            setProperty(jms, JMS_AMQP_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue());
+         }
+      }
+
+      final ApplicationProperties ap = amqp.getApplicationProperties();
+      if (ap != null) {
+         for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) ap.getValue().entrySet()) {
+            setProperty(jms, entry.getKey().toString(), entry.getValue());
+         }
+      }
+
+      final Properties properties = amqp.getProperties();
+      if (properties != null) {
+         if (properties.getMessageId() != null) {
+            jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getMessageId()));
+         }
+         Binary userId = properties.getUserId();
+         if (userId != null) {
+            // TODO - Better Way to set this?
+            jms.setStringProperty("JMSXUserID", new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8));
+         }
+         if (properties.getTo() != null) {
+            jms.setJMSDestination(new ServerDestination(properties.getTo()));
+         }
+         if (properties.getSubject() != null) {
+            jms.setJMSType(properties.getSubject());
+         }
+         if (properties.getReplyTo() != null) {
+            jms.setJMSReplyTo(new ServerDestination(properties.getReplyTo()));
+         }
+         if (properties.getCorrelationId() != null) {
+            jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getCorrelationId()));
+         }
+         if (properties.getContentType() != null) {
+            jms.setStringProperty(JMS_AMQP_CONTENT_TYPE, properties.getContentType().toString());
+         }
+         if (properties.getContentEncoding() != null) {
+            jms.setStringProperty(JMS_AMQP_CONTENT_ENCODING, properties.getContentEncoding().toString());
+         }
+         if (properties.getCreationTime() != null) {
+            jms.setJMSTimestamp(properties.getCreationTime().getTime());
+         }
+         if (properties.getGroupId() != null) {
+            jms.setStringProperty("_AMQ_GROUP_ID", properties.getGroupId());
+         }
+         if (properties.getGroupSequence() != null) {
+            jms.setIntProperty("JMSXGroupSeq", properties.getGroupSequence().intValue());
+         }
+         if (properties.getReplyToGroupId() != null) {
+            jms.setStringProperty(JMS_AMQP_REPLYTO_GROUP_ID, properties.getReplyToGroupId());
+         }
+         if (properties.getAbsoluteExpiryTime() != null) {
+            jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime());
+         }
+      }
+
+      // If the jms expiration has not yet been set...
+      if (header != null && jms.getJMSExpiration() == 0) {
+         // Then lets try to set it based on the message ttl.
+         long ttl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
+         if (header.getTtl() != null) {
+            ttl = header.getTtl().longValue();
+         }
+
+         if (ttl == 0) {
+            jms.setJMSExpiration(0);
+         } else {
+            jms.setJMSExpiration(System.currentTimeMillis() + ttl);
+         }
+      }
+
+      final Footer fp = amqp.getFooter();
+      if (fp != null) {
+         for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) {
+            String key = entry.getKey().toString();
+            setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue());
+         }
+      }
+
+      return jms;
+   }
+
+   private static void setProperty(javax.jms.Message msg, String key, Object value) throws JMSException {
+      if (value instanceof UnsignedLong) {
+         long v = ((UnsignedLong) value).longValue();
+         msg.setLongProperty(key, v);
+      } else if (value instanceof UnsignedInteger) {
+         long v = ((UnsignedInteger) value).longValue();
+         if (Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE) {
+            msg.setIntProperty(key, (int) v);
+         } else {
+            msg.setLongProperty(key, v);
+         }
+      } else if (value instanceof UnsignedShort) {
+         int v = ((UnsignedShort) value).intValue();
+         if (Short.MIN_VALUE <= v && v <= Short.MAX_VALUE) {
+            msg.setShortProperty(key, (short) v);
+         } else {
+            msg.setIntProperty(key, v);
+         }
+      } else if (value instanceof UnsignedByte) {
+         short v = ((UnsignedByte) value).shortValue();
+         if (Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE) {
+            msg.setByteProperty(key, (byte) v);
+         } else {
+            msg.setShortProperty(key, v);
+         }
+      } else if (value instanceof Symbol) {
+         msg.setStringProperty(key, value.toString());
+      } else if (value instanceof Decimal128) {
+         msg.setDoubleProperty(key, ((Decimal128) value).doubleValue());
+      } else if (value instanceof Decimal64) {
+         msg.setDoubleProperty(key, ((Decimal64) value).doubleValue());
+      } else if (value instanceof Decimal32) {
+         msg.setFloatProperty(key, ((Decimal32) value).floatValue());
+      } else if (value instanceof Binary) {
+         msg.setStringProperty(key, value.toString());
+      } else {
+         msg.setObjectProperty(key, value);
+      }
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
new file mode 100644
index 0000000..c29ec9f
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
@@ -0,0 +1,565 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.converter;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.Queue;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageIdHelper;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.activemq.artemis.reader.MessageUtil;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedByte;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Footer;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.WritableBuffer;
+import org.jboss.logging.Logger;
+
+import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID;
+import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_UNKNOWN;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_LIST;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_STRING;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.EMPTY_BINARY;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_DELIVERY_ANNOTATION_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_PROPERTIES;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_REPLY_TO_TYPE_MSG_ANNOTATION;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.QUEUE_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.TEMP_QUEUE_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.TEMP_TOPIC_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.TOPIC_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.toAddress;
+
+public class CoreAmqpConverter {
+
+   private static Logger logger = Logger.getLogger(CoreAmqpConverter.class);
+
+   public static AMQPMessage checkAMQP(Message message) throws Exception {
+      if (message instanceof AMQPMessage) {
+         return (AMQPMessage)message;
+      } else {
+         // It will first convert to Core, then to AMQP
+         return fromCore(message.toCore());
+      }
+   }
+
+   public static AMQPMessage fromCore(ICoreMessage coreMessage) throws Exception {
+      if (coreMessage == null) {
+         return null;
+      }
+
+      ServerJMSMessage message = ServerJMSMessage.wrapCoreMessage(coreMessage);
+      message.decode();
+
+      long messageFormat = 0;
+      Header header = null;
+      Properties properties = null;
+      Map<Symbol, Object> daMap = null;
+      Map<Symbol, Object> maMap = null;
+      Map<String, Object> apMap = null;
+      Map<Object, Object> footerMap = null;
+
+      Section body = convertBody(message);
+
+      if (message.getInnerMessage().isDurable()) {
+         if (header == null) {
+            header = new Header();
+         }
+         header.setDurable(true);
+      }
+      byte priority = (byte) message.getJMSPriority();
+      if (priority != javax.jms.Message.DEFAULT_PRIORITY) {
+         if (header == null) {
+            header = new Header();
+         }
+         header.setPriority(UnsignedByte.valueOf(priority));
+      }
+      String type = message.getJMSType();
+      if (type != null) {
+         if (properties == null) {
+            properties = new Properties();
+         }
+         properties.setSubject(type);
+      }
+      String messageId = message.getJMSMessageID();
+      if (messageId != null) {
+         if (properties == null) {
+            properties = new Properties();
+         }
+         try {
+            properties.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(messageId));
+         } catch (ActiveMQAMQPIllegalStateException e) {
+            properties.setMessageId(messageId);
+         }
+      }
+      Destination destination = message.getJMSDestination();
+      if (destination != null) {
+         if (properties == null) {
+            properties = new Properties();
+         }
+         properties.setTo(toAddress(destination));
+         if (maMap == null) {
+            maMap = new HashMap<>();
+         }
+         maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination));
+      }
+      Destination replyTo = message.getJMSReplyTo();
+      if (replyTo != null) {
+         if (properties == null) {
+            properties = new Properties();
+         }
+         properties.setReplyTo(toAddress(replyTo));
+         if (maMap == null) {
+            maMap = new HashMap<>();
+         }
+         maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo));
+      }
+      String correlationId = message.getJMSCorrelationID();
+      if (correlationId != null) {
+         if (properties == null) {
+            properties = new Properties();
+         }
+         try {
+            properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
+         } catch (ActiveMQAMQPIllegalStateException e) {
+            properties.setCorrelationId(correlationId);
+         }
+      }
+      long expiration = message.getJMSExpiration();
+      if (expiration != 0) {
+         long ttl = expiration - System.currentTimeMillis();
+         if (ttl < 0) {
+            ttl = 1;
+         }
+
+         if (header == null) {
+            header = new Header();
+         }
+         header.setTtl(new UnsignedInteger((int) ttl));
+
+         if (properties == null) {
+            properties = new Properties();
+         }
+         properties.setAbsoluteExpiryTime(new Date(expiration));
+      }
+      long timeStamp = message.getJMSTimestamp();
+      if (timeStamp != 0) {
+         if (properties == null) {
+            properties = new Properties();
+         }
+         properties.setCreationTime(new Date(timeStamp));
+      }
+
+      final Set<String> keySet = MessageUtil.getPropertyNames(message.getInnerMessage());
+      for (String key : keySet) {
+         if (key.startsWith("JMSX")) {
+            if (key.equals("JMSXUserID")) {
+               String value = message.getStringProperty(key);
+               if (properties == null) {
+                  properties = new Properties();
+               }
+               properties.setUserId(new Binary(value.getBytes(StandardCharsets.UTF_8)));
+               continue;
+            } else if (key.equals("JMSXGroupID")) {
+               String value = message.getStringProperty(key);
+               if (properties == null) {
+                  properties = new Properties();
+               }
+               properties.setGroupId(value);
+               continue;
+            } else if (key.equals("JMSXGroupSeq")) {
+               UnsignedInteger value = new UnsignedInteger(message.getIntProperty(key));
+               if (properties == null) {
+                  properties = new Properties();
+               }
+               properties.setGroupSequence(value);
+               continue;
+            }
+         } else if (key.startsWith(JMS_AMQP_PREFIX)) {
+            // AMQP Message Information stored from a conversion to the Core Message
+            if (key.equals(JMS_AMQP_NATIVE)) {
+               // skip..internal use only
+               continue;
+            } else if (key.equals(JMS_AMQP_ORIGINAL_ENCODING)) {
+               // skip..internal use only
+               continue;
+            } else if (key.equals(JMS_AMQP_FIRST_ACQUIRER)) {
+               if (header == null) {
+                  header = new Header();
+               }
+               header.setFirstAcquirer(message.getBooleanProperty(key));
+               continue;
+            } else if (key.equals(JMS_AMQP_HEADER)) {
+               if (header == null) {
+                  header = new Header();
+               }
+               continue;
+            } else if (key.equals(JMS_AMQP_HEADER_DURABLE)) {
+               if (header == null) {
+                  header = new Header();
+               }
+               header.setDurable(message.getInnerMessage().isDurable());
+               continue;
+            } else if (key.equals(JMS_AMQP_HEADER_PRIORITY)) {
+               if (header == null) {
+                  header = new Header();
+               }
+               header.setPriority(UnsignedByte.valueOf(priority));
+               continue;
+            } else if (key.startsWith(JMS_AMQP_PROPERTIES)) {
+               if (properties == null) {
+                  properties = new Properties();
+               }
+               continue;
+            } else if (key.startsWith(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX)) {
+               if (daMap == null) {
+                  daMap = new HashMap<>();
+               }
+               String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length());
+               daMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
+               continue;
+            } else if (key.startsWith(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX)) {
+               if (maMap == null) {
+                  maMap = new HashMap<>();
+               }
+               String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length());
+               maMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
+               continue;
+            } else if (key.equals(JMS_AMQP_CONTENT_TYPE)) {
+               if (properties == null) {
+                  properties = new Properties();
+               }
+               properties.setContentType(Symbol.getSymbol(message.getStringProperty(key)));
+               continue;
+            } else if (key.equals(JMS_AMQP_CONTENT_ENCODING)) {
+               if (properties == null) {
+                  properties = new Properties();
+               }
+               properties.setContentEncoding(Symbol.getSymbol(message.getStringProperty(key)));
+               continue;
+            } else if (key.equals(JMS_AMQP_REPLYTO_GROUP_ID)) {
+               if (properties == null) {
+                  properties = new Properties();
+               }
+               properties.setReplyToGroupId(message.getStringProperty(key));
+               continue;
+            } else if (key.startsWith(JMS_AMQP_FOOTER_PREFIX)) {
+               if (footerMap == null) {
+                  footerMap = new HashMap<>();
+               }
+               String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length());
+               footerMap.put(name, message.getObjectProperty(key));
+               continue;
+            }
+         } else if (key.equals("_AMQ_GROUP_ID")) {
+            String value = message.getStringProperty(key);
+            if (properties == null) {
+               properties = new Properties();
+            }
+            properties.setGroupId(value);
+            continue;
+         } else if (key.equals(NATIVE_MESSAGE_ID)) {
+            // skip..internal use only
+            continue;
+         } else if (key.endsWith(HDR_SCHEDULED_DELIVERY_TIME.toString())) {
+            // skip..remove annotation from previous inbound transformation
+            continue;
+         }
+
+         if (apMap == null) {
+            apMap = new HashMap<>();
+         }
+
+         Object objectProperty = message.getObjectProperty(key);
+         if (objectProperty instanceof byte[]) {
+            objectProperty = new Binary((byte[]) objectProperty);
+         }
+
+         apMap.put(key, objectProperty);
+      }
+
+      ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+      try {
+         EncoderImpl encoder = TLSEncode.getEncoder();
+         encoder.setByteBuffer(new NettyWritable(buffer));
+
+         if (header != null) {
+            encoder.writeObject(header);
+         }
+         if (daMap != null) {
+            encoder.writeObject(new DeliveryAnnotations(daMap));
+         }
+         if (maMap != null) {
+            encoder.writeObject(new MessageAnnotations(maMap));
+         }
+         if (properties != null) {
+            encoder.writeObject(properties);
+         }
+         if (apMap != null) {
+            encoder.writeObject(new ApplicationProperties(apMap));
+         }
+         if (body != null) {
+            encoder.writeObject(body);
+         }
+         if (footerMap != null) {
+            encoder.writeObject(new Footer(footerMap));
+         }
+
+         byte[] data = new byte[buffer.writerIndex()];
+         buffer.readBytes(data);
+
+         return new AMQPMessage(messageFormat, data);
+
+      } finally {
+         TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+         buffer.release();
+      }
+   }
+
+   private static Section convertBody(ServerJMSMessage message) throws JMSException {
+
+      Section body = null;
+      short orignalEncoding = AMQP_UNKNOWN;
+
+      try {
+         orignalEncoding = message.getShortProperty(JMS_AMQP_ORIGINAL_ENCODING);
+      } catch (Exception ex) {
+         // Ignore and stick with UNKNOWN
+      }
+
+      if (message instanceof ServerJMSBytesMessage) {
+         Binary payload = getBinaryFromMessageBody((ServerJMSBytesMessage) message);
+
+         if (payload == null) {
+            payload = EMPTY_BINARY;
+         }
+
+         switch (orignalEncoding) {
+            case AMQP_NULL:
+               break;
+            case AMQP_VALUE_BINARY:
+               body = new AmqpValue(payload);
+               break;
+            case AMQP_DATA:
+            case AMQP_UNKNOWN:
+            default:
+               body = new Data(payload);
+               break;
+         }
+      } else if (message instanceof ServerJMSTextMessage) {
+         switch (orignalEncoding) {
+            case AMQP_NULL:
+               break;
+            case AMQP_DATA:
+               body = new Data(getBinaryFromMessageBody((ServerJMSTextMessage) message));
+               break;
+            case AMQP_VALUE_STRING:
+            case AMQP_UNKNOWN:
+            default:
+               body = new AmqpValue(((TextMessage) message).getText());
+               break;
+         }
+      } else if (message instanceof ServerJMSMapMessage) {
+         body = new AmqpValue(getMapFromMessageBody((ServerJMSMapMessage) message));
+      } else if (message instanceof ServerJMSStreamMessage) {
+         ArrayList<Object> list = new ArrayList<>();
+         final ServerJMSStreamMessage m = (ServerJMSStreamMessage) message;
+         try {
+            while (true) {
+               list.add(m.readObject());
+            }
+         } catch (MessageEOFException e) {
+         }
+
+         switch (orignalEncoding) {
+            case AMQP_SEQUENCE:
+               body = new AmqpSequence(list);
+               break;
+            case AMQP_VALUE_LIST:
+            case AMQP_UNKNOWN:
+            default:
+               body = new AmqpValue(list);
+               break;
+         }
+      } else if (message instanceof ServerJMSObjectMessage) {
+         Binary payload = getBinaryFromMessageBody((ServerJMSObjectMessage) message);
+
+         if (payload == null) {
+            payload = EMPTY_BINARY;
+         }
+
+         switch (orignalEncoding) {
+            case AMQP_VALUE_BINARY:
+               body = new AmqpValue(payload);
+               break;
+            case AMQP_DATA:
+            case AMQP_UNKNOWN:
+            default:
+               body = new Data(payload);
+               break;
+         }
+
+         // For a non-AMQP message we tag the outbound content type as containing
+         // a serialized Java object so that an AMQP client has a hint as to what
+         // we are sending it.
+         if (!message.propertyExists(JMS_AMQP_CONTENT_TYPE)) {
+            message.setStringProperty(JMS_AMQP_CONTENT_TYPE, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+         }
+      } else if (message instanceof ServerJMSMessage) {
+         // If this is not an AMQP message that was converted then the original encoding
+         // will be unknown so we check for special cases of messages with special data
+         // encoded into the server message body.
+         if (orignalEncoding == AMQP_UNKNOWN) {
+            ICoreMessage internalMessage = message.getInnerMessage();
+            int readerIndex = internalMessage.getBodyBuffer().readerIndex();
+            try {
+               Object s = internalMessage.getBodyBuffer().readNullableSimpleString();
+               if (s != null) {
+                  body = new AmqpValue(s.toString());
+               }
+            } catch (Throwable ignored) {
+               logger.debug("Exception ignored during conversion", ignored.getMessage(), ignored);
+            } finally {
+               internalMessage.getBodyBuffer().readerIndex(readerIndex);
+            }
+         }
+      }
+
+      return body;
+   }
+
+   private static Binary getBinaryFromMessageBody(ServerJMSBytesMessage message) throws JMSException {
+      byte[] data = new byte[(int) message.getBodyLength()];
+      message.readBytes(data);
+      message.reset(); // Need to reset after readBytes or future readBytes
+
+      return new Binary(data);
+   }
+
+   private static Binary getBinaryFromMessageBody(ServerJMSTextMessage message) throws JMSException {
+      Binary result = null;
+      String text = message.getText();
+      if (text != null) {
+         result = new Binary(text.getBytes(StandardCharsets.UTF_8));
+      }
+
+      return result;
+   }
+
+   private static Binary getBinaryFromMessageBody(ServerJMSObjectMessage message) throws JMSException {
+      message.getInnerMessage().getBodyBuffer().resetReaderIndex();
+      int size = message.getInnerMessage().getBodyBuffer().readInt();
+      byte[] bytes = new byte[size];
+      message.getInnerMessage().getBodyBuffer().readBytes(bytes);
+
+      return new Binary(bytes);
+   }
+
+   private static Map<String, Object> getMapFromMessageBody(ServerJMSMapMessage message) throws JMSException {
+      final HashMap<String, Object> map = new LinkedHashMap<>();
+
+      @SuppressWarnings("unchecked")
+      final Enumeration<String> names = message.getMapNames();
+      while (names.hasMoreElements()) {
+         String key = names.nextElement();
+         Object value = message.getObject(key);
+         if (value instanceof byte[]) {
+            value = new Binary((byte[]) value);
+         }
+         map.put(key, value);
+      }
+
+      return map;
+   }
+
+   private static byte destinationType(Destination destination) {
+      if (destination instanceof Queue) {
+         if (destination instanceof TemporaryQueue) {
+            return TEMP_QUEUE_TYPE;
+         } else {
+            return QUEUE_TYPE;
+         }
+      } else if (destination instanceof Topic) {
+         if (destination instanceof TemporaryTopic) {
+            return TEMP_TOPIC_TYPE;
+         } else {
+            return TOPIC_TYPE;
+         }
+      }
+
+      throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java
deleted file mode 100644
index 125a20f..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.converter;
-
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE;
-
-import java.io.IOException;
-
-import javax.jms.BytesMessage;
-
-import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport;
-import org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPNativeOutboundTransformer;
-import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.message.InboundTransformer;
-import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSMappingInboundTransformer;
-import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSMappingOutboundTransformer;
-import org.apache.activemq.artemis.protocol.amqp.converter.message.OutboundTransformer;
-import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
-import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
-import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.qpid.proton.codec.WritableBuffer;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
-public class ProtonMessageConverter implements MessageConverter {
-
-   public ProtonMessageConverter(IDGenerator idGenerator) {
-      inboundTransformer = new JMSMappingInboundTransformer(idGenerator);
-      outboundTransformer = new JMSMappingOutboundTransformer(idGenerator);
-   }
-
-   private final InboundTransformer inboundTransformer;
-   private final OutboundTransformer outboundTransformer;
-
-   @Override
-   public org.apache.activemq.artemis.api.core.Message inbound(Object messageSource) throws Exception {
-      AMQPMessage encodedMessageSource = (AMQPMessage) messageSource;
-      ServerJMSMessage transformedMessage = null;
-
-      try {
-         transformedMessage = inboundTransformer.transform(encodedMessageSource);
-      } catch (Exception e) {
-         ActiveMQClientLogger.LOGGER.debug("Transform of message using [{}] transformer, failed" + inboundTransformer.getTransformerName());
-         ActiveMQClientLogger.LOGGER.trace("Transformation error:", e);
-
-         throw new IOException("Failed to transform incoming delivery, skipping.");
-      }
-
-      transformedMessage.encode();
-
-      return transformedMessage.getInnerMessage();
-   }
-
-   @Override
-   public Object outbound(org.apache.activemq.artemis.api.core.Message messageOutbound, int deliveryCount) throws Exception {
-      // Useful for testing but not recommended for real life use.
-      ByteBuf nettyBuffer = Unpooled.buffer(1024);
-      NettyWritable buffer = new NettyWritable(nettyBuffer);
-      long messageFormat = (long) outbound(messageOutbound, deliveryCount, buffer);
-
-      EncodedMessage encoded = new EncodedMessage(messageFormat, nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(),
-         nettyBuffer.readableBytes());
-
-      return encoded;
-   }
-
-   public Object outbound(org.apache.activemq.artemis.api.core.Message messageOutbound, int deliveryCount, WritableBuffer buffer) throws Exception {
-      ServerJMSMessage jmsMessage = AMQPMessageSupport.wrapMessage(messageOutbound.getType(), messageOutbound, deliveryCount);
-
-      jmsMessage.decode();
-
-      if (jmsMessage.getBooleanProperty(JMS_AMQP_NATIVE)) {
-         if (jmsMessage instanceof BytesMessage) {
-            return AMQPNativeOutboundTransformer.transform(outboundTransformer, (ServerJMSBytesMessage) jmsMessage, buffer);
-         } else {
-            return 0;
-         }
-      } else {
-         return outboundTransformer.transform(jmsMessage, buffer);
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java
index c3a60f0..8d473a7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java
@@ -19,7 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.converter.jms;
 import javax.jms.BytesMessage;
 import javax.jms.JMSException;
 
-import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 
 import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesMessageReset;
@@ -49,8 +49,8 @@ import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteUTF;
 
 public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMessage {
 
-   public ServerJMSBytesMessage(Message message, int deliveryCount) {
-      super(message, deliveryCount);
+   public ServerJMSBytesMessage(ICoreMessage message) {
+      super(message);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java
index df79183..f72239e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.utils.TypedProperties;
@@ -51,8 +52,8 @@ public final class ServerJMSMapMessage extends ServerJMSMessage implements MapMe
    /*
     * This constructor is used to construct messages prior to sending
     */
-   public ServerJMSMapMessage(Message message, int deliveryCount) {
-      super(message, deliveryCount);
+   public ServerJMSMapMessage(ICoreMessage message) {
+      super(message);
 
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
index adf4621..2a52f7a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
@@ -25,34 +25,48 @@ import java.util.Enumeration;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.artemis.reader.MessageUtil;
 
 import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID;
+import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE;
 
 public class ServerJMSMessage implements Message {
 
-   protected final CoreMessage message;
-
-   protected int deliveryCount;
-
-   public org.apache.activemq.artemis.api.core.Message getInnerMessage() {
-      return message;
-   }
+   protected final ICoreMessage message;
+   private ActiveMQBuffer readBodyBuffer;
 
-   public ServerJMSMessage(org.apache.activemq.artemis.api.core.Message message, int deliveryCount) {
-      this.message = (CoreMessage)message;
-      this.deliveryCount = deliveryCount;
+   public ServerJMSMessage(ICoreMessage message) {
+      this.message = message;
+   }
+
+   public static ServerJMSMessage wrapCoreMessage(ICoreMessage wrapped) {
+      switch (wrapped.getType()) {
+         case STREAM_TYPE:
+            return new ServerJMSStreamMessage(wrapped);
+         case BYTES_TYPE:
+            return new ServerJMSBytesMessage(wrapped);
+         case MAP_TYPE:
+            return new ServerJMSMapMessage(wrapped);
+         case TEXT_TYPE:
+            return new ServerJMSTextMessage(wrapped);
+         case OBJECT_TYPE:
+            return new ServerJMSObjectMessage(wrapped);
+         default:
+            return new ServerJMSMessage(wrapped);
+      }
    }
 
-   public int getDeliveryCount() {
-      return deliveryCount;
+   public ICoreMessage getInnerMessage() {
+      return message;
    }
 
-   private ActiveMQBuffer readBodyBuffer;
-
    /**
     * When reading we use a protected copy so multi-threads can work fine
     */
@@ -112,13 +126,13 @@ public class ServerJMSMessage implements Message {
    }
 
    @Override
-   public final void setJMSCorrelationID(String correlationID) throws JMSException {
-      MessageUtil.setJMSCorrelationID(message, correlationID);
+   public final String getJMSCorrelationID() throws JMSException {
+      return MessageUtil.getJMSCorrelationID(message);
    }
 
    @Override
-   public final String getJMSCorrelationID() throws JMSException {
-      return MessageUtil.getJMSCorrelationID(message);
+   public final void setJMSCorrelationID(String correlationID) throws JMSException {
+      MessageUtil.setJMSCorrelationID(message, correlationID);
    }
 
    @Override
@@ -253,19 +267,11 @@ public class ServerJMSMessage implements Message {
 
    @Override
    public final int getIntProperty(String name) throws JMSException {
-      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
-         return deliveryCount;
-      }
-
       return message.getIntProperty(name);
    }
 
    @Override
    public final long getLongProperty(String name) throws JMSException {
-      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
-         return deliveryCount;
-      }
-
       return message.getLongProperty(name);
    }
 
@@ -281,10 +287,6 @@ public class ServerJMSMessage implements Message {
 
    @Override
    public final String getStringProperty(String name) throws JMSException {
-      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
-         return String.valueOf(deliveryCount);
-      }
-
       return message.getStringProperty(name);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
index 15b04a9..23ffb09 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
@@ -16,11 +16,11 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.jms;
 
-import java.io.Serializable;
-
 import javax.jms.JMSException;
 import javax.jms.ObjectMessage;
+import java.io.Serializable;
 
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.qpid.proton.amqp.Binary;
 
@@ -30,8 +30,8 @@ public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectMe
 
    private Binary payload;
 
-   public ServerJMSObjectMessage(Message message, int deliveryCount) {
-      super(message, deliveryCount);
+   public ServerJMSObjectMessage(ICoreMessage message) {
+      super(message);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java
index b092e61..9aaf4c3 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java
@@ -21,6 +21,7 @@ import javax.jms.MessageEOFException;
 import javax.jms.MessageFormatException;
 import javax.jms.StreamMessage;
 
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.utils.DataConstants;
@@ -43,8 +44,8 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
 
    private int bodyLength = 0;
 
-   public ServerJMSStreamMessage(Message message, int deliveryCount) {
-      super(message, deliveryCount);
+   public ServerJMSStreamMessage(ICoreMessage message) {
+      super(message);
    }
 
    // StreamMessage implementation ----------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java
index 058a3e9..f770185 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.converter.jms;
 import javax.jms.JMSException;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 
@@ -48,8 +49,8 @@ public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessag
    /*
     * This constructor is used to construct messages prior to sending
     */
-   public ServerJMSTextMessage(Message message, int deliveryCount) {
-      super(message, deliveryCount);
+   public ServerJMSTextMessage(ICoreMessage message) {
+      super(message);
 
    }
    // TextMessage implementation ------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java
index 0a39573..9583051 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java
@@ -33,7 +33,6 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMe
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException;
-import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Data;
@@ -79,7 +78,6 @@ public final class AMQPMessageSupport {
    public static final String JMS_AMQP_HEADER_PRIORITY = JMS_AMQP_PREFIX + HEADER + PRIORITY;
    public static final String JMS_AMQP_PROPERTIES = JMS_AMQP_PREFIX + PROPERTIES;
    public static final String JMS_AMQP_ORIGINAL_ENCODING = JMS_AMQP_PREFIX + ORIGINAL_ENCODING;
-   public static final String JMS_AMQP_MESSAGE_FORMAT = JMS_AMQP_PREFIX + MESSAGE_FORMAT;
    public static final String JMS_AMQP_NATIVE = JMS_AMQP_PREFIX + NATIVE;
    public static final String JMS_AMQP_FIRST_ACQUIRER = JMS_AMQP_PREFIX + FIRST_ACQUIRER;
    public static final String JMS_AMQP_CONTENT_TYPE = JMS_AMQP_PREFIX + CONTENT_TYPE;
@@ -103,6 +101,15 @@ public final class AMQPMessageSupport {
    public static final short AMQP_VALUE_MAP = 7;
    public static final short AMQP_VALUE_LIST = 8;
 
+   public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = getSymbol("x-opt-jms-dest");
+   public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = getSymbol("x-opt-jms-reply-to");
+
+   public static final byte QUEUE_TYPE = 0x00;
+   public static final byte TOPIC_TYPE = 0x01;
+   public static final byte TEMP_QUEUE_TYPE = 0x02;
+   public static final byte TEMP_TOPIC_TYPE = 0x03;
+
+
    /**
     * Content type used to mark Data sections as containing a serialized java object.
     */
@@ -179,23 +186,6 @@ public final class AMQPMessageSupport {
       }
    }
 
-   public static ServerJMSMessage wrapMessage(int messageType, org.apache.activemq.artemis.api.core.Message wrapped, int deliveryCount) {
-      switch (messageType) {
-         case STREAM_TYPE:
-            return new ServerJMSStreamMessage(wrapped, deliveryCount);
-         case BYTES_TYPE:
-            return new ServerJMSBytesMessage(wrapped, deliveryCount);
-         case MAP_TYPE:
-            return new ServerJMSMapMessage(wrapped, deliveryCount);
-         case TEXT_TYPE:
-            return new ServerJMSTextMessage(wrapped, deliveryCount);
-         case OBJECT_TYPE:
-            return new ServerJMSObjectMessage(wrapped, deliveryCount);
-         default:
-            return new ServerJMSMessage(wrapped, deliveryCount);
-      }
-   }
-
    public static String toAddress(Destination destination) {
       if (destination instanceof ActiveMQDestination) {
          return ((ActiveMQDestination) destination).getAddress();
@@ -203,56 +193,56 @@ public final class AMQPMessageSupport {
       return null;
    }
 
-   public static ServerJMSBytesMessage createBytesMessage(IDGenerator idGenerator) {
-      return new ServerJMSBytesMessage(newMessage(idGenerator, BYTES_TYPE), 0);
+   public static ServerJMSBytesMessage createBytesMessage(long id) {
+      return new ServerJMSBytesMessage(newMessage(id, BYTES_TYPE));
    }
 
-   public static ServerJMSMessage createBytesMessage(IDGenerator idGenerator, byte[] array, int arrayOffset, int length) throws JMSException {
-      ServerJMSBytesMessage message = createBytesMessage(idGenerator);
+   public static ServerJMSBytesMessage createBytesMessage(long id, byte[] array, int arrayOffset, int length) throws JMSException {
+      ServerJMSBytesMessage message = createBytesMessage(id);
       message.writeBytes(array, arrayOffset, length);
       return message;
    }
 
-   public static ServerJMSStreamMessage createStreamMessage(IDGenerator idGenerator) {
-      return new ServerJMSStreamMessage(newMessage(idGenerator, STREAM_TYPE), 0);
+   public static ServerJMSStreamMessage createStreamMessage(long id) {
+      return new ServerJMSStreamMessage(newMessage(id, STREAM_TYPE));
    }
 
-   public static ServerJMSMessage createMessage(IDGenerator idGenerator) {
-      return new ServerJMSMessage(newMessage(idGenerator, DEFAULT_TYPE), 0);
+   public static ServerJMSMessage createMessage(long id) {
+      return new ServerJMSMessage(newMessage(id, DEFAULT_TYPE));
    }
 
-   public static ServerJMSTextMessage createTextMessage(IDGenerator idGenerator) {
-      return new ServerJMSTextMessage(newMessage(idGenerator, TEXT_TYPE), 0);
+   public static ServerJMSTextMessage createTextMessage(long id) {
+      return new ServerJMSTextMessage(newMessage(id, TEXT_TYPE));
    }
 
-   public static ServerJMSTextMessage createTextMessage(IDGenerator idGenerator, String text) throws JMSException {
-      ServerJMSTextMessage message = createTextMessage(idGenerator);
+   public static ServerJMSTextMessage createTextMessage(long id, String text) throws JMSException {
+      ServerJMSTextMessage message = createTextMessage(id);
       message.setText(text);
       return message;
    }
 
-   public static ServerJMSObjectMessage createObjectMessage(IDGenerator idGenerator) {
-      return new ServerJMSObjectMessage(newMessage(idGenerator, OBJECT_TYPE), 0);
+   public static ServerJMSObjectMessage createObjectMessage(long id) {
+      return new ServerJMSObjectMessage(newMessage(id, OBJECT_TYPE));
    }
 
-   public static ServerJMSMessage createObjectMessage(IDGenerator idGenerator, Binary serializedForm) throws JMSException {
-      ServerJMSObjectMessage message = createObjectMessage(idGenerator);
+   public static ServerJMSMessage createObjectMessage(long id, Binary serializedForm) throws JMSException {
+      ServerJMSObjectMessage message = createObjectMessage(id);
       message.setSerializedForm(serializedForm);
       return message;
    }
 
-   public static ServerJMSMessage createObjectMessage(IDGenerator idGenerator, byte[] array, int offset, int length) throws JMSException {
-      ServerJMSObjectMessage message = createObjectMessage(idGenerator);
+   public static ServerJMSMessage createObjectMessage(long id, byte[] array, int offset, int length) throws JMSException {
+      ServerJMSObjectMessage message = createObjectMessage(id);
       message.setSerializedForm(new Binary(array, offset, length));
       return message;
    }
 
-   public static ServerJMSMapMessage createMapMessage(IDGenerator idGenerator) {
-      return new ServerJMSMapMessage(newMessage(idGenerator, MAP_TYPE), 0);
+   public static ServerJMSMapMessage createMapMessage(long id) {
+      return new ServerJMSMapMessage(newMessage(id, MAP_TYPE));
    }
 
-   public static ServerJMSMapMessage createMapMessage(IDGenerator idGenerator, Map<String, Object> content) throws JMSException {
-      ServerJMSMapMessage message = createMapMessage(idGenerator);
+   public static ServerJMSMapMessage createMapMessage(long id, Map<String, Object> content) throws JMSException {
+      ServerJMSMapMessage message = createMapMessage(id);
       final Set<Map.Entry<String, Object>> set = content.entrySet();
       for (Map.Entry<String, Object> entry : set) {
          Object value = entry.getValue();
@@ -265,8 +255,8 @@ public final class AMQPMessageSupport {
       return message;
    }
 
-   private static CoreMessage newMessage(IDGenerator idGenerator, byte messageType) {
-      CoreMessage message = new CoreMessage(idGenerator.generateID(), 512);
+   private static CoreMessage newMessage(long id, byte messageType) {
+      CoreMessage message = new CoreMessage(id, 512);
       message.setType(messageType);
       ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null);
       return message;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java
deleted file mode 100644
index 70c755a..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.converter.message;
-
-@Deprecated
-public class AMQPMessageTypes {
-
-   // TODO - Remove in future release as these are no longer used by the
-   //        inbound JMS Transformer.
-
-   public static final String AMQP_TYPE_KEY = "amqp:type";
-
-   public static final String AMQP_SEQUENCE = "amqp:sequence";
-
-   public static final String AMQP_LIST = "amqp:list";
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java
deleted file mode 100644
index 8e89bb3..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.converter.message;
-
-import java.io.UnsupportedEncodingException;
-
-import javax.jms.JMSException;
-
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.qpid.proton.amqp.UnsignedInteger;
-import org.apache.qpid.proton.amqp.messaging.Header;
-import org.apache.qpid.proton.codec.WritableBuffer;
-import org.apache.qpid.proton.message.ProtonJMessage;
-
-public class AMQPNativeOutboundTransformer extends OutboundTransformer {
-
-   public AMQPNativeOutboundTransformer(IDGenerator idGenerator) {
-      super(idGenerator);
-   }
-
-   @Override
-   public long transform(ServerJMSMessage message, WritableBuffer buffer) throws JMSException, UnsupportedEncodingException {
-      if (message == null || !(message instanceof ServerJMSBytesMessage)) {
-         return 0;
-      }
-
-      return transform(this, (ServerJMSBytesMessage) message, buffer);
-   }
-
-   public static long transform(OutboundTransformer options, ServerJMSBytesMessage message, WritableBuffer buffer) throws JMSException {
-      byte[] data = new byte[(int) message.getBodyLength()];
-      message.readBytes(data);
-      message.reset();
-
-      // The AMQP delivery-count field only includes prior failed delivery attempts,
-      int amqpDeliveryCount = message.getDeliveryCount() - 1;
-      if (amqpDeliveryCount >= 1) {
-
-         // decode...
-         ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create();
-         int offset = 0;
-         int len = data.length;
-         while (len > 0) {
-            final int decoded = amqp.decode(data, offset, len);
-            assert decoded > 0 : "Make progress decoding the message";
-            offset += decoded;
-            len -= decoded;
-         }
-
-         // Update the DeliveryCount header which might require adding a Header
-         if (amqp.getHeader() == null && amqpDeliveryCount > 0) {
-            amqp.setHeader(new Header());
-         }
-
-         amqp.getHeader().setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
-
-         amqp.encode(buffer);
-      } else {
-         buffer.put(data, 0, data.length);
-      }
-
-      return 0;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java
deleted file mode 100644
index 22042da..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.converter.message;
-
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.message.Message;
-
-public class EncodedMessage {
-
-   private final Binary data;
-   final long messageFormat;
-
-   public EncodedMessage(long messageFormat, byte[] data, int offset, int length) {
-      this.data = new Binary(data, offset, length);
-      this.messageFormat = messageFormat;
-   }
-
-   public long getMessageFormat() {
-      return messageFormat;
-   }
-
-   public Message decode() throws Exception {
-      Message amqp = Message.Factory.create();
-
-      int offset = getArrayOffset();
-      int len = getLength();
-      while (len > 0) {
-         final int decoded = amqp.decode(getArray(), offset, len);
-         assert decoded > 0 : "Make progress decoding the message";
-         offset += decoded;
-         len -= decoded;
-      }
-
-      return amqp;
-   }
-
-   public int getLength() {
-      return data.getLength();
-   }
-
-   public int getArrayOffset() {
-      return data.getArrayOffset();
-   }
-
-   public byte[] getArray() {
-      return data.getArray();
-   }
-
-   @Override
-   public String toString() {
-      return data.toString();
-   }
-}


Mime
View raw message