activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [10/15] activemq-artemis git commit: ARTEMIS-751 Simplification of the AMQP implementation
Date Tue, 27 Sep 2016 13:54:37 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java
new file mode 100644
index 0000000..55195eb
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Decimal128;
+import org.apache.qpid.proton.amqp.Decimal32;
+import org.apache.qpid.proton.amqp.Decimal64;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedByte;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+import org.apache.qpid.proton.amqp.UnsignedShort;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Footer;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
+
+public abstract class InboundTransformer {
+
+   JMSVendor vendor;
+
+   public static final String TRANSFORMER_NATIVE = "native";
+   public static final String TRANSFORMER_RAW = "raw";
+   public static final String TRANSFORMER_JMS = "jms";
+
+   String prefixVendor = "JMS_AMQP_";
+   String prefixDeliveryAnnotations = "DA_";
+   String prefixMessageAnnotations = "MA_";
+   String prefixFooter = "FT_";
+
+   int defaultDeliveryMode = DeliveryMode.NON_PERSISTENT;
+   int defaultPriority = Message.DEFAULT_PRIORITY;
+   long defaultTtl = Message.DEFAULT_TIME_TO_LIVE;
+
+   public InboundTransformer(JMSVendor vendor) {
+      this.vendor = vendor;
+   }
+
+   public abstract Message transform(EncodedMessage amqpMessage) throws Exception;
+
+   public abstract String getTransformerName();
+
+   public abstract InboundTransformer getFallbackTransformer();
+
+   public int getDefaultDeliveryMode() {
+      return defaultDeliveryMode;
+   }
+
+   public void setDefaultDeliveryMode(int defaultDeliveryMode) {
+      this.defaultDeliveryMode = defaultDeliveryMode;
+   }
+
+   public int getDefaultPriority() {
+      return defaultPriority;
+   }
+
+   public void setDefaultPriority(int defaultPriority) {
+      this.defaultPriority = defaultPriority;
+   }
+
+   public long getDefaultTtl() {
+      return defaultTtl;
+   }
+
+   public void setDefaultTtl(long defaultTtl) {
+      this.defaultTtl = defaultTtl;
+   }
+
+   public String getPrefixVendor() {
+      return prefixVendor;
+   }
+
+   public void setPrefixVendor(String prefixVendor) {
+      this.prefixVendor = prefixVendor;
+   }
+
+   public JMSVendor getVendor() {
+      return vendor;
+   }
+
+   public void setVendor(JMSVendor vendor) {
+      this.vendor = vendor;
+   }
+
+   protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
+      Header header = amqp.getHeader();
+      if (header == null) {
+         header = new Header();
+      }
+
+      if (header.getDurable() != null) {
+         jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+      }
+      else {
+         jms.setJMSDeliveryMode(defaultDeliveryMode);
+      }
+
+      if (header.getPriority() != null) {
+         jms.setJMSPriority(header.getPriority().intValue());
+      }
+      else {
+         jms.setJMSPriority(defaultPriority);
+      }
+
+      if (header.getFirstAcquirer() != null) {
+         jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
+      }
+
+      if (header.getDeliveryCount() != null) {
+         vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue());
+      }
+
+      final MessageAnnotations ma = amqp.getMessageAnnotations();
+      if (ma != null) {
+         for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
+            String key = entry.getKey().toString();
+            if ("x-opt-jms-type".equals(key) && entry.getValue() != null) {
+               // Legacy annotation, JMSType value will be replaced by Subject further down if also present.
+               jms.setJMSType(entry.getValue().toString());
+            }
+            else if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
+               long deliveryTime = ((Number) entry.getValue()).longValue();
+               jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), deliveryTime);
+            }
+            else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) {
+               long delay = ((Number) entry.getValue()).longValue();
+               if (delay > 0) {
+                  jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), System.currentTimeMillis() + delay);
+               }
+            }
+            //todo
+               /*else if ("x-opt-delivery-repeat".equals(key) && entry.getValue() != null) {
+                    int repeat = ((Number) entry.getValue()).intValue();
+                    if (repeat > 0) {
+                        jms.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
+                    }
+                } else if ("x-opt-delivery-period".equals(key) && entry.getValue() != null) {
+                    long period = ((Number) entry.getValue()).longValue();
+                    if (period > 0) {
+                        jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
+                    }
+                } else if ("x-opt-delivery-cron".equals(key) && entry.getValue() != null) {
+                    String cronEntry = (String) entry.getValue();
+                    if (cronEntry != null) {
+                        jms.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, cronEntry);
+                    }
+                }*/
+
+            setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
+         }
+      }
+
+      final ApplicationProperties ap = amqp.getApplicationProperties();
+      if (ap != null) {
+         for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) ap.getValue().entrySet()) {
+            String key = entry.getKey().toString();
+            if ("JMSXGroupID".equals(key)) {
+               vendor.setJMSXGroupID(jms, entry.getValue().toString());
+            }
+            else if ("JMSXGroupSequence".equals(key)) {
+               vendor.setJMSXGroupSequence(jms, ((Number) entry.getValue()).intValue());
+            }
+            else if ("JMSXUserID".equals(key)) {
+               vendor.setJMSXUserID(jms, entry.getValue().toString());
+            }
+            else {
+               setProperty(jms, key, entry.getValue());
+            }
+         }
+      }
+
+      final Properties properties = amqp.getProperties();
+      if (properties != null) {
+         if (properties.getMessageId() != null) {
+            jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getMessageId()));
+         }
+         Binary userId = properties.getUserId();
+         if (userId != null) {
+            vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8));
+         }
+         if (properties.getTo() != null) {
+            jms.setJMSDestination(vendor.createDestination(properties.getTo()));
+         }
+         if (properties.getSubject() != null) {
+            jms.setJMSType(properties.getSubject());
+         }
+         if (properties.getReplyTo() != null) {
+            jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));
+         }
+         if (properties.getCorrelationId() != null) {
+            jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getCorrelationId()));
+         }
+         if (properties.getContentType() != null) {
+            jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
+         }
+         if (properties.getContentEncoding() != null) {
+            jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString());
+         }
+         if (properties.getCreationTime() != null) {
+            jms.setJMSTimestamp(properties.getCreationTime().getTime());
+         }
+         if (properties.getGroupId() != null) {
+            vendor.setJMSXGroupID(jms, properties.getGroupId());
+         }
+         if (properties.getGroupSequence() != null) {
+            vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue());
+         }
+         if (properties.getReplyToGroupId() != null) {
+            jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId());
+         }
+         if (properties.getAbsoluteExpiryTime() != null) {
+            jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime());
+         }
+      }
+
+      // If the jms expiration has not yet been set...
+      if (jms.getJMSExpiration() == 0) {
+         // Then lets try to set it based on the message ttl.
+         long ttl = defaultTtl;
+         if (header.getTtl() != null) {
+            ttl = header.getTtl().longValue();
+         }
+
+         if (ttl == 0) {
+            jms.setJMSExpiration(0);
+         }
+         else {
+            jms.setJMSExpiration(System.currentTimeMillis() + ttl);
+         }
+      }
+
+      final Footer fp = amqp.getFooter();
+      if (fp != null) {
+         for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) {
+            String key = entry.getKey().toString();
+            setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue());
+         }
+      }
+   }
+
+   private void setProperty(Message msg, String key, Object value) throws JMSException {
+      if (value instanceof UnsignedLong) {
+         long v = ((UnsignedLong) value).longValue();
+         msg.setLongProperty(key, v);
+      }
+      else if (value instanceof UnsignedInteger) {
+         long v = ((UnsignedInteger) value).longValue();
+         if (Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE) {
+            msg.setIntProperty(key, (int) v);
+         }
+         else {
+            msg.setLongProperty(key, v);
+         }
+      }
+      else if (value instanceof UnsignedShort) {
+         int v = ((UnsignedShort) value).intValue();
+         if (Short.MIN_VALUE <= v && v <= Short.MAX_VALUE) {
+            msg.setShortProperty(key, (short) v);
+         }
+         else {
+            msg.setIntProperty(key, v);
+         }
+      }
+      else if (value instanceof UnsignedByte) {
+         short v = ((UnsignedByte) value).shortValue();
+         if (Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE) {
+            msg.setByteProperty(key, (byte) v);
+         }
+         else {
+            msg.setShortProperty(key, v);
+         }
+      }
+      else if (value instanceof Symbol) {
+         msg.setStringProperty(key, value.toString());
+      }
+      else if (value instanceof Decimal128) {
+         msg.setDoubleProperty(key, ((Decimal128) value).doubleValue());
+      }
+      else if (value instanceof Decimal64) {
+         msg.setDoubleProperty(key, ((Decimal64) value).doubleValue());
+      }
+      else if (value instanceof Decimal32) {
+         msg.setFloatProperty(key, ((Decimal32) value).floatValue());
+      }
+      else if (value instanceof Binary) {
+         msg.setStringProperty(key, value.toString());
+      }
+      else {
+         msg.setObjectProperty(key, value);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java
new file mode 100644
index 0000000..2bcbfe2
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Section;
+
+import javax.jms.BytesMessage;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class JMSMappingInboundTransformer extends InboundTransformer {
+
+   public JMSMappingInboundTransformer(JMSVendor vendor) {
+      super(vendor);
+   }
+
+   @Override
+   public String getTransformerName() {
+      return TRANSFORMER_JMS;
+   }
+
+   @Override
+   public InboundTransformer getFallbackTransformer() {
+      return new AMQPNativeInboundTransformer(getVendor());
+   }
+
+   @SuppressWarnings({"unchecked"})
+   @Override
+   public Message transform(EncodedMessage amqpMessage) throws Exception {
+      org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
+
+      Message rc;
+      final Section body = amqp.getBody();
+      if (body == null) {
+         rc = vendor.createMessage();
+      }
+      else if (body instanceof Data) {
+         Binary d = ((Data) body).getValue();
+         BytesMessage m = vendor.createBytesMessage();
+         m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
+         rc = m;
+      }
+      else if (body instanceof AmqpSequence) {
+         AmqpSequence sequence = (AmqpSequence) body;
+         StreamMessage m = vendor.createStreamMessage();
+         for (Object item : sequence.getValue()) {
+            m.writeObject(item);
+         }
+         rc = m;
+         m.setStringProperty(AMQPMessageTypes.AMQP_TYPE_KEY, AMQPMessageTypes.AMQP_SEQUENCE);
+      }
+      else if (body instanceof AmqpValue) {
+         Object value = ((AmqpValue) body).getValue();
+         if (value == null) {
+            rc = vendor.createObjectMessage();
+         }
+         if (value instanceof String) {
+            TextMessage m = vendor.createTextMessage();
+            m.setText((String) value);
+            rc = m;
+         }
+         else if (value instanceof Binary) {
+            Binary d = (Binary) value;
+            BytesMessage m = vendor.createBytesMessage();
+            m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
+            rc = m;
+         }
+         else if (value instanceof List) {
+            StreamMessage m = vendor.createStreamMessage();
+            for (Object item : (List<Object>) value) {
+               m.writeObject(item);
+            }
+            rc = m;
+            m.setStringProperty(AMQPMessageTypes.AMQP_TYPE_KEY, AMQPMessageTypes.AMQP_LIST);
+         }
+         else if (value instanceof Map) {
+            MapMessage m = vendor.createMapMessage();
+            final Set<Map.Entry<String, Object>> set = ((Map<String, Object>) value).entrySet();
+            for (Map.Entry<String, Object> entry : set) {
+               m.setObject(entry.getKey(), entry.getValue());
+            }
+            rc = m;
+         }
+         else {
+            ObjectMessage m = vendor.createObjectMessage();
+            m.setObject((Serializable) value);
+            rc = m;
+         }
+      }
+      else {
+         throw new RuntimeException("Unexpected body type: " + body.getClass());
+      }
+      rc.setJMSDeliveryMode(defaultDeliveryMode);
+      rc.setJMSPriority(defaultPriority);
+      rc.setJMSExpiration(defaultTtl);
+
+      populateMessage(rc, amqp);
+
+      rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
+      rc.setBooleanProperty(prefixVendor + "NATIVE", false);
+      return rc;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
new file mode 100644
index 0000000..7de9408
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedByte;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Footer;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.jboss.logging.Logger;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageEOFException;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.HashMap;
+
+public class JMSMappingOutboundTransformer extends OutboundTransformer {
+   private static final Logger logger = Logger.getLogger(JMSMappingOutboundTransformer.class);
+   public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-dest");
+   public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-reply-to");
+
+   public static final byte QUEUE_TYPE = 0x00;
+   public static final byte TOPIC_TYPE = 0x01;
+   public static final byte TEMP_QUEUE_TYPE = 0x02;
+   public static final byte TEMP_TOPIC_TYPE = 0x03;
+
+   // Deprecated legacy values used by old QPid AMQP 1.0 JMS client.
+
+   public static final Symbol LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-to-type");
+   public static final Symbol LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-reply-type");
+
+   public static final String LEGACY_QUEUE_TYPE = "queue";
+   public static final String LEGACY_TOPIC_TYPE = "topic";
+   public static final String LEGACY_TEMP_QUEUE_TYPE = "temporary,queue";
+   public static final String LEGACY_TEMP_TOPIC_TYPE = "temporary,topic";
+
+   public JMSMappingOutboundTransformer(JMSVendor vendor) {
+      super(vendor);
+   }
+
+   /**
+    * Perform the conversion between JMS Message and Proton Message without
+    * re-encoding it to array. This is needed because some frameworks may elect
+    * to do this on their own way (Netty for instance using Nettybuffers)
+    *
+    * @param msg
+    * @return
+    * @throws Exception
+    */
+   public ProtonJMessage convert(Message msg) throws JMSException, UnsupportedEncodingException {
+      Header header = new Header();
+      Properties props = new Properties();
+      HashMap<Symbol, Object> daMap = null;
+      HashMap<Symbol, Object> maMap = null;
+      HashMap apMap = null;
+      Section body = null;
+      HashMap footerMap = null;
+      if (msg instanceof BytesMessage) {
+         BytesMessage m = (BytesMessage) msg;
+         byte[] data = new byte[(int) m.getBodyLength()];
+         m.readBytes(data);
+         m.reset(); // Need to reset after readBytes or future readBytes
+         // calls (ex: redeliveries) will fail and return -1
+         body = new Data(new Binary(data));
+      }
+      if (msg instanceof TextMessage) {
+         body = new AmqpValue(((TextMessage) msg).getText());
+      }
+      if (msg instanceof MapMessage) {
+         final HashMap<String, Object> map = new HashMap<>();
+         final MapMessage m = (MapMessage) msg;
+         final Enumeration<String> names = m.getMapNames();
+         while (names.hasMoreElements()) {
+            String key = names.nextElement();
+            map.put(key, m.getObject(key));
+         }
+         body = new AmqpValue(map);
+      }
+      if (msg instanceof StreamMessage) {
+         ArrayList<Object> list = new ArrayList<>();
+         final StreamMessage m = (StreamMessage) msg;
+         try {
+            while (true) {
+               list.add(m.readObject());
+            }
+         }
+         catch (MessageEOFException e) {
+         }
+
+         String amqpType = msg.getStringProperty(AMQPMessageTypes.AMQP_TYPE_KEY);
+         if (amqpType.equals(AMQPMessageTypes.AMQP_LIST)) {
+            body = new AmqpValue(list);
+         }
+         else {
+            body = new AmqpSequence(list);
+         }
+      }
+      if (msg instanceof ObjectMessage) {
+         body = new AmqpValue(((ObjectMessage) msg).getObject());
+      }
+
+      if (body == null && msg instanceof ServerJMSMessage) {
+
+         MessageInternal internalMessage = ((ServerJMSMessage) msg).getInnerMessage();
+         if (!internalMessage.containsProperty("AMQP_MESSAGE_FORMAT")) {
+            int readerIndex = internalMessage.getBodyBuffer().readerIndex();
+            try {
+               Object s = internalMessage.getBodyBuffer().readNullableSimpleString();
+               if (s != null) {
+                  body = new AmqpValue(s.toString());
+               }
+            }
+            catch (Throwable ignored) {
+               logger.debug("Exception ignored during conversion, should be ok!", ignored.getMessage(), ignored);
+            }
+            finally {
+               internalMessage.getBodyBuffer().readerIndex(readerIndex);
+            }
+         }
+      }
+
+      header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
+      header.setPriority(new UnsignedByte((byte) msg.getJMSPriority()));
+      if (msg.getJMSType() != null) {
+         props.setSubject(msg.getJMSType());
+      }
+      if (msg.getJMSMessageID() != null) {
+
+         String msgId = msg.getJMSMessageID();
+
+         try {
+            props.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(msgId));
+         }
+         catch (ActiveMQAMQPIllegalStateException e) {
+            props.setMessageId(msgId);
+         }
+      }
+      if (msg.getJMSDestination() != null) {
+         props.setTo(vendor.toAddress(msg.getJMSDestination()));
+         if (maMap == null) {
+            maMap = new HashMap<>();
+         }
+         maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSDestination()));
+
+         // Deprecated: used by legacy QPid AMQP 1.0 JMS client
+         maMap.put(LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSDestination()));
+      }
+      if (msg.getJMSReplyTo() != null) {
+         props.setReplyTo(vendor.toAddress(msg.getJMSReplyTo()));
+         if (maMap == null) {
+            maMap = new HashMap<>();
+         }
+         maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSReplyTo()));
+
+         // Deprecated: used by legacy QPid AMQP 1.0 JMS client
+         maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSReplyTo()));
+      }
+      if (msg.getJMSCorrelationID() != null) {
+         String correlationId = msg.getJMSCorrelationID();
+
+         try {
+            props.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
+         }
+         catch (ActiveMQAMQPIllegalStateException e) {
+            props.setCorrelationId(correlationId);
+         }
+      }
+      if (msg.getJMSExpiration() != 0) {
+         long ttl = msg.getJMSExpiration() - System.currentTimeMillis();
+         if (ttl < 0) {
+            ttl = 1;
+         }
+         header.setTtl(new UnsignedInteger((int) ttl));
+
+         props.setAbsoluteExpiryTime(new Date(msg.getJMSExpiration()));
+      }
+      if (msg.getJMSTimestamp() != 0) {
+         props.setCreationTime(new Date(msg.getJMSTimestamp()));
+      }
+
+      final Enumeration<String> keys = msg.getPropertyNames();
+      while (keys.hasMoreElements()) {
+         String key = keys.nextElement();
+         if (key.equals(messageFormatKey) || key.equals(nativeKey) || key.equals(ServerJMSMessage.NATIVE_MESSAGE_ID)) {
+            // skip..
+         }
+         else if (key.equals(firstAcquirerKey)) {
+            header.setFirstAcquirer(msg.getBooleanProperty(key));
+         }
+         else if (key.startsWith("JMSXDeliveryCount")) {
+            // The AMQP delivery-count field only includes prior failed delivery attempts,
+            // whereas JMSXDeliveryCount includes the first/current delivery attempt.
+            int amqpDeliveryCount = msg.getIntProperty(key) - 1;
+            if (amqpDeliveryCount > 0) {
+               header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
+            }
+         }
+         else if (key.startsWith("JMSXUserID")) {
+            String value = msg.getStringProperty(key);
+            props.setUserId(new Binary(value.getBytes(StandardCharsets.UTF_8)));
+         }
+         else if (key.startsWith("JMSXGroupID") || key.startsWith("_AMQ_GROUP_ID")) {
+            String value = msg.getStringProperty(key);
+            props.setGroupId(value);
+            if (apMap == null) {
+               apMap = new HashMap();
+            }
+            apMap.put(key, value);
+         }
+         else if (key.startsWith("JMSXGroupSeq")) {
+            UnsignedInteger value = new UnsignedInteger(msg.getIntProperty(key));
+            props.setGroupSequence(value);
+            if (apMap == null) {
+               apMap = new HashMap();
+            }
+            apMap.put(key, value);
+         }
+         else if (key.startsWith(prefixDeliveryAnnotationsKey)) {
+            if (daMap == null) {
+               daMap = new HashMap<>();
+            }
+            String name = key.substring(prefixDeliveryAnnotationsKey.length());
+            daMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
+         }
+         else if (key.startsWith(prefixMessageAnnotationsKey)) {
+            if (maMap == null) {
+               maMap = new HashMap<>();
+            }
+            String name = key.substring(prefixMessageAnnotationsKey.length());
+            maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
+         }
+         else if (key.equals(contentTypeKey)) {
+            props.setContentType(Symbol.getSymbol(msg.getStringProperty(key)));
+         }
+         else if (key.equals(contentEncodingKey)) {
+            props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key)));
+         }
+         else if (key.equals(replyToGroupIDKey)) {
+            props.setReplyToGroupId(msg.getStringProperty(key));
+         }
+         else if (key.startsWith(prefixFooterKey)) {
+            if (footerMap == null) {
+               footerMap = new HashMap();
+            }
+            String name = key.substring(prefixFooterKey.length());
+            footerMap.put(name, msg.getObjectProperty(key));
+         }
+         else if (key.equals(AMQPMessageTypes.AMQP_TYPE_KEY)) {
+            // skip
+         }
+         else {
+            if (apMap == null) {
+               apMap = new HashMap();
+            }
+            apMap.put(key, msg.getObjectProperty(key));
+         }
+      }
+
+      MessageAnnotations ma = null;
+      if (maMap != null) {
+         ma = new MessageAnnotations(maMap);
+      }
+      DeliveryAnnotations da = null;
+      if (daMap != null) {
+         da = new DeliveryAnnotations(daMap);
+      }
+      ApplicationProperties ap = null;
+      if (apMap != null) {
+         ap = new ApplicationProperties(apMap);
+      }
+      Footer footer = null;
+      if (footerMap != null) {
+         footer = new Footer(footerMap);
+      }
+
+      return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer);
+   }
+
+   private static byte destinationType(Destination destination) {
+      if (destination instanceof Queue) {
+         if (destination instanceof TemporaryQueue) {
+            return TEMP_QUEUE_TYPE;
+         }
+         else {
+            return QUEUE_TYPE;
+         }
+      }
+      else if (destination instanceof Topic) {
+         if (destination instanceof TemporaryTopic) {
+            return TEMP_TOPIC_TYPE;
+         }
+         else {
+            return TOPIC_TYPE;
+         }
+      }
+
+      throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
+   }
+
+   // Used by legacy QPid AMQP 1.0 JMS client.
+   @Deprecated
+   private static String destinationAttributes(Destination destination) {
+      if (destination instanceof Queue) {
+         if (destination instanceof TemporaryQueue) {
+            return LEGACY_TEMP_QUEUE_TYPE;
+         }
+         else {
+            return LEGACY_QUEUE_TYPE;
+         }
+      }
+      else if (destination instanceof Topic) {
+         if (destination instanceof TemporaryTopic) {
+            return LEGACY_TEMP_TOPIC_TYPE;
+         }
+         else {
+            return LEGACY_TOPIC_TYPE;
+         }
+      }
+
+      throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSVendor.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSVendor.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSVendor.java
new file mode 100644
index 0000000..9a0ed63
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSVendor.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+public interface JMSVendor {
+
+   BytesMessage createBytesMessage();
+
+   StreamMessage createStreamMessage();
+
+   Message createMessage();
+
+   TextMessage createTextMessage();
+
+   ObjectMessage createObjectMessage();
+
+   MapMessage createMapMessage();
+
+   void setJMSXUserID(Message message, String value);
+
+   Destination createDestination(String name);
+
+   void setJMSXGroupID(Message message, String groupId);
+
+   void setJMSXGroupSequence(Message message, int value);
+
+   void setJMSXDeliveryCount(Message message, long value);
+
+   String toAddress(Destination destination);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java
new file mode 100644
index 0000000..310d4ba
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+public abstract class OutboundTransformer {
+
+   JMSVendor vendor;
+   String prefixVendor;
+
+   String prefixDeliveryAnnotations = "DA_";
+   String prefixMessageAnnotations = "MA_";
+   String prefixFooter = "FT_";
+
+   String messageFormatKey;
+   String nativeKey;
+   String firstAcquirerKey;
+   String prefixDeliveryAnnotationsKey;
+   String prefixMessageAnnotationsKey;
+   String contentTypeKey;
+   String contentEncodingKey;
+   String replyToGroupIDKey;
+   String prefixFooterKey;
+
+   public OutboundTransformer(JMSVendor vendor) {
+      this.vendor = vendor;
+      this.setPrefixVendor("JMS_AMQP_");
+   }
+
+   public String getPrefixVendor() {
+      return prefixVendor;
+   }
+
+   public void setPrefixVendor(String prefixVendor) {
+      this.prefixVendor = prefixVendor;
+
+      messageFormatKey = prefixVendor + "MESSAGE_FORMAT";
+      nativeKey = prefixVendor + "NATIVE";
+      firstAcquirerKey = prefixVendor + "FirstAcquirer";
+      prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations;
+      prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations;
+      contentTypeKey = prefixVendor + "ContentType";
+      contentEncodingKey = prefixVendor + "ContentEncoding";
+      replyToGroupIDKey = prefixVendor + "ReplyToGroupID";
+      prefixFooterKey = prefixVendor + prefixFooter;
+
+   }
+
+   public JMSVendor getVendor() {
+      return vendor;
+   }
+
+   public void setVendor(JMSVendor vendor) {
+      this.vendor = vendor;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/package-info.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/package-info.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/package-info.java
new file mode 100644
index 0000000..a2d7889
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package will include classes used to make convertions between Artemis and AMQP.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPException.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPException.java
new file mode 100644
index 0000000..1634c89
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.exceptions;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.qpid.proton.amqp.Symbol;
+
+public class ActiveMQAMQPException extends ActiveMQException {
+
+   private static final String ERROR_PREFIX = "amqp:";
+
+   public Symbol getAmqpError() {
+      return amqpError;
+   }
+
+   private final Symbol amqpError;
+
+   public ActiveMQAMQPException(Symbol amqpError, String message, Throwable e, ActiveMQExceptionType t) {
+      super(message, e, t);
+      this.amqpError = amqpError;
+   }
+
+   public ActiveMQAMQPException(Symbol amqpError, String message, ActiveMQExceptionType t) {
+      super(message, t);
+      this.amqpError = amqpError;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPIllegalStateException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPIllegalStateException.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPIllegalStateException.java
new file mode 100644
index 0000000..7f0fc52
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPIllegalStateException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.exceptions;
+
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+
+public class ActiveMQAMQPIllegalStateException extends ActiveMQAMQPException {
+
+   public ActiveMQAMQPIllegalStateException(String message) {
+      super(AmqpError.ILLEGAL_STATE, message, ActiveMQExceptionType.ILLEGAL_STATE);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPInternalErrorException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPInternalErrorException.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPInternalErrorException.java
new file mode 100644
index 0000000..0af0bbb
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPInternalErrorException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.exceptions;
+
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+
+public class ActiveMQAMQPInternalErrorException extends ActiveMQAMQPException {
+
+   public ActiveMQAMQPInternalErrorException(String message, Throwable e) {
+      super(AmqpError.INTERNAL_ERROR, message, e, ActiveMQExceptionType.INTERNAL_ERROR);
+   }
+
+   public ActiveMQAMQPInternalErrorException(String message) {
+      super(AmqpError.INTERNAL_ERROR, message, ActiveMQExceptionType.INTERNAL_ERROR);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPInvalidFieldException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPInvalidFieldException.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPInvalidFieldException.java
new file mode 100644
index 0000000..a73811c
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPInvalidFieldException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.exceptions;
+
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+
+public class ActiveMQAMQPInvalidFieldException extends ActiveMQAMQPException {
+
+   public ActiveMQAMQPInvalidFieldException(String message) {
+      super(AmqpError.INVALID_FIELD, message, ActiveMQExceptionType.ILLEGAL_STATE);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPNotFoundException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPNotFoundException.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPNotFoundException.java
new file mode 100644
index 0000000..f9acae3
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPNotFoundException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.exceptions;
+
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+
+public class ActiveMQAMQPNotFoundException extends ActiveMQAMQPException {
+
+   public ActiveMQAMQPNotFoundException(String message) {
+      super(AmqpError.NOT_FOUND, message, ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPNotImplementedException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPNotImplementedException.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPNotImplementedException.java
new file mode 100644
index 0000000..d2b09c7
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPNotImplementedException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.exceptions;
+
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+
+public class ActiveMQAMQPNotImplementedException extends ActiveMQAMQPException {
+
+   public ActiveMQAMQPNotImplementedException(String message) {
+      super(AmqpError.NOT_IMPLEMENTED, message, ActiveMQExceptionType.NOT_IMPLEMTNED_EXCEPTION);
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPResourceLimitExceededException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPResourceLimitExceededException.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPResourceLimitExceededException.java
new file mode 100644
index 0000000..978ffb4
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPResourceLimitExceededException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.exceptions;
+
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+
+public class ActiveMQAMQPResourceLimitExceededException extends ActiveMQAMQPException {
+
+   public ActiveMQAMQPResourceLimitExceededException(String message) {
+      super(AmqpError.RESOURCE_LIMIT_EXCEEDED, message, ActiveMQExceptionType.ADDRESS_FULL);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPTimeoutException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPTimeoutException.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPTimeoutException.java
new file mode 100644
index 0000000..992f34b
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/exceptions/ActiveMQAMQPTimeoutException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.exceptions;
+
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+
+public class ActiveMQAMQPTimeoutException extends ActiveMQAMQPException {
+
+   public ActiveMQAMQPTimeoutException(String message) {
+      super(AmqpError.ILLEGAL_STATE, message, ActiveMQExceptionType.CONNECTION_TIMEDOUT);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java
new file mode 100644
index 0000000..2bfe5fc
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.logger;
+
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidFieldException;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
+import org.jboss.logging.annotations.Message;
+import org.jboss.logging.annotations.MessageBundle;
+import org.jboss.logging.Messages;
+
+/**
+ * Logger Code 11
+ * <p>
+ * Each message id must be 6 digits long starting with 10, the 3rd digit should be 9. So the range
+ * is from 219000 to 119999.
+ * <p>
+ * Once released, methods should not be deleted as they may be referenced by knowledge base
+ * articles. Unused methods should be marked as deprecated.
+ */
+@MessageBundle(projectCode = "AMQ")
+public interface ActiveMQAMQPProtocolMessageBundle {
+
+   ActiveMQAMQPProtocolMessageBundle BUNDLE = Messages.getBundle(ActiveMQAMQPProtocolMessageBundle.class);
+
+   @Message(id = 219000, value = "target address not set")
+   ActiveMQAMQPInvalidFieldException targetAddressNotSet();
+
+   @Message(id = 219001, value = "error creating temporary queue, {0}", format = Message.Format.MESSAGE_FORMAT)
+   ActiveMQAMQPInternalErrorException errorCreatingTemporaryQueue(String message);
+
+   @Message(id = 219002, value = "target address does not exist")
+   ActiveMQAMQPNotFoundException addressDoesntExist();
+
+   @Message(id = 219003, value = "error finding temporary queue, {0}", format = Message.Format.MESSAGE_FORMAT)
+   ActiveMQAMQPNotFoundException errorFindingTemporaryQueue(String message);
+
+   @Message(id = 219005, value = "error creating consumer, {0}", format = Message.Format.MESSAGE_FORMAT)
+   ActiveMQAMQPInternalErrorException errorCreatingConsumer(String message);
+
+   @Message(id = 219006, value = "error starting consumer, {0}", format = Message.Format.MESSAGE_FORMAT)
+   ActiveMQAMQPIllegalStateException errorStartingConsumer(String message);
+
+   @Message(id = 219007, value = "error acknowledging message {0}, {1}", format = Message.Format.MESSAGE_FORMAT)
+   ActiveMQAMQPIllegalStateException errorAcknowledgingMessage(String messageID, String message);
+
+   @Message(id = 219008, value = "error cancelling message {0}, {1}", format = Message.Format.MESSAGE_FORMAT)
+   ActiveMQAMQPIllegalStateException errorCancellingMessage(String messageID, String message);
+
+   @Message(id = 219010, value = "source address does not exist")
+   ActiveMQAMQPNotFoundException sourceAddressDoesntExist();
+
+   @Message(id = 219011, value = "source address not set")
+   ActiveMQAMQPInvalidFieldException sourceAddressNotSet();
+
+   @Message(id = 219012, value = "error rolling back coordinator: {0}", format = Message.Format.MESSAGE_FORMAT)
+   ActiveMQAMQPIllegalStateException errorRollingbackCoordinator(String message);
+
+   @Message(id = 219013, value = "error committing coordinator: {0}", format = Message.Format.MESSAGE_FORMAT)
+   ActiveMQAMQPIllegalStateException errorCommittingCoordinator(String message);
+
+   @Message(id = 219014, value = "Transaction not found: xid={0}", format = Message.Format.MESSAGE_FORMAT)
+   ActiveMQAMQPIllegalStateException txNotFound(String xidToString);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
new file mode 100644
index 0000000..8b14e67
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
+import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
+import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
+import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.VersionLoader;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.Transport;
+import org.jboss.logging.Logger;
+import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class AMQPConnectionContext extends ProtonInitializable  {
+
+   private static final Logger log = Logger.getLogger(AMQPConnectionContext.class);
+
+   public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
+   public static final String AMQP_CONTAINER_ID = "amqp-container-id";
+
+   protected final ProtonHandler handler;
+
+   protected AMQPConnectionCallback connectionCallback;
+   private final String containerId;
+   private final Map<Symbol, Object> connectionProperties = new HashMap<>();
+   private final ScheduledExecutorService scheduledPool;
+
+   private final Map<Session, AMQPSessionContext> sessions = new ConcurrentHashMap<>();
+
+   protected LocalListener listener = new LocalListener();
+
+
+
+   public AMQPConnectionContext(AMQPConnectionCallback connectionSP,
+                                String containerId,
+                                int idleTimeout,
+                                int maxFrameSize,
+                                int channelMax,
+                                Executor dispatchExecutor,
+                                ScheduledExecutorService scheduledPool) {
+      this.connectionCallback = connectionSP;
+      this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString();
+
+      connectionProperties.put(AmqpSupport.PRODUCT, "apache-activemq-artemis");
+      connectionProperties.put(AmqpSupport.VERSION, VersionLoader.getVersion().getFullVersion());
+
+      this.scheduledPool = scheduledPool;
+      connectionCallback.setConnection(this);
+      this.handler = new ProtonHandler(dispatchExecutor);
+      Transport transport = handler.getTransport();
+      transport.setEmitFlowEventOnSend(false);
+      if (idleTimeout > 0) {
+         transport.setIdleTimeout(idleTimeout);
+      }
+      transport.setChannelMax(channelMax);
+      transport.setMaxFrameSize(maxFrameSize);
+      handler.addEventHandler(listener);
+   }
+
+   protected AMQPSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException {
+      AMQPSessionCallback sessionSPI = connectionCallback.createSessionCallback(this);
+      AMQPSessionContext protonSession = new AMQPSessionContext(sessionSPI, this, realSession);
+
+      return protonSession;
+   }
+
+   public SASLResult getSASLResult() {
+      return handler.getSASLResult();
+   }
+
+   public void inputBuffer(ByteBuf buffer) {
+      if (log.isTraceEnabled()) {
+         ByteUtil.debugFrame(log, "Buffer Received ", buffer);
+      }
+
+      handler.inputBuffer(buffer);
+   }
+
+   public void destroy() {
+      connectionCallback.close();
+   }
+
+   public boolean isSyncOnFlush() {
+      return false;
+   }
+
+   public Object getLock() {
+      return handler.getLock();
+   }
+
+   public int capacity() {
+      return handler.capacity();
+   }
+
+   public void outputDone(int bytes) {
+      handler.outputDone(bytes);
+   }
+
+   public void flush() {
+      handler.flush();
+   }
+
+   public void close() {
+      handler.close();
+   }
+
+
+   protected AMQPSessionContext getSessionExtension(Session realSession) throws ActiveMQAMQPException {
+      AMQPSessionContext sessionExtension = sessions.get(realSession);
+      if (sessionExtension == null) {
+         // how this is possible? Log a warn here
+         sessionExtension = newSessionExtension(realSession);
+         realSession.setContext(sessionExtension);
+         sessions.put(realSession, sessionExtension);
+      }
+      return sessionExtension;
+   }
+
+
+
+   protected boolean validateConnection(Connection connection) {
+      return connectionCallback.validateConnection(connection, handler.getSASLResult());
+   }
+
+   public boolean checkDataReceived() {
+      return handler.checkDataReceived();
+   }
+
+   public long getCreationTime() {
+      return handler.getCreationTime();
+   }
+
+   protected void flushBytes() {
+      ByteBuf bytes;
+      // handler.outputBuffer has the lock
+      while ((bytes = handler.outputBuffer()) != null) {
+         connectionCallback.onTransport(bytes, this);
+      }
+   }
+
+   public String getRemoteContainer() {
+      return handler.getConnection().getRemoteContainer();
+   }
+
+   public String getPubSubPrefix() {
+      return null;
+   }
+
+   protected void initInternal() throws Exception {
+   }
+
+   protected void remoteLinkOpened(Link link) throws Exception {
+
+      AMQPSessionContext protonSession = (AMQPSessionContext) getSessionExtension(link.getSession());
+
+      link.setSource(link.getRemoteSource());
+      link.setTarget(link.getRemoteTarget());
+      if (link instanceof Receiver) {
+         Receiver receiver = (Receiver) link;
+         if (link.getRemoteTarget() instanceof Coordinator) {
+            Coordinator coordinator = (Coordinator) link.getRemoteTarget();
+            protonSession.addTransactionHandler(coordinator, receiver);
+         }
+         else {
+            protonSession.addReceiver(receiver);
+         }
+      }
+      else {
+         Sender sender = (Sender) link;
+         protonSession.addSender(sender);
+         sender.offer(1);
+      }
+   }
+
+   public Symbol[] getConnectionCapabilitiesOffered() {
+      return ExtCapability.getCapabilities();
+   }
+
+
+
+   // This listener will perform a bunch of things here
+   class LocalListener implements EventHandler {
+
+      @Override
+      public void onInit(Connection connection) throws Exception {
+
+      }
+
+      @Override
+      public void onLocalOpen(Connection connection) throws Exception {
+
+      }
+
+      @Override
+      public void onLocalClose(Connection connection) throws Exception {
+
+      }
+
+      @Override
+      public void onFinal(Connection connection) throws Exception {
+
+      }
+
+      @Override
+      public void onInit(Session session) throws Exception {
+
+      }
+
+      @Override
+      public void onFinal(Session session) throws Exception {
+
+      }
+
+      @Override
+      public void onInit(Link link) throws Exception {
+
+      }
+
+      @Override
+      public void onLocalOpen(Link link) throws Exception {
+
+      }
+
+      @Override
+      public void onLocalClose(Link link) throws Exception {
+
+      }
+
+      @Override
+      public void onFinal(Link link) throws Exception {
+
+      }
+
+      @Override
+      public void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl) {
+         if (sasl) {
+            handler.createServerSASL(connectionCallback.getSASLMechnisms());
+         }
+         else {
+            if (!connectionCallback.isSupportsAnonymous()) {
+               connectionCallback.sendSASLSupported();
+               connectionCallback.close();
+               handler.close();
+            }
+         }
+      }
+
+      @Override
+      public void onTransport(Transport transport) {
+         flushBytes();
+      }
+
+      @Override
+      public void onRemoteOpen(Connection connection) throws Exception {
+         synchronized (getLock()) {
+            try {
+               initInternal();
+            }
+            catch (Exception e) {
+               log.error("Error init connection", e);
+            }
+            if (!validateConnection(connection)) {
+               connection.close();
+            }
+            else {
+               connection.setContext(AMQPConnectionContext.this);
+               connection.setContainer(containerId);
+               connection.setProperties(connectionProperties);
+               connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
+               connection.open();
+            }
+         }
+         initialise();
+
+         /*
+         * This can be null which is in effect an empty map, also we really dont need to check this for in bound connections
+         * but its here in case we add support for outbound connections.
+         * */
+         if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
+            long nextKeepAliveTime = handler.tick(true);
+            flushBytes();
+            if (nextKeepAliveTime > 0 && scheduledPool != null) {
+               scheduledPool.schedule(new Runnable() {
+                  @Override
+                  public void run() {
+                     long rescheduleAt = (handler.tick(false) - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
+                     flushBytes();
+                     if (rescheduleAt > 0) {
+                        scheduledPool.schedule(this, rescheduleAt, TimeUnit.MILLISECONDS);
+                     }
+                  }
+               }, (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);
+            }
+         }
+      }
+
+      @Override
+      public void onRemoteClose(Connection connection) {
+         synchronized (getLock()) {
+            connection.close();
+            for (AMQPSessionContext protonSession : sessions.values()) {
+               protonSession.close();
+            }
+            sessions.clear();
+         }
+         // We must force write the channel before we actually destroy the connection
+         onTransport(handler.getTransport());
+         destroy();
+      }
+
+      @Override
+      public void onLocalOpen(Session session) throws Exception {
+         getSessionExtension(session);
+      }
+
+      @Override
+      public void onRemoteOpen(Session session) throws Exception {
+         getSessionExtension(session).initialise();
+         synchronized (getLock()) {
+            session.open();
+         }
+      }
+
+      @Override
+      public void onLocalClose(Session session) throws Exception {
+      }
+
+      @Override
+      public void onRemoteClose(Session session) throws Exception {
+         synchronized (getLock()) {
+            session.close();
+         }
+
+         AMQPSessionContext sessionContext = (AMQPSessionContext)session.getContext();
+         if (sessionContext != null) {
+            sessionContext.close();
+            sessions.remove(session);
+            session.setContext(null);
+         }
+      }
+
+      @Override
+      public void onRemoteOpen(Link link) throws Exception {
+         remoteLinkOpened(link);
+      }
+
+      @Override
+      public void onFlow(Link link) throws Exception {
+         ((ProtonDeliveryHandler) link.getContext()).onFlow(link.getCredit(), link.getDrain());
+      }
+
+      @Override
+      public void onRemoteClose(Link link) throws Exception {
+         link.close();
+         ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
+         if (linkContext != null) {
+            linkContext.close(true);
+         }
+      }
+
+      @Override
+      public void onRemoteDetach(Link link) throws Exception {
+         link.detach();
+      }
+
+      @Override
+      public void onDetach(Link link) throws Exception {
+         Object context = link.getContext();
+         if (context instanceof ProtonServerSenderContext) {
+            ProtonServerSenderContext senderContext = (ProtonServerSenderContext) context;
+            senderContext.close(false);
+         }
+      }
+
+      @Override
+      public void onDelivery(Delivery delivery) throws Exception {
+         ProtonDeliveryHandler handler = (ProtonDeliveryHandler) delivery.getLink().getContext();
+         if (handler != null) {
+            handler.onMessage(delivery);
+         }
+         else {
+            // TODO: logs
+
+            System.err.println("Handler is null, can't delivery " + delivery);
+         }
+      }
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConstants.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConstants.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConstants.java
new file mode 100644
index 0000000..728a87b
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConstants.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+/**
+ * Constants derived from the AMQP spec
+ */
+public class AMQPConstants {
+
+   /*
+   * Connection Properties
+   * http://docs.oasis-open.org/amqp/core/v1.0/amqp-core-complete-v1.0.pdf#subsection.2.7.1
+   * */
+   public static class Connection {
+
+      public static final int DEFAULT_IDLE_TIMEOUT = -1;
+
+      public static final int DEFAULT_MAX_FRAME_SIZE = -1;//it should be according to the spec 4294967295l;
+
+      public static final int DEFAULT_CHANNEL_MAX = 65535;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
new file mode 100644
index 0000000..9003d3b
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.jboss.logging.Logger;
+
+public class AMQPSessionContext extends ProtonInitializable {
+
+   private static final Logger log = Logger.getLogger(AMQPSessionContext.class);
+   protected final AMQPConnectionContext connection;
+
+   protected final AMQPSessionCallback sessionSPI;
+
+   protected final Session session;
+
+   private long currentTag = 0;
+
+   protected Map<Receiver, ProtonServerReceiverContext> receivers = new HashMap<>();
+
+   protected Map<Sender, ProtonServerSenderContext> senders = new HashMap<>();
+
+   protected boolean closed = false;
+
+   public AMQPSessionContext(AMQPSessionCallback sessionSPI,
+                             AMQPConnectionContext connection,
+                             Session session) {
+      this.connection = connection;
+      this.sessionSPI = sessionSPI;
+      this.session = session;
+   }
+
+   protected Map<Object, ProtonServerSenderContext> serverSenders = new HashMap<>();
+
+   @Override
+   public void initialise() throws Exception {
+      if (!isInitialized()) {
+         super.initialise();
+
+         if (sessionSPI != null) {
+            try {
+               sessionSPI.init(this, connection.getSASLResult());
+            }
+            catch (Exception e) {
+               throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
+            }
+         }
+      }
+   }
+
+   /**
+    * @param consumer
+    * @param queueName
+    */
+   public void disconnect(Object consumer, String queueName) {
+      ProtonServerSenderContext protonConsumer = senders.remove(consumer);
+      if (protonConsumer != null) {
+         try {
+            protonConsumer.close(false);
+         }
+         catch (ActiveMQAMQPException e) {
+            protonConsumer.getSender().setTarget(null);
+            protonConsumer.getSender().setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
+         }
+      }
+   }
+
+
+   /**
+    * The consumer object from the broker or the key used to store the sender
+    *
+    * @param message
+    * @param consumer
+    * @param deliveryCount
+    * @return the number of bytes sent
+    */
+   public int serverDelivery(Object message, Object consumer, int deliveryCount) throws Exception {
+      ProtonServerSenderContext protonSender = serverSenders.get(consumer);
+      if (protonSender != null) {
+         return protonSender.deliverMessage(message, deliveryCount);
+      }
+      return 0;
+   }
+
+   public byte[] getTag() {
+      return Long.toHexString(currentTag++).getBytes();
+   }
+
+   public void replaceTag(byte[] tag) {
+      // TODO: do we need to reuse this?
+   }
+
+   public void close() {
+      if (closed) {
+         return;
+      }
+
+      // Making a copy to avoid ConcurrentModificationException during the iteration
+      Set<ProtonServerReceiverContext> receiversCopy = new HashSet<>();
+      receiversCopy.addAll(receivers.values());
+
+      for (ProtonServerReceiverContext protonProducer : receiversCopy) {
+         try {
+            protonProducer.close(false);
+         }
+         catch (Exception e) {
+            log.warn(e.getMessage(), e);
+         }
+      }
+      receivers.clear();
+
+      Set<ProtonServerSenderContext> protonSendersClone = new HashSet<>();
+      protonSendersClone.addAll(senders.values());
+
+      for (ProtonServerSenderContext protonConsumer : protonSendersClone) {
+         try {
+            protonConsumer.close(false);
+         }
+         catch (Exception e) {
+            log.warn(e.getMessage(), e);
+         }
+      }
+      senders.clear();
+      try {
+         if (sessionSPI != null) {
+            sessionSPI.close();
+         }
+      }
+      catch (Exception e) {
+         log.warn(e.getMessage(), e);
+      }
+      closed = true;
+   }
+
+   public void removeReceiver(Receiver receiver) {
+      receivers.remove(receiver);
+   }
+
+   public void addTransactionHandler(Coordinator coordinator, Receiver receiver) {
+      ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI);
+
+      coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"),
+                                  Symbol.getSymbol("amqp:multi-txns-per-ssn"),
+                                  Symbol.getSymbol("amqp:multi-ssns-per-txn"));
+
+      receiver.setContext(transactionHandler);
+      receiver.open();
+      receiver.flow(100);
+   }
+
+   public void addSender(Sender sender) throws Exception {
+      ProtonServerSenderContext protonSender = new ProtonServerSenderContext(connection, sender, this, sessionSPI);
+
+      try {
+         protonSender.initialise();
+         senders.put(sender, protonSender);
+         serverSenders.put(protonSender.getBrokerConsumer(), protonSender);
+         sender.setContext(protonSender);
+         sender.open();
+         protonSender.start();
+      }
+      catch (ActiveMQAMQPException e) {
+         senders.remove(sender);
+         sender.setSource(null);
+         sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
+         sender.close();
+      }
+   }
+
+   public void removeSender(Sender sender) throws ActiveMQAMQPException {
+      senders.remove(sender);
+      ProtonServerSenderContext senderRemoved =  senders.remove(sender);
+      if (senderRemoved != null) {
+         serverSenders.remove(senderRemoved.getBrokerConsumer());
+      }
+   }
+
+   public void addReceiver(Receiver receiver) throws Exception {
+      try {
+         ProtonServerReceiverContext protonReceiver = new ProtonServerReceiverContext(sessionSPI, connection, this, receiver);
+         protonReceiver.initialise();
+         receivers.put(receiver, protonReceiver);
+         receiver.setContext(protonReceiver);
+         receiver.open();
+      }
+      catch (ActiveMQAMQPException e) {
+         receivers.remove(receiver);
+         receiver.setTarget(null);
+         receiver.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
+         receiver.close();
+      }
+   }
+
+}


Mime
View raw message