activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [12/16] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding.
Date Sat, 04 Mar 2017 13:15:38 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3384260e/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
new file mode 100644
index 0000000..6cd2d3f
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -0,0 +1,771 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.RefCountMessage;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.impl.MessageImpl;
+
+// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
+public class AMQPMessage extends RefCountMessage {
+
+   final long messageFormat;
+   ByteBuf data;
+   boolean bufferValid;
+   byte type;
+   long messageID;
+   String address;
+   MessageImpl protonMessage;
+   private volatile int memoryEstimate = -1;
+   private long expiration = 0;
+   // this can be used to encode the header again and the rest of the message buffer
+   private int headerEnd = -1;
+   private boolean parsedHeaders = false;
+   private Header _header;
+   private DeliveryAnnotations _deliveryAnnotations;
+   private MessageAnnotations _messageAnnotations;
+   private Properties _properties;
+   private ApplicationProperties applicationProperties;
+
+   public AMQPMessage(long messageFormat, byte[] data) {
+      this.data = Unpooled.wrappedBuffer(data);
+      this.messageFormat = messageFormat;
+      this.bufferValid = true;
+
+   }
+
+   /** for persistence reload */
+   public AMQPMessage(long messageFormat) {
+      this.messageFormat = messageFormat;
+      this.bufferValid = false;
+
+   }
+
+   public AMQPMessage(long messageFormat, Message message) {
+      this.messageFormat = messageFormat;
+      this.protonMessage = (MessageImpl)message;
+
+   }
+
+   public AMQPMessage(Message message) {
+      this(0, message);
+   }
+
+   public MessageImpl getProtonMessage() {
+      if (protonMessage == null) {
+         protonMessage = (MessageImpl) Message.Factory.create();
+
+         if (data != null) {
+            data.readerIndex(0);
+            protonMessage.decode(data.nioBuffer());
+            this._header = protonMessage.getHeader();
+            protonMessage.setHeader(null);
+         }
+      }
+
+      return protonMessage;
+   }
+
+   private void initalizeObjects() {
+      if (protonMessage == null) {
+         if (data == null) {
+            this.headerEnd = -1;
+            _header = new Header();
+            _deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
+            _properties = new Properties();
+            this.applicationProperties = new ApplicationProperties(new HashMap<>());
+            this.protonMessage = (MessageImpl)Message.Factory.create();
+            this.protonMessage.setApplicationProperties(applicationProperties);
+            this.protonMessage.setDeliveryAnnotations(_deliveryAnnotations);
+         }
+      }
+   }
+
+   private Map getApplicationPropertiesMap() {
+
+      ApplicationProperties appMap = getApplicationProperties();
+      Map map = null;
+
+      if (appMap != null) {
+         map = appMap.getValue();
+      }
+
+      if (map == null) {
+         return Collections.emptyMap();
+      } else {
+         return map;
+      }
+   }
+
+   private ApplicationProperties getApplicationProperties() {
+      parseHeaders();
+      return applicationProperties;
+   }
+
+   private void parseHeaders() {
+      if (!parsedHeaders) {
+         if (data == null) {
+            initalizeObjects();
+         } else {
+            partialDecode(data.nioBuffer());
+         }
+         parsedHeaders = true;
+      }
+   }
+
+   public MessageAnnotations getMessageAnnotations() {
+      parseHeaders();
+      return _messageAnnotations;
+   }
+
+   public Header getHeader() {
+      parseHeaders();
+      return _header;
+   }
+
+   public Properties getProperties() {
+      parseHeaders();
+      return _properties;
+   }
+
+   private Object getSymbol(String symbol) {
+      MessageAnnotations annotations = getMessageAnnotations();
+      Map mapAnnotations = annotations != null ? annotations.getValue() : null;
+      if (mapAnnotations != null) {
+         return mapAnnotations.get(Symbol.getSymbol("x-opt-delivery-time"));
+      }
+
+      return null;
+   }
+
+   @Override
+   public Long getScheduledDeliveryTime() {
+
+      Object scheduledTime = getSymbol("x-opt-delivery-time");
+      if (scheduledTime != null && scheduledTime instanceof Number) {
+         return ((Number)scheduledTime).longValue();
+      }
+
+      return null;
+   }
+
+   @Override
+   public Persister<org.apache.activemq.artemis.api.core.Message> getPersister() {
+      return AMQPMessagePersister.getInstance();
+   }
+
+   private synchronized void partialDecode(ByteBuffer buffer) {
+      DecoderImpl decoder = TLSEncode.getDecoder();
+      decoder.setByteBuffer(buffer);
+      buffer.position(0);
+
+      _header = null;
+      _deliveryAnnotations = null;
+      _messageAnnotations = null;
+      _properties = null;
+      applicationProperties = null;
+      Section section = null;
+
+      try {
+         if (buffer.hasRemaining()) {
+            section = (Section) decoder.readObject();
+         }
+
+         if (section instanceof Header) {
+            headerEnd = buffer.position();
+            _header = (Header) section;
+
+            if (_header.getTtl() != null) {
+               this.expiration = System.currentTimeMillis() + _header.getTtl().intValue();
+            }
+
+            if (buffer.hasRemaining()) {
+               section = (Section) decoder.readObject();
+            } else {
+               section = null;
+            }
+         } else {
+            // meaning there is no header
+            headerEnd = 0;
+         }
+         if (section instanceof DeliveryAnnotations) {
+            _deliveryAnnotations = (DeliveryAnnotations) section;
+
+            if (buffer.hasRemaining()) {
+               section = (Section) decoder.readObject();
+            } else {
+               section = null;
+            }
+
+         }
+         if (section instanceof MessageAnnotations) {
+            _messageAnnotations = (MessageAnnotations) section;
+
+            if (buffer.hasRemaining()) {
+               section = (Section) decoder.readObject();
+            } else {
+               section = null;
+            }
+
+         }
+         if (section instanceof Properties) {
+            _properties = (Properties) section;
+
+            if (buffer.hasRemaining()) {
+               section = (Section) decoder.readObject();
+            } else {
+               section = null;
+            }
+
+         }
+
+         if (section instanceof ApplicationProperties) {
+            applicationProperties = (ApplicationProperties) section;
+         }
+      } finally {
+         decoder.setByteBuffer(null);
+      }
+   }
+
+   public long getMessageFormat() {
+      return messageFormat;
+   }
+
+   public int getLength() {
+      return data.array().length;
+   }
+
+   public byte[] getArray() {
+      return data.array();
+   }
+
+   @Override
+   public void messageChanged() {
+      bufferValid = false;
+      this.data = null;
+   }
+
+   @Override
+   public ByteBuf getBuffer() {
+      if (data == null) {
+         return null;
+      } else {
+         return Unpooled.wrappedBuffer(data);
+      }
+   }
+
+   @Override
+   public AMQPMessage setBuffer(ByteBuf buffer) {
+      this.data = null;
+      return this;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message copy() {
+      checkBuffer();
+      AMQPMessage newEncode = new AMQPMessage(this.messageFormat, data.array());
+      return newEncode;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message copy(long newID) {
+      checkBuffer();
+      return copy().setMessageID(newID);
+   }
+
+   @Override
+   public long getMessageID() {
+      return messageID;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setMessageID(long id) {
+      this.messageID = id;
+      return this;
+   }
+
+   @Override
+   public long getExpiration() {
+      return expiration;
+   }
+
+   @Override
+   public AMQPMessage setExpiration(long expiration) {
+      this.expiration = expiration;
+      return this;
+   }
+
+   @Override
+   public Object getUserID() {
+      return getProperties().getMessageId();
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setUserID(Object userID) {
+      return null;
+   }
+
+   @Override
+   public void copyHeadersAndProperties(org.apache.activemq.artemis.api.core.Message msg) {
+
+   }
+
+   @Override
+   public boolean isDurable() {
+      if (getHeader() != null) {
+         return getHeader().getDurable().booleanValue();
+      } else {
+         return false;
+      }
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setDurable(boolean durable) {
+      return null;
+   }
+
+   @Override
+   public String getAddress() {
+      if (address == null) {
+         Properties properties = getProtonMessage().getProperties();
+         if (properties != null) {
+            return  properties.getTo();
+         } else {
+            return null;
+         }
+      } else {
+         return address;
+      }
+   }
+
+   @Override
+   public AMQPMessage setAddress(String address) {
+      this.address = address;
+      return this;
+   }
+
+   @Override
+   public AMQPMessage setAddress(SimpleString address) {
+      return setAddress(address.toString());
+   }
+
+   @Override
+   public SimpleString getAddressSimpleString() {
+      return SimpleString.toSimpleString(getAddress());
+   }
+
+   @Override
+   public long getTimestamp() {
+      return 0;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setTimestamp(long timestamp) {
+      return null;
+   }
+
+   @Override
+   public byte getPriority() {
+      return 0;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setPriority(byte priority) {
+      return null;
+   }
+
+   @Override
+   public void receiveBuffer(ByteBuf buffer) {
+
+   }
+
+   private synchronized void checkBuffer() {
+      if (!bufferValid) {
+         ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1500);
+         try {
+            getProtonMessage().encode(new NettyWritable(buffer));
+            byte[] bytes = new byte[buffer.writerIndex()];
+            buffer.readBytes(bytes);
+            this.data = Unpooled.wrappedBuffer(bytes);
+         } finally {
+            buffer.release();
+         }
+      }
+   }
+
+   @Override
+   public void sendBuffer(ByteBuf buffer, int deliveryCount) {
+      checkBuffer();
+      Header header = getHeader();
+      if (header != null) {
+         synchronized (header) {
+            if (header.getDeliveryCount() != null) {
+               header.setDeliveryCount(UnsignedInteger.valueOf(header.getDeliveryCount().intValue() + 1));
+            } else {
+               header.setDeliveryCount(UnsignedInteger.valueOf(1));
+            }
+            TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buffer));
+            TLSEncode.getEncoder().writeObject(header);
+         }
+      }
+      buffer.writeBytes(data, headerEnd, data.writerIndex() - headerEnd);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putBooleanProperty(String key, boolean value) {
+      getApplicationPropertiesMap().put(key, Boolean.valueOf(value));
+      return this;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putByteProperty(String key, byte value) {
+      getApplicationPropertiesMap().put(key, Byte.valueOf(value));
+      return this;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putBytesProperty(String key, byte[] value) {
+      getApplicationPropertiesMap().put(key, value);
+      return this;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putShortProperty(String key, short value) {
+      getApplicationPropertiesMap().put(key, Short.valueOf(value));
+      return this;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putCharProperty(String key, char value) {
+      getApplicationPropertiesMap().put(key, Character.valueOf(value));
+      return this;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putIntProperty(String key, int value) {
+      getApplicationPropertiesMap().put(key, Integer.valueOf(value));
+      return this;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putLongProperty(String key, long value) {
+      getApplicationPropertiesMap().put(key, Long.valueOf(value));
+      return this;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putFloatProperty(String key, float value) {
+      getApplicationPropertiesMap().put(key, Float.valueOf(value));
+      return this;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putDoubleProperty(String key, double value) {
+      getApplicationPropertiesMap().put(key, Double.valueOf(value));
+      return this;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putBooleanProperty(SimpleString key, boolean value) {
+      getApplicationPropertiesMap().put(key, Boolean.valueOf(value));
+      return this;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putByteProperty(SimpleString key, byte value) {
+      return putByteProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putBytesProperty(SimpleString key, byte[] value) {
+      return putBytesProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putShortProperty(SimpleString key, short value) {
+      return putShortProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putCharProperty(SimpleString key, char value) {
+      return putCharProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putIntProperty(SimpleString key, int value) {
+      return putIntProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putLongProperty(SimpleString key, long value) {
+      return putLongProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putFloatProperty(SimpleString key, float value) {
+      return putFloatProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putDoubleProperty(SimpleString key, double value) {
+      return putDoubleProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putStringProperty(String key, String value) {
+      getApplicationPropertiesMap().put(key, value);
+      return this;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putObjectProperty(String key,
+                                                                         Object value) throws ActiveMQPropertyConversionException {
+      getApplicationPropertiesMap().put(key, value);
+      return this;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putObjectProperty(SimpleString key,
+                                                                         Object value) throws ActiveMQPropertyConversionException {
+      return putObjectProperty(key.toString(), value);
+   }
+
+   @Override
+   public Object removeProperty(String key) {
+      return getApplicationPropertiesMap().remove(key);
+   }
+
+   @Override
+   public boolean containsProperty(String key) {
+      return getApplicationPropertiesMap().containsKey(key);
+   }
+
+   @Override
+   public Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException {
+      return (Boolean)getApplicationPropertiesMap().get(key);
+   }
+
+   @Override
+   public Byte getByteProperty(String key) throws ActiveMQPropertyConversionException {
+      return (Byte)getApplicationPropertiesMap().get(key);
+   }
+
+   @Override
+   public Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException {
+      return (Double)getApplicationPropertiesMap().get(key);
+   }
+
+   @Override
+   public Integer getIntProperty(String key) throws ActiveMQPropertyConversionException {
+      return (Integer)getApplicationPropertiesMap().get(key);
+   }
+
+   @Override
+   public Long getLongProperty(String key) throws ActiveMQPropertyConversionException {
+      return (Long)getApplicationPropertiesMap().get(key);
+   }
+
+   @Override
+   public Object getObjectProperty(String key) {
+      if (key.equals("JMSType")) {
+         return getProperties().getSubject();
+      }
+
+      return getApplicationPropertiesMap().get(key);
+   }
+
+   @Override
+   public Short getShortProperty(String key) throws ActiveMQPropertyConversionException {
+      return (Short)getApplicationPropertiesMap().get(key);
+   }
+
+   @Override
+   public Float getFloatProperty(String key) throws ActiveMQPropertyConversionException {
+      return (Float)getApplicationPropertiesMap().get(key);
+   }
+
+   @Override
+   public String getStringProperty(String key) throws ActiveMQPropertyConversionException {
+      if (key.equals("JMSType")) {
+         return getProperties().getSubject();
+      }
+      return (String)getApplicationPropertiesMap().get(key);
+   }
+
+   @Override
+   public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
+      return SimpleString.toSimpleString((String)getApplicationPropertiesMap().get(key));
+   }
+
+   @Override
+   public byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException {
+      return (byte[]) getApplicationPropertiesMap().get(key);
+   }
+
+   @Override
+   public Object removeProperty(SimpleString key) {
+      return removeProperty(key.toString());
+   }
+
+   @Override
+   public boolean containsProperty(SimpleString key) {
+      return containsProperty(key.toString());
+   }
+
+   @Override
+   public Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return getBooleanProperty(key.toString());
+   }
+
+   @Override
+   public Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return getByteProperty(key.toString());
+   }
+
+   @Override
+   public Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return getDoubleProperty(key.toString());
+   }
+
+   @Override
+   public Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return getIntProperty(key.toString());
+   }
+
+   @Override
+   public Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return getLongProperty(key.toString());
+   }
+
+   @Override
+   public Object getObjectProperty(SimpleString key) {
+      return getObjectProperty(key.toString());
+   }
+
+   @Override
+   public Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return getShortProperty(key.toString());
+   }
+
+   @Override
+   public Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return getFloatProperty(key.toString());
+   }
+
+   @Override
+   public String getStringProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return getStringProperty(key.toString());
+   }
+
+   @Override
+   public SimpleString getSimpleStringProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return getSimpleStringProperty(key.toString());
+   }
+
+   @Override
+   public byte[] getBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return getBytesProperty(key.toString());
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putStringProperty(SimpleString key, SimpleString value) {
+      return putStringProperty(key.toString(), value.toString());
+   }
+
+   @Override
+   public int getEncodeSize() {
+      return 0;
+   }
+
+   @Override
+   public Set<SimpleString> getPropertyNames() {
+      HashSet<SimpleString> values = new HashSet<>();
+      for (Object k : getApplicationPropertiesMap().keySet()) {
+         values.add(SimpleString.toSimpleString(k.toString()));
+      }
+      return values;
+   }
+
+   @Override
+   public int getMemoryEstimate() {
+      if (memoryEstimate == -1) {
+         memoryEstimate = memoryOffset +
+            (data != null ? data.capacity() : 0);
+      }
+
+      return memoryEstimate;
+   }
+
+   @Override
+   public ICoreMessage toCore() {
+      try {
+         return AMQPConverter.getInstance().toCore(this);
+      } catch (Exception e) {
+         throw new RuntimeException(e.getMessage(), e);
+      }
+   }
+
+   @Override
+   public int getPersistSize() {
+      checkBuffer();
+      return data.array().length + DataConstants.SIZE_INT;
+   }
+
+   @Override
+   public void persist(ActiveMQBuffer targetRecord) {
+      checkBuffer();
+      targetRecord.writeInt(data.array().length);
+      targetRecord.writeBytes(data.array());
+   }
+
+   @Override
+   public void reloadPersistence(ActiveMQBuffer record) {
+      int size = record.readInt();
+      byte[] recordArray = new byte[size];
+      record.readBytes(recordArray);
+      this.data = Unpooled.wrappedBuffer(recordArray);
+      this.bufferValid = true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3384260e/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
new file mode 100644
index 0000000..3b5bdda
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+public class AMQPMessagePersister extends MessagePersister {
+
+   public static AMQPMessagePersister theInstance = new AMQPMessagePersister();
+
+   public static AMQPMessagePersister getInstance() {
+      return theInstance;
+   }
+
+   private AMQPMessagePersister() {
+   }
+
+   @Override
+   protected byte getID() {
+      return ProtonProtocolManagerFactory.ID;
+   }
+
+   @Override
+   public int getEncodeSize(Message record) {
+      return DataConstants.SIZE_BYTE + record.getPersistSize() +
+         SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG;
+   }
+
+
+   /** Sub classes must add the first short as the protocol-id */
+   @Override
+   public void encode(ActiveMQBuffer buffer, Message record) {
+      super.encode(buffer, record);
+      AMQPMessage msgEncode = (AMQPMessage)record;
+      buffer.writeLong(record.getMessageID());
+      buffer.writeLong(msgEncode.getMessageFormat());
+      buffer.writeNullableSimpleString(record.getAddressSimpleString());
+      record.persist(buffer);
+   }
+
+
+   @Override
+   public Message decode(ActiveMQBuffer buffer, Message record) {
+      long id = buffer.readLong();
+      long format = buffer.readLong();
+      SimpleString address = buffer.readNullableSimpleString();
+      record = new AMQPMessage(format);
+      record.reloadPersistence(buffer);
+      record.setMessageID(id);
+      if (address != null) {
+         record.setAddress(address);
+      }
+      return record;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3384260e/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 18c6b05..5931afe 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -23,6 +23,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.io.IOCallback;
@@ -32,16 +34,13 @@ import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
-import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter;
-import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
@@ -65,11 +64,9 @@ import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Receiver;
-import io.netty.buffer.ByteBuf;
 import org.jboss.logging.Logger;
 
 public class AMQPSessionCallback implements SessionCallback {
@@ -298,13 +295,6 @@ public class AMQPSessionCallback implements SessionCallback {
       }
    }
 
-   public long encodeMessage(Object message, int deliveryCount, WritableBuffer buffer) throws Exception {
-      ProtonMessageConverter converter = (ProtonMessageConverter) manager.getConverter();
-
-      // The Proton variant accepts a WritableBuffer to allow for a faster more direct encode.
-      return (long) converter.outbound((ServerMessage) message, deliveryCount, buffer);
-   }
-
    public String tempQueueName() {
       return UUIDGenerator.getInstance().generateStringUUID();
    }
@@ -321,22 +311,22 @@ public class AMQPSessionCallback implements SessionCallback {
       }
    }
 
-   public void ack(Transaction transaction, Object brokerConsumer, Object message) throws Exception {
+   public void ack(Transaction transaction, Object brokerConsumer, Message message) throws Exception {
       if (transaction == null) {
          transaction = serverSession.getCurrentTransaction();
       }
       recoverContext();
       try {
-         ((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, ((ServerMessage) message).getMessageID());
+         ((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, message.getMessageID());
       } finally {
          resetContext();
       }
    }
 
-   public void cancel(Object brokerConsumer, Object message, boolean updateCounts) throws Exception {
+   public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception {
       recoverContext();
       try {
-         ((ServerConsumer) brokerConsumer).individualCancel(((ServerMessage) message).getMessageID(), updateCounts);
+         ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);
       } finally {
          resetContext();
       }
@@ -351,11 +341,8 @@ public class AMQPSessionCallback implements SessionCallback {
                           final Delivery delivery,
                           String address,
                           int messageFormat,
-                          ByteBuf messageEncoded) throws Exception {
-      EncodedMessage encodedMessage = new EncodedMessage(messageFormat, messageEncoded.array(), messageEncoded.arrayOffset(), messageEncoded.writerIndex());
-
-      ServerMessage message = manager.getConverter().inbound(encodedMessage);
-      //use the address on the receiver if not null, if null let's hope it was set correctly on the message
+                          byte[] data) throws Exception {
+      AMQPMessage message = new AMQPMessage(messageFormat, data);
       if (address != null) {
          message.setAddress(new SimpleString(address));
       } else {
@@ -372,7 +359,7 @@ public class AMQPSessionCallback implements SessionCallback {
 
       recoverContext();
 
-      PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddress());
+      PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString());
       if (store.isRejectingMessages()) {
          // We drop pre-settled messages (and abort any associated Tx)
          if (delivery.remotelySettled()) {
@@ -401,12 +388,12 @@ public class AMQPSessionCallback implements SessionCallback {
    }
 
    private void serverSend(final Transaction transaction,
-                           final ServerMessage message,
+                           final Message message,
                            final Delivery delivery,
                            final Receiver receiver) throws Exception {
       try {
 
-         message.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString(), receiver.getSession().getConnection().getRemoteContainer());
+//         message.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString(), receiver.getSession().getConnection().getRemoteContainer());
          serverSession.send(transaction, message, false, false);
 
          // FIXME Potential race here...
@@ -416,8 +403,8 @@ public class AMQPSessionCallback implements SessionCallback {
                synchronized (connection.getLock()) {
                   delivery.disposition(Accepted.getInstance());
                   delivery.settle();
-                  connection.flush();
                }
+               connection.flush(true);
             }
 
             @Override
@@ -492,14 +479,14 @@ public class AMQPSessionCallback implements SessionCallback {
    }
 
    @Override
-   public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
+   public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
 
       message.removeProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString());
 
       ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext();
 
       try {
-         return plugSender.deliverMessage(message, deliveryCount);
+         return plugSender.deliverMessage(CoreAmqpConverter.checkAMQP(message), deliveryCount);
       } catch (Exception e) {
          synchronized (connection.getLock()) {
             plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage()));
@@ -512,7 +499,7 @@ public class AMQPSessionCallback implements SessionCallback {
 
    @Override
    public int sendLargeMessage(MessageReference ref,
-                               ServerMessage message,
+                               Message message,
                                ServerConsumer consumer,
                                long bodySize,
                                int deliveryCount) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3384260e/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index 754172a..9c7d24d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -26,19 +26,17 @@ import io.netty.channel.ChannelPipeline;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.server.management.NotificationListener;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
-import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
-import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -54,8 +52,6 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
 
    private final ActiveMQServer server;
 
-   private MessageConverter protonConverter;
-
    private final ProtonProtocolManagerFactory factory;
 
    private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
@@ -72,7 +68,6 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
    public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server) {
       this.factory = factory;
       this.server = server;
-      this.protonConverter = new ProtonMessageConverter(server.getStorageManager());
    }
 
    public ActiveMQServer getServer() {
@@ -80,11 +75,6 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
    }
 
    @Override
-   public MessageConverter getConverter() {
-      return protonConverter;
-   }
-
-   @Override
    public void onNotification(Notification notification) {
 
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3384260e/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
index bef8ef0..98ec228 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
@@ -22,6 +22,8 @@ import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@@ -32,6 +34,8 @@ import org.osgi.service.component.annotations.Component;
 @Component(service = ProtocolManagerFactory.class)
 public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor> {
 
+   public static final byte ID = 2;
+
    private static final String AMQP_PROTOCOL_NAME = "AMQP";
 
    private static final String MODULE_NAME = "artemis-amqp-protocol";
@@ -39,6 +43,16 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory
    private static String[] SUPPORTED_PROTOCOLS = {AMQP_PROTOCOL_NAME};
 
    @Override
+   public byte getStoreID() {
+      return ID;
+   }
+
+   @Override
+   public Persister<Message> getPersister() {
+      return AMQPMessagePersister.getInstance();
+   }
+
+   @Override
    public ProtocolManager createProtocolManager(ActiveMQServer server,
                                                 final Map<String, Object> parameters,
                                                 List<BaseInterceptor> incomingInterceptors,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3384260e/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPContentTypeSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPContentTypeSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPContentTypeSupport.java
new file mode 100644
index 0000000..e040138
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPContentTypeSupport.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter;
+
+import java.nio.charset.Charset;
+import java.nio.charset.IllegalCharsetNameException;
+import java.nio.charset.StandardCharsets;
+import java.nio.charset.UnsupportedCharsetException;
+import java.util.StringTokenizer;
+
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException;
+
+public final class AMQPContentTypeSupport {
+
+   private static final String UTF_8 = "UTF-8";
+   private static final String CHARSET = "charset";
+   private static final String TEXT = "text";
+   private static final String APPLICATION = "application";
+   private static final String JAVASCRIPT = "javascript";
+   private static final String XML = "xml";
+   private static final String XML_VARIANT = "+xml";
+   private static final String JSON = "json";
+   private static final String JSON_VARIANT = "+json";
+   private static final String XML_DTD = "xml-dtd";
+   private static final String ECMASCRIPT = "ecmascript";
+
+   /**
+    * @param contentType
+    *        the contentType of the received message
+    * @return the character set to use, or null if not to treat the message as text
+    * @throws ActiveMQAMQPInvalidContentTypeException
+    *         if the content-type is invalid in some way.
+    */
+   public static Charset parseContentTypeForTextualCharset(final String contentType) throws ActiveMQAMQPInvalidContentTypeException {
+      if (contentType == null || contentType.trim().isEmpty()) {
+         throw new ActiveMQAMQPInvalidContentTypeException("Content type can't be null or empty");
+      }
+
+      int subTypeSeparator = contentType.indexOf("/");
+      if (subTypeSeparator == -1) {
+         throw new ActiveMQAMQPInvalidContentTypeException("Content type has no '/' separator: " + contentType);
+      }
+
+      final String type = contentType.substring(0, subTypeSeparator).toLowerCase().trim();
+
+      String subTypePart = contentType.substring(subTypeSeparator + 1).toLowerCase().trim();
+
+      String parameterPart = null;
+      int parameterSeparator = subTypePart.indexOf(";");
+      if (parameterSeparator != -1) {
+         if (parameterSeparator < subTypePart.length() - 1) {
+            parameterPart = contentType.substring(subTypeSeparator + 1).toLowerCase().trim();
+         }
+         subTypePart = subTypePart.substring(0, parameterSeparator).trim();
+      }
+
+      if (subTypePart.isEmpty()) {
+         throw new ActiveMQAMQPInvalidContentTypeException("Content type has no subtype after '/'" + contentType);
+      }
+
+      final String subType = subTypePart;
+
+      if (isTextual(type, subType)) {
+         String charset = findCharset(parameterPart);
+         if (charset == null) {
+            charset = UTF_8;
+         }
+
+         if (UTF_8.equals(charset)) {
+            return StandardCharsets.UTF_8;
+         } else {
+            try {
+               return Charset.forName(charset);
+            } catch (IllegalCharsetNameException icne) {
+               throw new ActiveMQAMQPInvalidContentTypeException("Illegal charset: " + charset);
+            } catch (UnsupportedCharsetException uce) {
+               throw new ActiveMQAMQPInvalidContentTypeException("Unsupported charset: " + charset);
+            }
+         }
+      }
+
+      return null;
+   }
+
+   // ----- Internal Content Type utilities ----------------------------------//
+
+   private static boolean isTextual(String type, String subType) {
+      if (TEXT.equals(type)) {
+         return true;
+      }
+
+      if (APPLICATION.equals(type)) {
+         if (XML.equals(subType) || JSON.equals(subType) || JAVASCRIPT.equals(subType) || subType.endsWith(XML_VARIANT) || subType.endsWith(JSON_VARIANT)
+            || XML_DTD.equals(subType) || ECMASCRIPT.equals(subType)) {
+            return true;
+         }
+      }
+
+      return false;
+   }
+
+   private static String findCharset(String paramaterPart) {
+      String charset = null;
+
+      if (paramaterPart != null) {
+         StringTokenizer tokenizer = new StringTokenizer(paramaterPart, ";");
+         while (tokenizer.hasMoreTokens()) {
+            String parameter = tokenizer.nextToken().trim();
+            int eqIndex = parameter.indexOf('=');
+            if (eqIndex != -1) {
+               String name = parameter.substring(0, eqIndex);
+               if (CHARSET.equalsIgnoreCase(name.trim())) {
+                  String value = unquote(parameter.substring(eqIndex + 1));
+
+                  charset = value.toUpperCase();
+                  break;
+               }
+            }
+         }
+      }
+
+      return charset;
+   }
+
+   private static String unquote(String s) {
+      if (s.length() > 1 && (s.startsWith("\"") && s.endsWith("\""))) {
+         return s.substring(1, s.length() - 1);
+      } else {
+         return s;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3384260e/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java
new file mode 100644
index 0000000..724474b
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter;
+
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
+
+
+public class AMQPConverter implements MessageConverter<AMQPMessage> {
+
+   private static final AMQPConverter theInstance = new AMQPConverter();
+
+   private AMQPConverter() {
+   }
+
+   public static AMQPConverter getInstance() {
+      return theInstance;
+   }
+
+   @Override
+   public AMQPMessage fromCore(ICoreMessage coreMessage) throws Exception {
+      return CoreAmqpConverter.fromCore(coreMessage);
+   }
+
+   @Override
+   public ICoreMessage toCore(AMQPMessage messageSource) throws Exception {
+      return AmqpCoreConverter.toCore(messageSource);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3384260e/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java
new file mode 100644
index 0000000..00282e0
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java
@@ -0,0 +1,252 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+
+/**
+ * Helper class for identifying and converting message-id and correlation-id values between the
+ * AMQP types and the Strings values used by JMS.
+ * <p>
+ * AMQP messages allow for 4 types of message-id/correlation-id: message-id-string,
+ * message-id-binary, message-id-uuid, or message-id-ulong. In order to accept or return a
+ * string representation of these for interoperability with other AMQP clients, the following
+ * encoding can be used after removing or before adding the "ID:" prefix used for a JMSMessageID
+ * value:<br>
+ * <p>
+ * {@literal "AMQP_BINARY:<hex representation of binary content>"}<br>
+ * {@literal "AMQP_UUID:<string representation of uuid>"}<br>
+ * {@literal "AMQP_ULONG:<string representation of ulong>"}<br>
+ * {@literal "AMQP_STRING:<string>"}<br>
+ * <p>
+ * The AMQP_STRING encoding exists only for escaping message-id-string values that happen to
+ * begin with one of the encoding prefixes (including AMQP_STRING itself). It MUST NOT be used
+ * otherwise.
+ * <p>
+ * When provided a string for conversion which attempts to identify itself as an encoded binary,
+ * uuid, or ulong but can't be converted into the indicated format, an exception will be thrown.
+ */
+public class AMQPMessageIdHelper {
+
+   public static final AMQPMessageIdHelper INSTANCE = new AMQPMessageIdHelper();
+
+   public static final String AMQP_STRING_PREFIX = "AMQP_STRING:";
+   public static final String AMQP_UUID_PREFIX = "AMQP_UUID:";
+   public static final String AMQP_ULONG_PREFIX = "AMQP_ULONG:";
+   public static final String AMQP_BINARY_PREFIX = "AMQP_BINARY:";
+
+   private static final int AMQP_UUID_PREFIX_LENGTH = AMQP_UUID_PREFIX.length();
+   private static final int AMQP_ULONG_PREFIX_LENGTH = AMQP_ULONG_PREFIX.length();
+   private static final int AMQP_STRING_PREFIX_LENGTH = AMQP_STRING_PREFIX.length();
+   private static final int AMQP_BINARY_PREFIX_LENGTH = AMQP_BINARY_PREFIX.length();
+   private static final char[] HEX_CHARS = "0123456789ABCDEF".toCharArray();
+
+   /**
+    * Takes the provided AMQP messageId style object, and convert it to a base string. Encodes
+    * type information as a prefix where necessary to convey or escape the type of the provided
+    * object.
+    *
+    * @param messageId
+    *        the raw messageId object to process
+    * @return the base string to be used in creating the actual id.
+    */
+   public String toBaseMessageIdString(Object messageId) {
+      if (messageId == null) {
+         return null;
+      } else if (messageId instanceof String) {
+         String stringId = (String) messageId;
+
+         // If the given string has a type encoding prefix,
+         // we need to escape it as an encoded string (even if
+         // the existing encoding prefix was also for string)
+         if (hasTypeEncodingPrefix(stringId)) {
+            return AMQP_STRING_PREFIX + stringId;
+         } else {
+            return stringId;
+         }
+      } else if (messageId instanceof UUID) {
+         return AMQP_UUID_PREFIX + messageId.toString();
+      } else if (messageId instanceof UnsignedLong) {
+         return AMQP_ULONG_PREFIX + messageId.toString();
+      } else if (messageId instanceof Binary) {
+         ByteBuffer dup = ((Binary) messageId).asByteBuffer();
+
+         byte[] bytes = new byte[dup.remaining()];
+         dup.get(bytes);
+
+         String hex = convertBinaryToHexString(bytes);
+
+         return AMQP_BINARY_PREFIX + hex;
+      } else {
+         throw new IllegalArgumentException("Unsupported type provided: " + messageId.getClass());
+      }
+   }
+
+   /**
+    * Takes the provided base id string and return the appropriate amqp messageId style object.
+    * Converts the type based on any relevant encoding information found as a prefix.
+    *
+    * @param baseId
+    *        the object to be converted to an AMQP MessageId value.
+    * @return the AMQP messageId style object
+    * @throws ActiveMQAMQPIllegalStateException
+    *         if the provided baseId String indicates an encoded type but can't be converted to
+    *         that type.
+    */
+   public Object toIdObject(String baseId) throws ActiveMQAMQPIllegalStateException {
+      if (baseId == null) {
+         return null;
+      }
+
+      try {
+         if (hasAmqpUuidPrefix(baseId)) {
+            String uuidString = strip(baseId, AMQP_UUID_PREFIX_LENGTH);
+            return UUID.fromString(uuidString);
+         } else if (hasAmqpUlongPrefix(baseId)) {
+            String longString = strip(baseId, AMQP_ULONG_PREFIX_LENGTH);
+            return UnsignedLong.valueOf(longString);
+         } else if (hasAmqpStringPrefix(baseId)) {
+            return strip(baseId, AMQP_STRING_PREFIX_LENGTH);
+         } else if (hasAmqpBinaryPrefix(baseId)) {
+            String hexString = strip(baseId, AMQP_BINARY_PREFIX_LENGTH);
+            byte[] bytes = convertHexStringToBinary(hexString);
+            return new Binary(bytes);
+         } else {
+            // We have a string without any type prefix, transmit it as-is.
+            return baseId;
+         }
+      } catch (IllegalArgumentException e) {
+         throw new ActiveMQAMQPIllegalStateException("Unable to convert ID value");
+      }
+   }
+
+   /**
+    * Convert the provided hex-string into a binary representation where each byte represents
+    * two characters of the hex string.
+    * <p>
+    * The hex characters may be upper or lower case.
+    *
+    * @param hexString
+    *        string to convert to a binary value.
+    * @return a byte array containing the binary representation
+    * @throws IllegalArgumentException
+    *         if the provided String is a non-even length or contains non-hex characters
+    */
+   public byte[] convertHexStringToBinary(String hexString) throws IllegalArgumentException {
+      int length = hexString.length();
+
+      // As each byte needs two characters in the hex encoding, the string must be an even
+      // length.
+      if (length % 2 != 0) {
+         throw new IllegalArgumentException("The provided hex String must be an even length, but was of length " + length + ": " + hexString);
+      }
+
+      byte[] binary = new byte[length / 2];
+
+      for (int i = 0; i < length; i += 2) {
+         char highBitsChar = hexString.charAt(i);
+         char lowBitsChar = hexString.charAt(i + 1);
+
+         int highBits = hexCharToInt(highBitsChar, hexString) << 4;
+         int lowBits = hexCharToInt(lowBitsChar, hexString);
+
+         binary[i / 2] = (byte) (highBits + lowBits);
+      }
+
+      return binary;
+   }
+
+   /**
+    * Convert the provided binary into a hex-string representation where each character
+    * represents 4 bits of the provided binary, i.e each byte requires two characters.
+    * <p>
+    * The returned hex characters are upper-case.
+    *
+    * @param bytes
+    *        the binary value to convert to a hex String instance.
+    * @return a String containing a hex representation of the bytes
+    */
+   public String convertBinaryToHexString(byte[] bytes) {
+      // Each byte is represented as 2 chars
+      StringBuilder builder = new StringBuilder(bytes.length * 2);
+
+      for (byte b : bytes) {
+         // The byte will be expanded to int before shifting, replicating the
+         // sign bit, so mask everything beyond the first 4 bits afterwards
+         int highBitsInt = (b >> 4) & 0xF;
+         // We only want the first 4 bits
+         int lowBitsInt = b & 0xF;
+
+         builder.append(HEX_CHARS[highBitsInt]);
+         builder.append(HEX_CHARS[lowBitsInt]);
+      }
+
+      return builder.toString();
+   }
+
+   // ----- Internal implementation ------------------------------------------//
+
+   private boolean hasTypeEncodingPrefix(String stringId) {
+      return hasAmqpBinaryPrefix(stringId) || hasAmqpUuidPrefix(stringId) || hasAmqpUlongPrefix(stringId) || hasAmqpStringPrefix(stringId);
+   }
+
+   private boolean hasAmqpStringPrefix(String stringId) {
+      return stringId.startsWith(AMQP_STRING_PREFIX);
+   }
+
+   private boolean hasAmqpUlongPrefix(String stringId) {
+      return stringId.startsWith(AMQP_ULONG_PREFIX);
+   }
+
+   private boolean hasAmqpUuidPrefix(String stringId) {
+      return stringId.startsWith(AMQP_UUID_PREFIX);
+   }
+
+   private boolean hasAmqpBinaryPrefix(String stringId) {
+      return stringId.startsWith(AMQP_BINARY_PREFIX);
+   }
+
+   private String strip(String id, int numChars) {
+      return id.substring(numChars);
+   }
+
+   private int hexCharToInt(char ch, String orig) throws IllegalArgumentException {
+      if (ch >= '0' && ch <= '9') {
+         // subtract '0' to get difference in position as an int
+         return ch - '0';
+      } else if (ch >= 'A' && ch <= 'F') {
+         // subtract 'A' to get difference in position as an int
+         // and then add 10 for the offset of 'A'
+         return ch - 'A' + 10;
+      } else if (ch >= 'a' && ch <= 'f') {
+         // subtract 'a' to get difference in position as an int
+         // and then add 10 for the offset of 'a'
+         return ch - 'a' + 10;
+      }
+
+      throw new IllegalArgumentException("The provided hex string contains non-hex character '" + ch + "': " + orig);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3384260e/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
new file mode 100644
index 0000000..0a7a049
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.message.Message;
+
+import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE;
+
+/**
+ * Support class containing constant values and static methods that are used to map to / from
+ * AMQP Message types being sent or received.
+ */
+public final class AMQPMessageSupport {
+
+   // Message Properties used to map AMQP to JMS and back
+   /**
+    * Attribute used to mark the class type of JMS message that a particular message
+    * instance represents, used internally by the client.
+    */
+   public static final Symbol JMS_MSG_TYPE = Symbol.getSymbol("x-opt-jms-msg-type");
+
+   /**
+    * Attribute used to mark the Application defined delivery time assigned to the message
+    */
+   public static final String JMS_DELIVERY_TIME = "x-opt-delivery-time";
+
+   /**
+    * Value mapping for JMS_MSG_TYPE which indicates the message is a generic JMS Message
+    * which has no body.
+    */
+   public static final byte JMS_MESSAGE = 0;
+
+   /**
+    * Value mapping for JMS_MSG_TYPE which indicates the message is a JMS ObjectMessage
+    * which has an Object value serialized in its message body.
+    */
+   public static final byte JMS_OBJECT_MESSAGE = 1;
+
+   /**
+    * Value mapping for JMS_MSG_TYPE which indicates the message is a JMS MapMessage
+    * which has an Map instance serialized in its message body.
+    */
+   public static final byte JMS_MAP_MESSAGE = 2;
+
+   /**
+    * Value mapping for JMS_MSG_TYPE which indicates the message is a JMS BytesMessage
+    * which has a body that consists of raw bytes.
+    */
+   public static final byte JMS_BYTES_MESSAGE = 3;
+
+   /**
+    * Value mapping for JMS_MSG_TYPE which indicates the message is a JMS StreamMessage
+    * which has a body that is a structured collection of primitives values.
+    */
+   public static final byte JMS_STREAM_MESSAGE = 4;
+
+   /**
+    * Value mapping for JMS_MSG_TYPE which indicates the message is a JMS TextMessage
+    * which has a body that contains a UTF-8 encoded String.
+    */
+   public static final byte JMS_TEXT_MESSAGE = 5;
+
+
+   /**
+    * Content type used to mark Data sections as containing a serialized java object.
+    */
+   public static final Symbol SERIALIZED_JAVA_OBJECT_CONTENT_TYPE = Symbol.getSymbol("application/x-java-serialized-object");
+
+   public static final String JMS_AMQP_PREFIX = "JMS_AMQP_";
+   public static final int JMS_AMQP_PREFIX_LENGTH = JMS_AMQP_PREFIX.length();
+
+   public static final String NATIVE = "NATIVE";
+   public static final String HEADER = "HEADER";
+   public static final String PROPERTIES = "PROPERTIES";
+
+   public static final String FIRST_ACQUIRER = "FirstAcquirer";
+   public static final String CONTENT_TYPE = "ContentType";
+   public static final String CONTENT_ENCODING = "ContentEncoding";
+   public static final String REPLYTO_GROUP_ID = "ReplyToGroupID";
+   public static final String DURABLE = "DURABLE";
+   public static final String PRIORITY = "PRIORITY";
+
+   public static final String DELIVERY_ANNOTATION_PREFIX = "DA_";
+   public static final String MESSAGE_ANNOTATION_PREFIX = "MA_";
+   public static final String FOOTER_PREFIX = "FT_";
+
+   public static final String JMS_AMQP_HEADER = JMS_AMQP_PREFIX + HEADER;
+   public static final String JMS_AMQP_HEADER_DURABLE = JMS_AMQP_PREFIX + HEADER + DURABLE;
+   public static final String JMS_AMQP_HEADER_PRIORITY = JMS_AMQP_PREFIX + HEADER + PRIORITY;
+   public static final String JMS_AMQP_PROPERTIES = JMS_AMQP_PREFIX + PROPERTIES;
+   public static final String JMS_AMQP_NATIVE = JMS_AMQP_PREFIX + NATIVE;
+   public static final String JMS_AMQP_FIRST_ACQUIRER = JMS_AMQP_PREFIX + FIRST_ACQUIRER;
+   public static final String JMS_AMQP_CONTENT_TYPE = JMS_AMQP_PREFIX + CONTENT_TYPE;
+   public static final String JMS_AMQP_CONTENT_ENCODING = JMS_AMQP_PREFIX + CONTENT_ENCODING;
+   public static final String JMS_AMQP_REPLYTO_GROUP_ID = JMS_AMQP_PREFIX + REPLYTO_GROUP_ID;
+   public static final String JMS_AMQP_DELIVERY_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + DELIVERY_ANNOTATION_PREFIX;
+   public static final String JMS_AMQP_MESSAGE_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + MESSAGE_ANNOTATION_PREFIX;
+   public static final String JMS_AMQP_FOOTER_PREFIX = JMS_AMQP_PREFIX + FOOTER_PREFIX;
+
+   // Message body type definitions
+   public static final Binary EMPTY_BINARY = new Binary(new byte[0]);
+   public static final Data EMPTY_BODY = new Data(EMPTY_BINARY);
+
+   public static final short AMQP_UNKNOWN = 0;
+   public static final short AMQP_NULL = 1;
+   public static final short AMQP_DATA = 2;
+   public static final short AMQP_SEQUENCE = 3;
+   public static final short AMQP_VALUE_NULL = 4;
+   public static final short AMQP_VALUE_STRING = 5;
+   public static final short AMQP_VALUE_BINARY = 6;
+   public static final short AMQP_VALUE_MAP = 7;
+   public static final short AMQP_VALUE_LIST = 8;
+
+   public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = getSymbol("x-opt-jms-dest");
+   public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = getSymbol("x-opt-jms-reply-to");
+
+   public static final byte QUEUE_TYPE = 0x00;
+   public static final byte TOPIC_TYPE = 0x01;
+   public static final byte TEMP_QUEUE_TYPE = 0x02;
+   public static final byte TEMP_TOPIC_TYPE = 0x03;
+
+   /**
+    * Content type used to mark Data sections as containing arbitrary bytes.
+    */
+   public static final String OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
+
+   /**
+    * Lookup and return the correct Proton Symbol instance based on the given key.
+    *
+    * @param key
+    *        the String value name of the Symbol to locate.
+    *
+    * @return the Symbol value that matches the given key.
+    */
+   public static Symbol getSymbol(String key) {
+      return Symbol.valueOf(key);
+   }
+
+   /**
+    * Safe way to access message annotations which will check internal structure and either
+    * return the annotation if it exists or null if the annotation or any annotations are
+    * present.
+    *
+    * @param key
+    *        the String key to use to lookup an annotation.
+    * @param message
+    *        the AMQP message object that is being examined.
+    *
+    * @return the given annotation value or null if not present in the message.
+    */
+   public static Object getMessageAnnotation(String key, Message message) {
+      if (message != null && message.getMessageAnnotations() != null) {
+         Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue();
+         return annotations.get(AMQPMessageSupport.getSymbol(key));
+      }
+
+      return null;
+   }
+
+   /**
+    * Check whether the content-type field of the properties section (if present) in the given
+    * message matches the provided string (where null matches if there is no content type
+    * present.
+    *
+    * @param contentType
+    *        content type string to compare against, or null if none
+    * @param message
+    *        the AMQP message object that is being examined.
+    *
+    * @return true if content type matches
+    */
+   public static boolean isContentType(String contentType, Message message) {
+      if (contentType == null) {
+         return message.getContentType() == null;
+      } else {
+         return contentType.equals(message.getContentType());
+      }
+   }
+
+   /**
+    * @param contentType
+    *        the contentType of the received message
+    * @return the character set to use, or null if not to treat the message as text
+    */
+   public static Charset getCharsetForTextualContent(String contentType) {
+      try {
+         return AMQPContentTypeSupport.parseContentTypeForTextualCharset(contentType);
+      } catch (ActiveMQAMQPInvalidContentTypeException e) {
+         return null;
+      }
+   }
+
+   public static String toAddress(Destination destination) {
+      if (destination instanceof ActiveMQDestination) {
+         return ((ActiveMQDestination) destination).getAddress();
+      }
+      return null;
+   }
+
+   public static ServerJMSBytesMessage createBytesMessage(long id) {
+      return new ServerJMSBytesMessage(newMessage(id, BYTES_TYPE));
+   }
+
+   public static ServerJMSBytesMessage createBytesMessage(long id, byte[] array, int arrayOffset, int length) throws JMSException {
+      ServerJMSBytesMessage message = createBytesMessage(id);
+      message.writeBytes(array, arrayOffset, length);
+      return message;
+   }
+
+   public static ServerJMSStreamMessage createStreamMessage(long id) {
+      return new ServerJMSStreamMessage(newMessage(id, STREAM_TYPE));
+   }
+
+   public static ServerJMSMessage createMessage(long id) {
+      return new ServerJMSMessage(newMessage(id, DEFAULT_TYPE));
+   }
+
+   public static ServerJMSTextMessage createTextMessage(long id) {
+      return new ServerJMSTextMessage(newMessage(id, TEXT_TYPE));
+   }
+
+   public static ServerJMSTextMessage createTextMessage(long id, String text) throws JMSException {
+      ServerJMSTextMessage message = createTextMessage(id);
+      message.setText(text);
+      return message;
+   }
+
+   public static ServerJMSObjectMessage createObjectMessage(long id) {
+      return new ServerJMSObjectMessage(newMessage(id, OBJECT_TYPE));
+   }
+
+   public static ServerJMSMessage createObjectMessage(long id, Binary serializedForm) throws JMSException {
+      ServerJMSObjectMessage message = createObjectMessage(id);
+      message.setSerializedForm(serializedForm);
+      return message;
+   }
+
+   public static ServerJMSMessage createObjectMessage(long id, byte[] array, int offset, int length) throws JMSException {
+      ServerJMSObjectMessage message = createObjectMessage(id);
+      message.setSerializedForm(new Binary(array, offset, length));
+      return message;
+   }
+
+   public static ServerJMSMapMessage createMapMessage(long id) {
+      return new ServerJMSMapMessage(newMessage(id, MAP_TYPE));
+   }
+
+   public static ServerJMSMapMessage createMapMessage(long id, Map<String, Object> content) throws JMSException {
+      ServerJMSMapMessage message = createMapMessage(id);
+      final Set<Map.Entry<String, Object>> set = content.entrySet();
+      for (Map.Entry<String, Object> entry : set) {
+         Object value = entry.getValue();
+         if (value instanceof Binary) {
+            Binary binary = (Binary) value;
+            value = Arrays.copyOfRange(binary.getArray(), binary.getArrayOffset(), binary.getLength());
+         }
+         message.setObject(entry.getKey(), value);
+      }
+      return message;
+   }
+
+   private static CoreMessage newMessage(long id, byte messageType) {
+      CoreMessage message = new CoreMessage(id, 512);
+      message.setType(messageType);
+//      ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null);
+      return message;
+   }
+}


Mime
View raw message