activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [11/15] activemq-artemis git commit: ARTEMIS-751 Simplification of the AMQP implementation
Date Tue, 27 Sep 2016 13:54:38 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ActiveMQJMSVendor.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ActiveMQJMSVendor.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ActiveMQJMSVendor.java
new file mode 100644
index 0000000..7e3ba67
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ActiveMQJMSVendor.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSVendor;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.utils.IDGenerator;
+
+public class ActiveMQJMSVendor implements JMSVendor {
+
+   private final IDGenerator serverGenerator;
+
+   ActiveMQJMSVendor(IDGenerator idGenerator) {
+      this.serverGenerator = idGenerator;
+   }
+
+   @Override
+   public BytesMessage createBytesMessage() {
+      return new ServerJMSBytesMessage(newMessage(org.apache.activemq.artemis.api.core.Message.BYTES_TYPE), 0);
+   }
+
+   @Override
+   public StreamMessage createStreamMessage() {
+      return new ServerJMSStreamMessage(newMessage(org.apache.activemq.artemis.api.core.Message.STREAM_TYPE), 0);
+   }
+
+   @Override
+   public Message createMessage() {
+      return new ServerJMSMessage(newMessage(org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE), 0);
+   }
+
+   @Override
+   public TextMessage createTextMessage() {
+      return new ServerJMSTextMessage(newMessage(org.apache.activemq.artemis.api.core.Message.TEXT_TYPE), 0);
+   }
+
+   @Override
+   public ObjectMessage createObjectMessage() {
+      return new ServerJMSObjectMessage(newMessage(org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE), 0);
+   }
+
+   @Override
+   public MapMessage createMapMessage() {
+      return new ServerJMSMapMessage(newMessage(org.apache.activemq.artemis.api.core.Message.MAP_TYPE), 0);
+   }
+
+   @Override
+   public void setJMSXUserID(Message message, String s) {
+   }
+
+   @Override
+   public Destination createDestination(String name) {
+      return new ServerDestination(name);
+   }
+
+   @Override
+   public void setJMSXGroupID(Message message, String s) {
+      try {
+         message.setStringProperty("_AMQ_GROUP_ID", s);
+      }
+      catch (JMSException e) {
+         throw new RuntimeException(e);
+      }
+   }
+
+   @Override
+   public void setJMSXGroupSequence(Message message, int i) {
+      try {
+         message.setIntProperty("JMSXGroupSeq", i);
+      }
+      catch (JMSException e) {
+         throw new RuntimeException(e);
+      }
+   }
+
+   @Override
+   public void setJMSXDeliveryCount(Message message, long l) {
+      try {
+         message.setLongProperty("JMSXDeliveryCount", l);
+      }
+      catch (JMSException e) {
+         throw new RuntimeException(e);
+      }
+   }
+
+   public ServerJMSMessage wrapMessage(int messageType, ServerMessage wrapped, int deliveryCount) {
+      switch (messageType) {
+         case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE:
+            return new ServerJMSStreamMessage(wrapped, deliveryCount);
+         case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE:
+            return new ServerJMSBytesMessage(wrapped, deliveryCount);
+         case org.apache.activemq.artemis.api.core.Message.MAP_TYPE:
+            return new ServerJMSMapMessage(wrapped, deliveryCount);
+         case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE:
+            return new ServerJMSTextMessage(wrapped, deliveryCount);
+         case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
+            return new ServerJMSObjectMessage(wrapped, deliveryCount);
+         default:
+            return new ServerJMSMessage(wrapped, deliveryCount);
+      }
+   }
+
+   @Override
+   public String toAddress(Destination destination) {
+      if (destination instanceof ActiveMQDestination) {
+         return ((ActiveMQDestination) destination).getAddress();
+      }
+      return null;
+   }
+
+   private ServerMessageImpl newMessage(byte messageType) {
+      ServerMessageImpl message = new ServerMessageImpl(serverGenerator.generateID(), 512);
+      message.setType(messageType);
+      ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null);
+      return message;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java
new file mode 100644
index 0000000..f520387
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter;
+
+import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPNativeOutboundTransformer;
+import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.message.InboundTransformer;
+import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSMappingInboundTransformer;
+import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSMappingOutboundTransformer;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
+import org.apache.activemq.artemis.utils.IDGenerator;
+
+import javax.jms.BytesMessage;
+import java.io.IOException;
+
+public class ProtonMessageConverter implements MessageConverter {
+
+   ActiveMQJMSVendor activeMQJMSVendor;
+
+   private final String prefixVendor;
+
+   public ProtonMessageConverter(IDGenerator idGenerator) {
+      activeMQJMSVendor = new ActiveMQJMSVendor(idGenerator);
+      inboundTransformer = new JMSMappingInboundTransformer(activeMQJMSVendor);
+      outboundTransformer = new JMSMappingOutboundTransformer(activeMQJMSVendor);
+      prefixVendor = outboundTransformer.getPrefixVendor();
+   }
+
+   private final InboundTransformer inboundTransformer;
+   private final JMSMappingOutboundTransformer outboundTransformer;
+
+   @Override
+   public ServerMessage inbound(Object messageSource) throws Exception {
+      ServerJMSMessage jmsMessage = inboundJMSType((EncodedMessage) messageSource);
+
+      return (ServerMessage) jmsMessage.getInnerMessage();
+   }
+
+   /**
+    * Just create the JMS Part of the inbound (for testing)
+    *
+    * @param messageSource
+    * @return
+    * @throws Exception                    https://issues.jboss.org/browse/ENTMQ-1560
+    */
+   public ServerJMSMessage inboundJMSType(EncodedMessage messageSource) throws Exception {
+      EncodedMessage encodedMessageSource = messageSource;
+      ServerJMSMessage transformedMessage = null;
+
+      InboundTransformer transformer = inboundTransformer;
+
+      while (transformer != null) {
+         try {
+            transformedMessage = (ServerJMSMessage) transformer.transform(encodedMessageSource);
+            break;
+         }
+         catch (Exception e) {
+            ActiveMQClientLogger.LOGGER.debug("Transform of message using [{}] transformer, failed" + inboundTransformer.getTransformerName());
+            ActiveMQClientLogger.LOGGER.trace("Transformation error:", e);
+
+            transformer = transformer.getFallbackTransformer();
+         }
+      }
+
+      if (transformedMessage == null) {
+         throw new IOException("Failed to transform incoming delivery, skipping.");
+      }
+
+      transformedMessage.encode();
+
+      return transformedMessage;
+   }
+
+   @Override
+   public Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception {
+      ServerJMSMessage jmsMessage = activeMQJMSVendor.wrapMessage(messageOutbound.getType(), messageOutbound, deliveryCount);
+
+      jmsMessage.decode();
+
+      if (jmsMessage.getBooleanProperty(prefixVendor + "NATIVE")) {
+         if (jmsMessage instanceof BytesMessage) {
+            return AMQPNativeOutboundTransformer.transform(outboundTransformer, (BytesMessage) jmsMessage);
+         }
+         else {
+            return null;
+         }
+      }
+      else {
+         return outboundTransformer.convert(jmsMessage);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerDestination.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerDestination.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerDestination.java
new file mode 100644
index 0000000..967ba08
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerDestination.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.jms;
+
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+
+import javax.jms.JMSException;
+import javax.jms.Queue;
+
+/**
+ * This is just here to avoid all the client checks we need with valid JMS destinations, protocol convertors don't need to
+ * adhere to the jms. semantics.
+ */
+public class ServerDestination extends ActiveMQDestination implements Queue {
+   public ServerDestination(String name) {
+      super(name, name, false, false, null);
+   }
+
+   @Override
+   public String getQueueName() throws JMSException {
+      return getName();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java
new file mode 100644
index 0000000..abdf808
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.jms;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+
+import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesMessageReset;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadBoolean;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadByte;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadBytes;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadChar;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadDouble;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadFloat;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadInt;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadLong;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadShort;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadUTF;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadUnsignedByte;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadUnsignedShort;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteBoolean;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteByte;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteBytes;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteChar;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteDouble;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteFloat;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteInt;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteLong;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteObject;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteShort;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteUTF;
+
+public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMessage {
+
+   public ServerJMSBytesMessage(MessageInternal message, int deliveryCount) {
+      super(message, deliveryCount);
+   }
+
+   @Override
+   public long getBodyLength() throws JMSException {
+      return message.getEndOfBodyPosition() - MessageImpl.BODY_OFFSET;
+   }
+
+   @Override
+   public boolean readBoolean() throws JMSException {
+      return bytesReadBoolean(getReadBodyBuffer());
+   }
+
+   @Override
+   public byte readByte() throws JMSException {
+      return bytesReadByte(getReadBodyBuffer());
+   }
+
+   @Override
+   public int readUnsignedByte() throws JMSException {
+      return bytesReadUnsignedByte(getReadBodyBuffer());
+   }
+
+   @Override
+   public short readShort() throws JMSException {
+      return bytesReadShort(getReadBodyBuffer());
+   }
+
+   @Override
+   public int readUnsignedShort() throws JMSException {
+      return bytesReadUnsignedShort(getReadBodyBuffer());
+   }
+
+   @Override
+   public char readChar() throws JMSException {
+      return bytesReadChar(getReadBodyBuffer());
+   }
+
+   @Override
+   public int readInt() throws JMSException {
+      return bytesReadInt(getReadBodyBuffer());
+   }
+
+   @Override
+   public long readLong() throws JMSException {
+      return bytesReadLong(getReadBodyBuffer());
+   }
+
+   @Override
+   public float readFloat() throws JMSException {
+      return bytesReadFloat(getReadBodyBuffer());
+   }
+
+   @Override
+   public double readDouble() throws JMSException {
+      return bytesReadDouble(getReadBodyBuffer());
+   }
+
+   @Override
+   public String readUTF() throws JMSException {
+      return bytesReadUTF(getReadBodyBuffer());
+   }
+
+   @Override
+   public int readBytes(byte[] value) throws JMSException {
+      return bytesReadBytes(getReadBodyBuffer(), value);
+   }
+
+   @Override
+   public int readBytes(byte[] value, int length) throws JMSException {
+      return bytesReadBytes(getReadBodyBuffer(), value, length);
+   }
+
+   @Override
+   public void writeBoolean(boolean value) throws JMSException {
+      bytesWriteBoolean(getWriteBodyBuffer(), value);
+
+   }
+
+   @Override
+   public void writeByte(byte value) throws JMSException {
+      bytesWriteByte(getWriteBodyBuffer(), value);
+   }
+
+   @Override
+   public void writeShort(short value) throws JMSException {
+      bytesWriteShort(getWriteBodyBuffer(), value);
+   }
+
+   @Override
+   public void writeChar(char value) throws JMSException {
+      bytesWriteChar(getWriteBodyBuffer(), value);
+   }
+
+   @Override
+   public void writeInt(int value) throws JMSException {
+      bytesWriteInt(getWriteBodyBuffer(), value);
+   }
+
+   @Override
+   public void writeLong(long value) throws JMSException {
+      bytesWriteLong(getWriteBodyBuffer(), value);
+   }
+
+   @Override
+   public void writeFloat(float value) throws JMSException {
+      bytesWriteFloat(getWriteBodyBuffer(), value);
+   }
+
+   @Override
+   public void writeDouble(double value) throws JMSException {
+      bytesWriteDouble(getWriteBodyBuffer(), value);
+   }
+
+   @Override
+   public void writeUTF(String value) throws JMSException {
+      bytesWriteUTF(getWriteBodyBuffer(), value);
+   }
+
+   @Override
+   public void writeBytes(byte[] value) throws JMSException {
+      bytesWriteBytes(getWriteBodyBuffer(), value);
+   }
+
+   @Override
+   public void writeBytes(byte[] value, int offset, int length) throws JMSException {
+      bytesWriteBytes(getWriteBodyBuffer(), value, offset, length);
+   }
+
+   @Override
+   public void writeObject(Object value) throws JMSException {
+      if (!bytesWriteObject(getWriteBodyBuffer(), value)) {
+         throw new JMSException("Can't make conversion of " + value + " to any known type");
+      }
+   }
+
+   @Override
+   public void encode() throws Exception {
+      super.encode();
+      // this is to make sure we encode the body-length before it's persisted
+      getBodyLength();
+   }
+
+   @Override
+   public void decode() throws Exception {
+      super.decode();
+
+   }
+
+   @Override
+   public void reset() throws JMSException {
+      bytesMessageReset(getReadBodyBuffer());
+      bytesMessageReset(getWriteBodyBuffer());
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java
new file mode 100644
index 0000000..548deb3
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.jms;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageFormatException;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.utils.TypedProperties;
+
+import static org.apache.activemq.artemis.reader.MapMessageUtil.readBodyMap;
+import static org.apache.activemq.artemis.reader.MapMessageUtil.writeBodyMap;
+
+/**
+ * ActiveMQ Artemis implementation of a JMS MapMessage.
+ */
+public final class ServerJMSMapMessage extends ServerJMSMessage implements MapMessage {
+   // Constants -----------------------------------------------------
+
+   public static final byte TYPE = Message.MAP_TYPE;
+
+   // Attributes ----------------------------------------------------
+
+   private final TypedProperties map = new TypedProperties();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /*
+    * This constructor is used to construct messages prior to sending
+    */
+   public ServerJMSMapMessage(MessageInternal message, int deliveryCount) {
+      super(message, deliveryCount);
+
+   }
+
+   // MapMessage implementation -------------------------------------
+
+   @Override
+   public void setBoolean(final String name, final boolean value) throws JMSException {
+      map.putBooleanProperty(new SimpleString(name), value);
+   }
+
+   @Override
+   public void setByte(final String name, final byte value) throws JMSException {
+      map.putByteProperty(new SimpleString(name), value);
+   }
+
+   @Override
+   public void setShort(final String name, final short value) throws JMSException {
+      map.putShortProperty(new SimpleString(name), value);
+   }
+
+   @Override
+   public void setChar(final String name, final char value) throws JMSException {
+      map.putCharProperty(new SimpleString(name), value);
+   }
+
+   @Override
+   public void setInt(final String name, final int value) throws JMSException {
+      map.putIntProperty(new SimpleString(name), value);
+   }
+
+   @Override
+   public void setLong(final String name, final long value) throws JMSException {
+      map.putLongProperty(new SimpleString(name), value);
+   }
+
+   @Override
+   public void setFloat(final String name, final float value) throws JMSException {
+      map.putFloatProperty(new SimpleString(name), value);
+   }
+
+   @Override
+   public void setDouble(final String name, final double value) throws JMSException {
+      map.putDoubleProperty(new SimpleString(name), value);
+   }
+
+   @Override
+   public void setString(final String name, final String value) throws JMSException {
+      map.putSimpleStringProperty(new SimpleString(name), value == null ? null : new SimpleString(value));
+   }
+
+   @Override
+   public void setBytes(final String name, final byte[] value) throws JMSException {
+      map.putBytesProperty(new SimpleString(name), value);
+   }
+
+   @Override
+   public void setBytes(final String name, final byte[] value, final int offset, final int length) throws JMSException {
+      if (offset + length > value.length) {
+         throw new JMSException("Invalid offset/length");
+      }
+      byte[] newBytes = new byte[length];
+      System.arraycopy(value, offset, newBytes, 0, length);
+      map.putBytesProperty(new SimpleString(name), newBytes);
+   }
+
+   @Override
+   public void setObject(final String name, final Object value) throws JMSException {
+      try {
+         TypedProperties.setObjectProperty(new SimpleString(name), value, map);
+      }
+      catch (ActiveMQPropertyConversionException e) {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   @Override
+   public boolean getBoolean(final String name) throws JMSException {
+      try {
+         return map.getBooleanProperty(new SimpleString(name));
+      }
+      catch (ActiveMQPropertyConversionException e) {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   @Override
+   public byte getByte(final String name) throws JMSException {
+      try {
+         return map.getByteProperty(new SimpleString(name));
+      }
+      catch (ActiveMQPropertyConversionException e) {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   @Override
+   public short getShort(final String name) throws JMSException {
+      try {
+         return map.getShortProperty(new SimpleString(name));
+      }
+      catch (ActiveMQPropertyConversionException e) {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   @Override
+   public char getChar(final String name) throws JMSException {
+      try {
+         return map.getCharProperty(new SimpleString(name));
+      }
+      catch (ActiveMQPropertyConversionException e) {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   @Override
+   public int getInt(final String name) throws JMSException {
+      try {
+         return map.getIntProperty(new SimpleString(name));
+      }
+      catch (ActiveMQPropertyConversionException e) {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   @Override
+   public long getLong(final String name) throws JMSException {
+      try {
+         return map.getLongProperty(new SimpleString(name));
+      }
+      catch (ActiveMQPropertyConversionException e) {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   @Override
+   public float getFloat(final String name) throws JMSException {
+      try {
+         return map.getFloatProperty(new SimpleString(name));
+      }
+      catch (ActiveMQPropertyConversionException e) {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   @Override
+   public double getDouble(final String name) throws JMSException {
+      try {
+         return map.getDoubleProperty(new SimpleString(name));
+      }
+      catch (ActiveMQPropertyConversionException e) {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   @Override
+   public String getString(final String name) throws JMSException {
+      try {
+         SimpleString str = map.getSimpleStringProperty(new SimpleString(name));
+         if (str == null) {
+            return null;
+         }
+         else {
+            return str.toString();
+         }
+      }
+      catch (ActiveMQPropertyConversionException e) {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   @Override
+   public byte[] getBytes(final String name) throws JMSException {
+      try {
+         return map.getBytesProperty(new SimpleString(name));
+      }
+      catch (ActiveMQPropertyConversionException e) {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   @Override
+   public Object getObject(final String name) throws JMSException {
+      Object val = map.getProperty(new SimpleString(name));
+
+      if (val instanceof SimpleString) {
+         val = ((SimpleString) val).toString();
+      }
+
+      return val;
+   }
+
+   @Override
+   public Enumeration getMapNames() throws JMSException {
+      Set<SimpleString> simplePropNames = map.getPropertyNames();
+      Set<String> propNames = new HashSet<>(simplePropNames.size());
+
+      for (SimpleString str : simplePropNames) {
+         propNames.add(str.toString());
+      }
+
+      return Collections.enumeration(propNames);
+   }
+
+   @Override
+   public boolean itemExists(final String name) throws JMSException {
+      return map.containsProperty(new SimpleString(name));
+   }
+
+   @Override
+   public void clearBody() throws JMSException {
+      super.clearBody();
+
+      map.clear();
+   }
+
+   @Override
+   public void encode() throws Exception {
+      super.encode();
+      writeBodyMap(getWriteBodyBuffer(), map);
+   }
+
+   @Override
+   public void decode() throws Exception {
+      super.decode();
+      readBodyMap(getReadBodyBuffer(), map);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
new file mode 100644
index 0000000..d15d22b
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.jms;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import java.util.Collections;
+import java.util.Enumeration;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.reader.MessageUtil;
+
+public class ServerJMSMessage implements Message {
+
+   public static final String NATIVE_MESSAGE_ID = "NATIVE_MESSAGE_ID";
+
+   protected final MessageInternal message;
+
+   protected int deliveryCount;
+
+   public MessageInternal getInnerMessage() {
+      return message;
+   }
+
+   public ServerJMSMessage(MessageInternal message, int deliveryCount) {
+      this.message = message;
+      this.deliveryCount = deliveryCount;
+   }
+
+   private ActiveMQBuffer readBodyBuffer;
+
+   /** When reading we use a protected copy so multi-threads can work fine */
+   protected ActiveMQBuffer getReadBodyBuffer() {
+      if (readBodyBuffer == null) {
+         // to avoid clashes between multiple threads
+         readBodyBuffer = message.getBodyBufferDuplicate();
+      }
+      return readBodyBuffer;
+   }
+
+   /** When writing on the conversion we use the buffer directly */
+   protected ActiveMQBuffer getWriteBodyBuffer() {
+      readBodyBuffer = null; // it invalidates this buffer if anything is written
+      return message.getBodyBuffer();
+   }
+
+
+   @Override
+   public final String getJMSMessageID() throws JMSException {
+      if (message.containsProperty(NATIVE_MESSAGE_ID)) {
+         return getStringProperty(NATIVE_MESSAGE_ID);
+      }
+      return null;
+   }
+
+   @Override
+   public final void setJMSMessageID(String id) throws JMSException {
+      if (id != null) {
+         message.putStringProperty(NATIVE_MESSAGE_ID, id);
+      }
+   }
+
+   @Override
+   public final long getJMSTimestamp() throws JMSException {
+      return message.getTimestamp();
+   }
+
+   @Override
+   public final void setJMSTimestamp(long timestamp) throws JMSException {
+      message.setTimestamp(timestamp);
+   }
+
+   @Override
+   public final byte[] getJMSCorrelationIDAsBytes() throws JMSException {
+      return MessageUtil.getJMSCorrelationIDAsBytes(message);
+   }
+
+   @Override
+   public final void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException {
+      try {
+         MessageUtil.setJMSCorrelationIDAsBytes(message, correlationID);
+      }
+      catch (ActiveMQException e) {
+         throw new JMSException(e.getMessage());
+      }
+   }
+
+   @Override
+   public final void setJMSCorrelationID(String correlationID) throws JMSException {
+      MessageUtil.setJMSCorrelationID(message, correlationID);
+   }
+
+   @Override
+   public final String getJMSCorrelationID() throws JMSException {
+      return MessageUtil.getJMSCorrelationID(message);
+   }
+
+   @Override
+   public final Destination getJMSReplyTo() throws JMSException {
+      SimpleString reply = MessageUtil.getJMSReplyTo(message);
+      if (reply != null) {
+         return new ServerDestination(reply.toString());
+      }
+      else {
+         return null;
+      }
+   }
+
+   @Override
+   public final void setJMSReplyTo(Destination replyTo) throws JMSException {
+      MessageUtil.setJMSReplyTo(message, replyTo == null ? null : ((ActiveMQDestination) replyTo).getSimpleAddress());
+
+   }
+
+   @Override
+   public final Destination getJMSDestination() throws JMSException {
+      SimpleString sdest = message.getAddress();
+
+      if (sdest == null) {
+         return null;
+      }
+      else {
+         return new ServerDestination(sdest.toString());
+      }
+   }
+
+   @Override
+   public final void setJMSDestination(Destination destination) throws JMSException {
+      if (destination == null) {
+         message.setAddress(null);
+      }
+      else {
+         message.setAddress(((ActiveMQDestination) destination).getSimpleAddress());
+      }
+
+   }
+
+   @Override
+   public final int getJMSDeliveryMode() throws JMSException {
+      return message.isDurable() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+   }
+
+   @Override
+   public final void setJMSDeliveryMode(int deliveryMode) throws JMSException {
+      if (deliveryMode == DeliveryMode.PERSISTENT) {
+         message.setDurable(true);
+      }
+      else if (deliveryMode == DeliveryMode.NON_PERSISTENT) {
+         message.setDurable(false);
+      }
+      else {
+         throw new JMSException("Invalid mode " + deliveryMode);
+      }
+   }
+
+   @Override
+   public final boolean getJMSRedelivered() throws JMSException {
+      return false;
+   }
+
+   @Override
+   public final void setJMSRedelivered(boolean redelivered) throws JMSException {
+      // no op
+   }
+
+   @Override
+   public final String getJMSType() throws JMSException {
+      return MessageUtil.getJMSType(message);
+   }
+
+   @Override
+   public final void setJMSType(String type) throws JMSException {
+      MessageUtil.setJMSType(message, type);
+   }
+
+   @Override
+   public final long getJMSExpiration() throws JMSException {
+      return message.getExpiration();
+   }
+
+   @Override
+   public final void setJMSExpiration(long expiration) throws JMSException {
+      message.setExpiration(expiration);
+   }
+
+   @Override
+   public final long getJMSDeliveryTime() throws JMSException {
+      // no op
+      return 0;
+   }
+
+   @Override
+   public final void setJMSDeliveryTime(long deliveryTime) throws JMSException {
+      // no op
+   }
+
+   @Override
+   public final int getJMSPriority() throws JMSException {
+      return message.getPriority();
+   }
+
+   @Override
+   public final void setJMSPriority(int priority) throws JMSException {
+      message.setPriority((byte) priority);
+   }
+
+   @Override
+   public final void clearProperties() throws JMSException {
+      MessageUtil.clearProperties(message);
+
+   }
+
+   @Override
+   public final boolean propertyExists(String name) throws JMSException {
+      return MessageUtil.propertyExists(message, name);
+   }
+
+   @Override
+   public final boolean getBooleanProperty(String name) throws JMSException {
+      return message.getBooleanProperty(name);
+   }
+
+   @Override
+   public final byte getByteProperty(String name) throws JMSException {
+      return message.getByteProperty(name);
+   }
+
+   @Override
+   public final short getShortProperty(String name) throws JMSException {
+      return message.getShortProperty(name);
+   }
+
+   @Override
+   public final int getIntProperty(String name) throws JMSException {
+      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
+         return deliveryCount;
+      }
+
+      return message.getIntProperty(name);
+   }
+
+   @Override
+   public final long getLongProperty(String name) throws JMSException {
+      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
+         return deliveryCount;
+      }
+
+      return message.getLongProperty(name);
+   }
+
+   @Override
+   public final float getFloatProperty(String name) throws JMSException {
+      return message.getFloatProperty(name);
+   }
+
+   @Override
+   public final double getDoubleProperty(String name) throws JMSException {
+      return message.getDoubleProperty(name);
+   }
+
+   @Override
+   public final String getStringProperty(String name) throws JMSException {
+      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
+         return String.valueOf(deliveryCount);
+      }
+
+      return message.getStringProperty(name);
+   }
+
+   @Override
+   public final Object getObjectProperty(String name) throws JMSException {
+      Object val = message.getObjectProperty(name);
+      if (val instanceof SimpleString) {
+         val = ((SimpleString) val).toString();
+      }
+      return val;
+   }
+
+   @Override
+   public final Enumeration getPropertyNames() throws JMSException {
+      return Collections.enumeration(MessageUtil.getPropertyNames(message));
+   }
+
+   @Override
+   public final void setBooleanProperty(String name, boolean value) throws JMSException {
+      message.putBooleanProperty(name, value);
+   }
+
+   @Override
+   public final void setByteProperty(String name, byte value) throws JMSException {
+      message.putByteProperty(name, value);
+   }
+
+   @Override
+   public final void setShortProperty(String name, short value) throws JMSException {
+      message.putShortProperty(name, value);
+   }
+
+   @Override
+   public final void setIntProperty(String name, int value) throws JMSException {
+      message.putIntProperty(name, value);
+   }
+
+   @Override
+   public final void setLongProperty(String name, long value) throws JMSException {
+      message.putLongProperty(name, value);
+   }
+
+   @Override
+   public final void setFloatProperty(String name, float value) throws JMSException {
+      message.putFloatProperty(name, value);
+   }
+
+   @Override
+   public final void setDoubleProperty(String name, double value) throws JMSException {
+      message.putDoubleProperty(name, value);
+   }
+
+   @Override
+   public final void setStringProperty(String name, String value) throws JMSException {
+      message.putStringProperty(name, value);
+   }
+
+   @Override
+   public final void setObjectProperty(String name, Object value) throws JMSException {
+      message.putObjectProperty(name, value);
+   }
+
+   @Override
+   public final void acknowledge() throws JMSException {
+      // no op
+   }
+
+   @Override
+   public void clearBody() throws JMSException {
+      message.getBodyBuffer().clear();
+   }
+
+   @Override
+   public final <T> T getBody(Class<T> c) throws JMSException {
+      // no op.. jms2 not used on the conversion
+      return null;
+   }
+
+   /**
+    * Encode the body into the internal message
+    */
+   public void encode() throws Exception {
+      message.getBodyBuffer().resetReaderIndex();
+   }
+
+   public void decode() throws Exception {
+      message.getBodyBuffer().resetReaderIndex();
+   }
+
+   @Override
+   public final boolean isBodyAssignableTo(Class c) throws JMSException {
+      // no op.. jms2 not used on the conversion
+      return false;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
new file mode 100644
index 0000000..39e0df5
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.jms;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.utils.ObjectInputStreamWithClassLoader;
+
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+
+public class ServerJMSObjectMessage  extends ServerJMSMessage implements ObjectMessage {
+   private static final String DEFAULT_WHITELIST;
+   private static final String DEFAULT_BLACKLIST;
+
+   static {
+      DEFAULT_WHITELIST = System.getProperty(ObjectInputStreamWithClassLoader.WHITELIST_PROPERTY,
+                 "java.lang,java.math,javax.security,java.util,org.apache.activemq,org.apache.qpid.proton.amqp");
+
+      DEFAULT_BLACKLIST = System.getProperty(ObjectInputStreamWithClassLoader.BLACKLIST_PROPERTY, null);
+   }
+   public static final byte TYPE = Message.STREAM_TYPE;
+
+   private Serializable object;
+
+   public ServerJMSObjectMessage(MessageInternal message, int deliveryCount) {
+      super(message, deliveryCount);
+   }
+
+   @Override
+   public void setObject(Serializable object) throws JMSException {
+      this.object = object;
+   }
+
+   @Override
+   public Serializable getObject() throws JMSException {
+      return object;
+   }
+
+   @Override
+   public void encode() throws Exception {
+      super.encode();
+      ByteArrayOutputStream out = new ByteArrayOutputStream();
+      ObjectOutputStream ous = new ObjectOutputStream(out);
+      ous.writeObject(object);
+      getInnerMessage().getBodyBuffer().writeBytes(out.toByteArray());
+   }
+
+   @Override
+   public void decode() throws Exception {
+      super.decode();
+      int size = getInnerMessage().getBodyBuffer().readableBytes();
+      byte[] bytes = new byte[size];
+      getInnerMessage().getBodyBuffer().readBytes(bytes);
+      ObjectInputStreamWithClassLoader ois = new ObjectInputStreamWithClassLoader(new ByteArrayInputStream(bytes));
+      ois.setWhiteList(DEFAULT_WHITELIST);
+      ois.setBlackList(DEFAULT_BLACKLIST);
+      object = (Serializable) ois.readObject();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java
new file mode 100644
index 0000000..c63b701
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.jms;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.StreamMessage;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadBoolean;
+import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadByte;
+import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadBytes;
+import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadChar;
+import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadDouble;
+import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadFloat;
+import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadInteger;
+import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadLong;
+import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadObject;
+import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadShort;
+import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadString;
+
+public final class ServerJMSStreamMessage extends ServerJMSMessage implements StreamMessage {
+
+   public static final byte TYPE = Message.STREAM_TYPE;
+
+   private int bodyLength = 0;
+
+   public ServerJMSStreamMessage(MessageInternal message, int deliveryCount) {
+      super(message, deliveryCount);
+   }
+
+   // StreamMessage implementation ----------------------------------
+
+   @Override
+   public boolean readBoolean() throws JMSException {
+
+      try {
+         return streamReadBoolean(getReadBodyBuffer());
+      }
+      catch (IllegalStateException e) {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e) {
+         throw new MessageEOFException("");
+      }
+   }
+
+   @Override
+   public byte readByte() throws JMSException {
+      try {
+         return streamReadByte(getReadBodyBuffer());
+      }
+      catch (IllegalStateException e) {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e) {
+         throw new MessageEOFException("");
+      }
+   }
+
+   @Override
+   public short readShort() throws JMSException {
+
+      try {
+         return streamReadShort(getReadBodyBuffer());
+      }
+      catch (IllegalStateException e) {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e) {
+         throw new MessageEOFException("");
+      }
+   }
+
+   @Override
+   public char readChar() throws JMSException {
+
+      try {
+         return streamReadChar(getReadBodyBuffer());
+      }
+      catch (IllegalStateException e) {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e) {
+         throw new MessageEOFException("");
+      }
+   }
+
+   @Override
+   public int readInt() throws JMSException {
+
+      try {
+         return streamReadInteger(getReadBodyBuffer());
+      }
+      catch (IllegalStateException e) {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e) {
+         throw new MessageEOFException("");
+      }
+   }
+
+   @Override
+   public long readLong() throws JMSException {
+
+      try {
+         return streamReadLong(getReadBodyBuffer());
+      }
+      catch (IllegalStateException e) {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e) {
+         throw new MessageEOFException("");
+      }
+   }
+
+   @Override
+   public float readFloat() throws JMSException {
+
+      try {
+         return streamReadFloat(getReadBodyBuffer());
+      }
+      catch (IllegalStateException e) {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e) {
+         throw new MessageEOFException("");
+      }
+   }
+
+   @Override
+   public double readDouble() throws JMSException {
+
+      try {
+         return streamReadDouble(getReadBodyBuffer());
+      }
+      catch (IllegalStateException e) {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e) {
+         throw new MessageEOFException("");
+      }
+   }
+
+   @Override
+   public String readString() throws JMSException {
+
+      try {
+         return streamReadString(getReadBodyBuffer());
+      }
+      catch (IllegalStateException e) {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e) {
+         throw new MessageEOFException("");
+      }
+   }
+
+   /**
+    * len here is used to control how many more bytes to read
+    */
+   private int len = 0;
+
+   @Override
+   public int readBytes(final byte[] value) throws JMSException {
+
+      try {
+         Pair<Integer, Integer> pairRead = streamReadBytes(getReadBodyBuffer(), len, value);
+
+         len = pairRead.getA();
+         return pairRead.getB();
+      }
+      catch (IllegalStateException e) {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e) {
+         throw new MessageEOFException("");
+      }
+   }
+
+   @Override
+   public Object readObject() throws JMSException {
+
+      if (getReadBodyBuffer().readerIndex() >= message.getEndOfBodyPosition()) {
+         throw new MessageEOFException("");
+      }
+      try {
+         return streamReadObject(getReadBodyBuffer());
+      }
+      catch (IllegalStateException e) {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e) {
+         throw new MessageEOFException("");
+      }
+   }
+
+   @Override
+   public void writeBoolean(final boolean value) throws JMSException {
+
+      getWriteBodyBuffer().writeByte(DataConstants.BOOLEAN);
+      getWriteBodyBuffer().writeBoolean(value);
+   }
+
+   @Override
+   public void writeByte(final byte value) throws JMSException {
+
+      getWriteBodyBuffer().writeByte(DataConstants.BYTE);
+      getWriteBodyBuffer().writeByte(value);
+   }
+
+   @Override
+   public void writeShort(final short value) throws JMSException {
+
+      getWriteBodyBuffer().writeByte(DataConstants.SHORT);
+      getWriteBodyBuffer().writeShort(value);
+   }
+
+   @Override
+   public void writeChar(final char value) throws JMSException {
+
+      getWriteBodyBuffer().writeByte(DataConstants.CHAR);
+      getWriteBodyBuffer().writeShort((short) value);
+   }
+
+   @Override
+   public void writeInt(final int value) throws JMSException {
+
+      getWriteBodyBuffer().writeByte(DataConstants.INT);
+      getWriteBodyBuffer().writeInt(value);
+   }
+
+   @Override
+   public void writeLong(final long value) throws JMSException {
+
+      getWriteBodyBuffer().writeByte(DataConstants.LONG);
+      getWriteBodyBuffer().writeLong(value);
+   }
+
+   @Override
+   public void writeFloat(final float value) throws JMSException {
+
+      getWriteBodyBuffer().writeByte(DataConstants.FLOAT);
+      getWriteBodyBuffer().writeInt(Float.floatToIntBits(value));
+   }
+
+   @Override
+   public void writeDouble(final double value) throws JMSException {
+
+      getWriteBodyBuffer().writeByte(DataConstants.DOUBLE);
+      getWriteBodyBuffer().writeLong(Double.doubleToLongBits(value));
+   }
+
+   @Override
+   public void writeString(final String value) throws JMSException {
+
+      getWriteBodyBuffer().writeByte(DataConstants.STRING);
+      getWriteBodyBuffer().writeNullableString(value);
+   }
+
+   @Override
+   public void writeBytes(final byte[] value) throws JMSException {
+
+      getWriteBodyBuffer().writeByte(DataConstants.BYTES);
+      getWriteBodyBuffer().writeInt(value.length);
+      getWriteBodyBuffer().writeBytes(value);
+   }
+
+   @Override
+   public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException {
+
+      getWriteBodyBuffer().writeByte(DataConstants.BYTES);
+      getWriteBodyBuffer().writeInt(length);
+      getWriteBodyBuffer().writeBytes(value, offset, length);
+   }
+
+   @Override
+   public void writeObject(final Object value) throws JMSException {
+      if (value instanceof String) {
+         writeString((String) value);
+      }
+      else if (value instanceof Boolean) {
+         writeBoolean((Boolean) value);
+      }
+      else if (value instanceof Byte) {
+         writeByte((Byte) value);
+      }
+      else if (value instanceof Short) {
+         writeShort((Short) value);
+      }
+      else if (value instanceof Integer) {
+         writeInt((Integer) value);
+      }
+      else if (value instanceof Long) {
+         writeLong((Long) value);
+      }
+      else if (value instanceof Float) {
+         writeFloat((Float) value);
+      }
+      else if (value instanceof Double) {
+         writeDouble((Double) value);
+      }
+      else if (value instanceof byte[]) {
+         writeBytes((byte[]) value);
+      }
+      else if (value instanceof Character) {
+         writeChar((Character) value);
+      }
+      else if (value == null) {
+         writeString(null);
+      }
+      else {
+         throw new MessageFormatException("Invalid object type: " + value.getClass());
+      }
+   }
+
+   @Override
+   public void reset() throws JMSException {
+      getWriteBodyBuffer().resetReaderIndex();
+   }
+
+   // ActiveMQRAMessage overrides ----------------------------------------
+
+   @Override
+   public void clearBody() throws JMSException {
+      super.clearBody();
+
+      getWriteBodyBuffer().clear();
+   }
+
+   @Override
+   public void decode() throws Exception {
+      super.decode();
+   }
+
+   /**
+    * Encode the body into the internal message
+    */
+   @Override
+   public void encode() throws Exception {
+      super.encode();
+      bodyLength = message.getEndOfBodyPosition();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java
new file mode 100644
index 0000000..5178dc2
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.jms;
+
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+
+import static org.apache.activemq.artemis.reader.TextMessageUtil.readBodyText;
+import static org.apache.activemq.artemis.reader.TextMessageUtil.writeBodyText;
+
+/**
+ * ActiveMQ Artemis implementation of a JMS TextMessage.
+ * <br>
+ * This class was ported from SpyTextMessage in JBossMQ.
+ */
+public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessage {
+   // Constants -----------------------------------------------------
+
+   public static final byte TYPE = Message.TEXT_TYPE;
+
+   // Attributes ----------------------------------------------------
+
+   // We cache it locally - it's more performant to cache as a SimpleString, the AbstractChannelBuffer write
+   // methods are more efficient for a SimpleString
+   private SimpleString text;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /*
+    * This constructor is used to construct messages prior to sending
+    */
+   public ServerJMSTextMessage(MessageInternal message, int deliveryCount) {
+      super(message, deliveryCount);
+
+   }
+   // TextMessage implementation ------------------------------------
+
+   @Override
+   public void setText(final String text) throws JMSException {
+      if (text != null) {
+         this.text = new SimpleString(text);
+      }
+      else {
+         this.text = null;
+      }
+
+      writeBodyText(getWriteBodyBuffer(), this.text);
+   }
+
+   @Override
+   public String getText() {
+      if (text != null) {
+         return text.toString();
+      }
+      else {
+         return null;
+      }
+   }
+
+   @Override
+   public void clearBody() throws JMSException {
+      super.clearBody();
+
+      text = null;
+   }
+
+   @Override
+   public void encode() throws Exception {
+      super.encode();
+      writeBodyText(getWriteBodyBuffer(), text);
+   }
+
+   @Override
+   public void decode() throws Exception {
+      super.decode();
+      text = readBodyText(getReadBodyBuffer());
+   }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java
new file mode 100644
index 0000000..9e172fa
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java
@@ -0,0 +1,257 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+/**
+ * Helper class for identifying and converting message-id and correlation-id values between
+ * the AMQP types and the Strings values used by JMS.
+ * <p>
+ * <p>AMQP messages allow for 4 types of message-id/correlation-id: message-id-string, message-id-binary,
+ * message-id-uuid, or message-id-ulong. In order to accept or return a string representation of these
+ * for interoperability with other AMQP clients, the following encoding can be used after removing or
+ * before adding the "ID:" prefix used for a JMSMessageID value:<br>
+ * <p>
+ * {@literal "AMQP_BINARY:<hex representation of binary content>"}<br>
+ * {@literal "AMQP_UUID:<string representation of uuid>"}<br>
+ * {@literal "AMQP_ULONG:<string representation of ulong>"}<br>
+ * {@literal "AMQP_STRING:<string>"}<br>
+ * <p>
+ * <p>The AMQP_STRING encoding exists only for escaping message-id-string values that happen to begin
+ * with one of the encoding prefixes (including AMQP_STRING itself). It MUST NOT be used otherwise.
+ * <p>
+ * <p>When provided a string for conversion which attempts to identify itself as an encoded binary, uuid, or
+ * ulong but can't be converted into the indicated format, an exception will be thrown.
+ */
+public class AMQPMessageIdHelper {
+
+   public static final AMQPMessageIdHelper INSTANCE = new AMQPMessageIdHelper();
+
+   public static final String AMQP_STRING_PREFIX = "AMQP_STRING:";
+   public static final String AMQP_UUID_PREFIX = "AMQP_UUID:";
+   public static final String AMQP_ULONG_PREFIX = "AMQP_ULONG:";
+   public static final String AMQP_BINARY_PREFIX = "AMQP_BINARY:";
+
+   private static final int AMQP_UUID_PREFIX_LENGTH = AMQP_UUID_PREFIX.length();
+   private static final int AMQP_ULONG_PREFIX_LENGTH = AMQP_ULONG_PREFIX.length();
+   private static final int AMQP_STRING_PREFIX_LENGTH = AMQP_STRING_PREFIX.length();
+   private static final int AMQP_BINARY_PREFIX_LENGTH = AMQP_BINARY_PREFIX.length();
+   private static final char[] HEX_CHARS = "0123456789ABCDEF".toCharArray();
+
+   /**
+    * Takes the provided AMQP messageId style object, and convert it to a base string.
+    * Encodes type information as a prefix where necessary to convey or escape the type
+    * of the provided object.
+    *
+    * @param messageId the raw messageId object to process
+    * @return the base string to be used in creating the actual id.
+    */
+   public String toBaseMessageIdString(Object messageId) {
+      if (messageId == null) {
+         return null;
+      }
+      else if (messageId instanceof String) {
+         String stringId = (String) messageId;
+
+         // If the given string has a type encoding prefix,
+         // we need to escape it as an encoded string (even if
+         // the existing encoding prefix was also for string)
+         if (hasTypeEncodingPrefix(stringId)) {
+            return AMQP_STRING_PREFIX + stringId;
+         }
+         else {
+            return stringId;
+         }
+      }
+      else if (messageId instanceof UUID) {
+         return AMQP_UUID_PREFIX + messageId.toString();
+      }
+      else if (messageId instanceof UnsignedLong) {
+         return AMQP_ULONG_PREFIX + messageId.toString();
+      }
+      else if (messageId instanceof Binary) {
+         ByteBuffer dup = ((Binary) messageId).asByteBuffer();
+
+         byte[] bytes = new byte[dup.remaining()];
+         dup.get(bytes);
+
+         String hex = convertBinaryToHexString(bytes);
+
+         return AMQP_BINARY_PREFIX + hex;
+      }
+      else {
+         throw new IllegalArgumentException("Unsupported type provided: " + messageId.getClass());
+      }
+   }
+
+   /**
+    * Takes the provided base id string and return the appropriate amqp messageId style object.
+    * Converts the type based on any relevant encoding information found as a prefix.
+    *
+    * @param baseId the object to be converted to an AMQP MessageId value.
+    * @return the AMQP messageId style object
+    * @throws ActiveMQAMQPIllegalStateException if the provided baseId String indicates an encoded type but can't be converted to that type.
+    */
+   public Object toIdObject(String baseId) throws ActiveMQAMQPIllegalStateException {
+      if (baseId == null) {
+         return null;
+      }
+
+      try {
+         if (hasAmqpUuidPrefix(baseId)) {
+            String uuidString = strip(baseId, AMQP_UUID_PREFIX_LENGTH);
+            return UUID.fromString(uuidString);
+         }
+         else if (hasAmqpUlongPrefix(baseId)) {
+            String longString = strip(baseId, AMQP_ULONG_PREFIX_LENGTH);
+            return UnsignedLong.valueOf(longString);
+         }
+         else if (hasAmqpStringPrefix(baseId)) {
+            return strip(baseId, AMQP_STRING_PREFIX_LENGTH);
+         }
+         else if (hasAmqpBinaryPrefix(baseId)) {
+            String hexString = strip(baseId, AMQP_BINARY_PREFIX_LENGTH);
+            byte[] bytes = convertHexStringToBinary(hexString);
+            return new Binary(bytes);
+         }
+         else {
+            // We have a string without any type prefix, transmit it as-is.
+            return baseId;
+         }
+      }
+      catch (IllegalArgumentException e) {
+         throw new ActiveMQAMQPIllegalStateException("Unable to convert ID value");
+      }
+   }
+
+   /**
+    * Convert the provided hex-string into a binary representation where each byte represents
+    * two characters of the hex string.
+    * <p>
+    * The hex characters may be upper or lower case.
+    *
+    * @param hexString string to convert to a binary value.
+    * @return a byte array containing the binary representation
+    * @throws IllegalArgumentException if the provided String is a non-even length or contains
+    *                                  non-hex characters
+    */
+   public byte[] convertHexStringToBinary(String hexString) throws IllegalArgumentException {
+      int length = hexString.length();
+
+      // As each byte needs two characters in the hex encoding, the string must be an even length.
+      if (length % 2 != 0) {
+         throw new IllegalArgumentException("The provided hex String must be an even length, but was of length " + length + ": " + hexString);
+      }
+
+      byte[] binary = new byte[length / 2];
+
+      for (int i = 0; i < length; i += 2) {
+         char highBitsChar = hexString.charAt(i);
+         char lowBitsChar = hexString.charAt(i + 1);
+
+         int highBits = hexCharToInt(highBitsChar, hexString) << 4;
+         int lowBits = hexCharToInt(lowBitsChar, hexString);
+
+         binary[i / 2] = (byte) (highBits + lowBits);
+      }
+
+      return binary;
+   }
+
+   /**
+    * Convert the provided binary into a hex-string representation where each character
+    * represents 4 bits of the provided binary, i.e each byte requires two characters.
+    * <p>
+    * The returned hex characters are upper-case.
+    *
+    * @param bytes the binary value to convert to a hex String instance.
+    * @return a String containing a hex representation of the bytes
+    */
+   public String convertBinaryToHexString(byte[] bytes) {
+      // Each byte is represented as 2 chars
+      StringBuilder builder = new StringBuilder(bytes.length * 2);
+
+      for (byte b : bytes) {
+         // The byte will be expanded to int before shifting, replicating the
+         // sign bit, so mask everything beyond the first 4 bits afterwards
+         int highBitsInt = (b >> 4) & 0xF;
+         // We only want the first 4 bits
+         int lowBitsInt = b & 0xF;
+
+         builder.append(HEX_CHARS[highBitsInt]);
+         builder.append(HEX_CHARS[lowBitsInt]);
+      }
+
+      return builder.toString();
+   }
+
+   //----- Internal implementation ------------------------------------------//
+
+   private boolean hasTypeEncodingPrefix(String stringId) {
+      return hasAmqpBinaryPrefix(stringId) || hasAmqpUuidPrefix(stringId) ||
+            hasAmqpUlongPrefix(stringId) || hasAmqpStringPrefix(stringId);
+   }
+
+   private boolean hasAmqpStringPrefix(String stringId) {
+      return stringId.startsWith(AMQP_STRING_PREFIX);
+   }
+
+   private boolean hasAmqpUlongPrefix(String stringId) {
+      return stringId.startsWith(AMQP_ULONG_PREFIX);
+   }
+
+   private boolean hasAmqpUuidPrefix(String stringId) {
+      return stringId.startsWith(AMQP_UUID_PREFIX);
+   }
+
+   private boolean hasAmqpBinaryPrefix(String stringId) {
+      return stringId.startsWith(AMQP_BINARY_PREFIX);
+   }
+
+   private String strip(String id, int numChars) {
+      return id.substring(numChars);
+   }
+
+   private int hexCharToInt(char ch, String orig) throws IllegalArgumentException {
+      if (ch >= '0' && ch <= '9') {
+         // subtract '0' to get difference in position as an int
+         return ch - '0';
+      }
+      else if (ch >= 'A' && ch <= 'F') {
+         // subtract 'A' to get difference in position as an int
+         // and then add 10 for the offset of 'A'
+         return ch - 'A' + 10;
+      }
+      else if (ch >= 'a' && ch <= 'f') {
+         // subtract 'a' to get difference in position as an int
+         // and then add 10 for the offset of 'a'
+         return ch - 'a' + 10;
+      }
+
+      throw new IllegalArgumentException("The provided hex string contains non-hex character '" + ch + "': " + orig);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java
new file mode 100644
index 0000000..613de6d
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+public class AMQPMessageTypes {
+   public static final String AMQP_TYPE_KEY = "amqp:type";
+
+   public static final String AMQP_SEQUENCE = "amqp:sequence";
+
+   public static final String AMQP_LIST = "amqp:list";
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java
new file mode 100644
index 0000000..8a5d17c
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+import javax.jms.Message;
+
+public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
+
+   public AMQPNativeInboundTransformer(JMSVendor vendor) {
+      super(vendor);
+   }
+
+   @Override
+   public String getTransformerName() {
+      return TRANSFORMER_NATIVE;
+   }
+
+   @Override
+   public InboundTransformer getFallbackTransformer() {
+      return new AMQPRawInboundTransformer(getVendor());
+   }
+
+   @Override
+   public Message transform(EncodedMessage amqpMessage) throws Exception {
+      org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
+
+      Message rc = super.transform(amqpMessage);
+
+      populateMessage(rc, amqp);
+      return rc;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java
new file mode 100644
index 0000000..67175ab
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.message.ProtonJMessage;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+
+public class AMQPNativeOutboundTransformer extends OutboundTransformer {
+
+   public AMQPNativeOutboundTransformer(JMSVendor vendor) {
+      super(vendor);
+   }
+
+   public static ProtonJMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException {
+      byte[] data = new byte[(int) msg.getBodyLength()];
+      msg.readBytes(data);
+      msg.reset();
+      int count = msg.getIntProperty("JMSXDeliveryCount");
+
+      // decode...
+      ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create();
+      int offset = 0;
+      int len = data.length;
+      while (len > 0) {
+         final int decoded = amqp.decode(data, offset, len);
+         assert decoded > 0 : "Make progress decoding the message";
+         offset += decoded;
+         len -= decoded;
+      }
+
+      // Update the DeliveryCount header...
+      // The AMQP delivery-count field only includes prior failed delivery attempts,
+      // whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1.
+      if (amqp.getHeader() == null) {
+         amqp.setHeader(new Header());
+      }
+
+      amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1));
+
+      return amqp;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java
new file mode 100644
index 0000000..e6bf171
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+
+public class AMQPRawInboundTransformer extends InboundTransformer {
+
+   public AMQPRawInboundTransformer(JMSVendor vendor) {
+      super(vendor);
+   }
+
+   @Override
+   public String getTransformerName() {
+      return TRANSFORMER_RAW;
+   }
+
+   @Override
+   public InboundTransformer getFallbackTransformer() {
+      return null;  // No fallback from full raw transform
+   }
+
+   @Override
+   public Message transform(EncodedMessage amqpMessage) throws Exception {
+      BytesMessage rc = vendor.createBytesMessage();
+      rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
+
+      // We cannot decode the message headers to check so err on the side of caution
+      // and mark all messages as persistent.
+      rc.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+      rc.setJMSPriority(defaultPriority);
+
+      final long now = System.currentTimeMillis();
+      rc.setJMSTimestamp(now);
+      if (defaultTtl > 0) {
+         rc.setJMSExpiration(now + defaultTtl);
+      }
+
+      rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
+      rc.setBooleanProperty(prefixVendor + "NATIVE", true);
+
+      return rc;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java
new file mode 100644
index 0000000..4a80ea6
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.message.Message;
+
+public class EncodedMessage {
+
+   private final Binary data;
+   final long messageFormat;
+
+   public EncodedMessage(long messageFormat, byte[] data, int offset, int length) {
+      this.data = new Binary(data, offset, length);
+      this.messageFormat = messageFormat;
+   }
+
+   public long getMessageFormat() {
+      return messageFormat;
+   }
+
+   public Message decode() throws Exception {
+      Message amqp = Message.Factory.create();
+
+      int offset = getArrayOffset();
+      int len = getLength();
+      while (len > 0) {
+         final int decoded = amqp.decode(getArray(), offset, len);
+         assert decoded > 0 : "Make progress decoding the message";
+         offset += decoded;
+         len -= decoded;
+      }
+
+      return amqp;
+   }
+
+   public int getLength() {
+      return data.getLength();
+   }
+
+   public int getArrayOffset() {
+      return data.getArrayOffset();
+   }
+
+   public byte[] getArray() {
+      return data.getArray();
+   }
+
+   @Override
+   public String toString() {
+      return data.toString();
+   }
+}


Mime
View raw message