activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From michaelpea...@apache.org
Subject [3/6] activemq-artemis git commit: ARTEMIS-1586 Refactor to make more generic
Date Wed, 17 Jan 2018 08:33:57 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
index 896a8ed..6e28c0e 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
@@ -372,7 +372,7 @@ public class ActiveMQMessage implements javax.jms.Message {
    public void setJMSReplyTo(final Destination dest) throws JMSException {
 
       if (dest == null) {
-         MessageUtil.setJMSReplyTo(message, null);
+         MessageUtil.setJMSReplyTo(message, (String) null);
          replyTo = null;
       } else {
          if (dest instanceof ActiveMQDestination == false) {
@@ -391,7 +391,7 @@ public class ActiveMQMessage implements javax.jms.Message {
          }
          ActiveMQDestination jbd = (ActiveMQDestination) dest;
 
-         MessageUtil.setJMSReplyTo(message, SimpleString.toSimpleString(prefix + jbd.getAddress()));
+         MessageUtil.setJMSReplyTo(message, prefix + jbd.getAddress());
 
          replyTo = jbd;
       }
@@ -401,14 +401,15 @@ public class ActiveMQMessage implements javax.jms.Message {
    public Destination getJMSDestination() throws JMSException {
       if (dest == null) {
          SimpleString address = message.getAddressSimpleString();
-         String prefix = "";
-         if (RoutingType.ANYCAST.equals(message.getRoutingType())) {
-            prefix = QUEUE_QUALIFIED_PREFIX;
+         if (address == null) {
+            dest = null;
+         } else if (RoutingType.ANYCAST.equals(message.getRoutingType())) {
+            dest = ActiveMQDestination.createQueue(address);
          } else if (RoutingType.MULTICAST.equals(message.getRoutingType())) {
-            prefix = TOPIC_QUALIFIED_PREFIX;
+            dest = ActiveMQDestination.createTopic(address);
+         } else {
+            dest = ActiveMQDestination.fromPrefixedName(address.toString());
          }
-
-         dest = address == null ? null : ActiveMQDestination.fromPrefixedName(prefix + address.toString());
       }
 
       return dest;
@@ -513,7 +514,7 @@ public class ActiveMQMessage implements javax.jms.Message {
    @Override
    public boolean getBooleanProperty(final String name) throws JMSException {
       try {
-         return message.getBooleanProperty(new SimpleString(name));
+         return message.getBooleanProperty(name);
       } catch (ActiveMQPropertyConversionException e) {
          throw new MessageFormatException(e.getMessage());
       }
@@ -522,7 +523,7 @@ public class ActiveMQMessage implements javax.jms.Message {
    @Override
    public byte getByteProperty(final String name) throws JMSException {
       try {
-         return message.getByteProperty(new SimpleString(name));
+         return message.getByteProperty(name);
       } catch (ActiveMQPropertyConversionException e) {
          throw new MessageFormatException(e.getMessage());
       }
@@ -531,7 +532,7 @@ public class ActiveMQMessage implements javax.jms.Message {
    @Override
    public short getShortProperty(final String name) throws JMSException {
       try {
-         return message.getShortProperty(new SimpleString(name));
+         return message.getShortProperty(name);
       } catch (ActiveMQPropertyConversionException e) {
          throw new MessageFormatException(e.getMessage());
       }
@@ -544,7 +545,7 @@ public class ActiveMQMessage implements javax.jms.Message {
       }
 
       try {
-         return message.getIntProperty(new SimpleString(name));
+         return message.getIntProperty(name);
       } catch (ActiveMQPropertyConversionException e) {
          throw new MessageFormatException(e.getMessage());
       }
@@ -557,7 +558,7 @@ public class ActiveMQMessage implements javax.jms.Message {
       }
 
       try {
-         return message.getLongProperty(new SimpleString(name));
+         return message.getLongProperty(name);
       } catch (ActiveMQPropertyConversionException e) {
          throw new MessageFormatException(e.getMessage());
       }
@@ -566,7 +567,7 @@ public class ActiveMQMessage implements javax.jms.Message {
    @Override
    public float getFloatProperty(final String name) throws JMSException {
       try {
-         return message.getFloatProperty(new SimpleString(name));
+         return message.getFloatProperty(name);
       } catch (ActiveMQPropertyConversionException e) {
          throw new MessageFormatException(e.getMessage());
       }
@@ -575,7 +576,7 @@ public class ActiveMQMessage implements javax.jms.Message {
    @Override
    public double getDoubleProperty(final String name) throws JMSException {
       try {
-         return message.getDoubleProperty(new SimpleString(name));
+         return message.getDoubleProperty(name);
       } catch (ActiveMQPropertyConversionException e) {
          throw new MessageFormatException(e.getMessage());
       }
@@ -593,7 +594,7 @@ public class ActiveMQMessage implements javax.jms.Message {
          } else if (MessageUtil.JMSXUSERID.equals(name)) {
             return message.getValidatedUserID();
          } else {
-            return message.getStringProperty(new SimpleString(name));
+            return message.getStringProperty(name);
          }
       } catch (ActiveMQPropertyConversionException e) {
          throw new MessageFormatException(e.getMessage());
@@ -608,7 +609,7 @@ public class ActiveMQMessage implements javax.jms.Message {
 
       Object val = message.getObjectProperty(name);
       if (val instanceof SimpleString) {
-         val = ((SimpleString) val).toString();
+         val = val.toString();
       }
       return val;
    }
@@ -622,43 +623,43 @@ public class ActiveMQMessage implements javax.jms.Message {
    public void setBooleanProperty(final String name, final boolean value) throws JMSException {
       checkProperty(name);
 
-      message.putBooleanProperty(new SimpleString(name), value);
+      message.putBooleanProperty(name, value);
    }
 
    @Override
    public void setByteProperty(final String name, final byte value) throws JMSException {
       checkProperty(name);
-      message.putByteProperty(new SimpleString(name), value);
+      message.putByteProperty(name, value);
    }
 
    @Override
    public void setShortProperty(final String name, final short value) throws JMSException {
       checkProperty(name);
-      message.putShortProperty(new SimpleString(name), value);
+      message.putShortProperty(name, value);
    }
 
    @Override
    public void setIntProperty(final String name, final int value) throws JMSException {
       checkProperty(name);
-      message.putIntProperty(new SimpleString(name), value);
+      message.putIntProperty(name, value);
    }
 
    @Override
    public void setLongProperty(final String name, final long value) throws JMSException {
       checkProperty(name);
-      message.putLongProperty(new SimpleString(name), value);
+      message.putLongProperty(name, value);
    }
 
    @Override
    public void setFloatProperty(final String name, final float value) throws JMSException {
       checkProperty(name);
-      message.putFloatProperty(new SimpleString(name), value);
+      message.putFloatProperty(name, value);
    }
 
    @Override
    public void setDoubleProperty(final String name, final double value) throws JMSException {
       checkProperty(name);
-      message.putDoubleProperty(new SimpleString(name), value);
+      message.putDoubleProperty(name, value);
    }
 
    @Override
@@ -670,7 +671,7 @@ public class ActiveMQMessage implements javax.jms.Message {
       } else if (handleCoreProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) {
          return;
       } else {
-         message.putStringProperty(new SimpleString(name), SimpleString.toSimpleString(value));
+         message.putStringProperty(name, value);
       }
    }
 
@@ -703,7 +704,7 @@ public class ActiveMQMessage implements javax.jms.Message {
       }
 
       try {
-         message.putObjectProperty(new SimpleString(name), value);
+         message.putObjectProperty(name, value);
       } catch (ActiveMQPropertyConversionException e) {
          throw new MessageFormatException(e.getMessage());
       }
@@ -964,7 +965,7 @@ public class ActiveMQMessage implements javax.jms.Message {
       boolean result = false;
 
       if (jmsPropertyName.equals(name)) {
-         message.putStringProperty(corePropertyName, SimpleString.toSimpleString(value.toString()));
+         message.putStringProperty(corePropertyName, value.toString());
 
          result = true;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
index 2deefa9..ff4ee0f 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
@@ -18,6 +18,8 @@ package org.apache.activemq.artemis.jms.client;
 
 import javax.jms.Queue;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
+
 /**
  * ActiveMQ Artemis implementation of a JMS Queue.
  * <br>
@@ -34,13 +36,17 @@ public class ActiveMQQueue extends ActiveMQDestination implements Queue {
 
    // Constructors --------------------------------------------------
    public ActiveMQQueue() {
-      this(null);
+      this((SimpleString) null);
    }
 
    public ActiveMQQueue(final String address) {
       super(address, TYPE.QUEUE, null);
    }
 
+   public ActiveMQQueue(final SimpleString address) {
+      super(address, TYPE.QUEUE, null);
+   }
+
    public ActiveMQQueue(final String address, boolean temporary) {
       super(address, temporary ? TYPE.TEMP_QUEUE : TYPE.QUEUE, null);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
index 374a985..cf2ec59 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
@@ -627,7 +627,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
             throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic");
          }
 
-         queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName));
+         queueName = ActiveMQDestination.createQueueNameForSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName);
 
          if (durability == ConsumerDurability.DURABLE) {
             try {
@@ -750,7 +750,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
                   throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic");
                }
 
-               queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), subscriptionName));
+               queueName = ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), subscriptionName);
 
                QueueQuery subResponse = session.queueQuery(queueName);
 
@@ -918,7 +918,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
          throw new IllegalStateException("Cannot unsubscribe using a QueueSession");
       }
 
-      SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), name));
+      SimpleString queueName = ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), name);
 
       try {
          QueueQuery response = session.queueQuery(queueName);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java
index 2762a9c..1c70c5b 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java
@@ -73,7 +73,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
 
    // For testing only
    public ActiveMQStreamMessage() {
-      message = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1500);
+      message = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1500, null);
    }
 
    // Public --------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java
index e22e67b..4dbefec 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java
@@ -18,6 +18,8 @@ package org.apache.activemq.artemis.jms.client;
 
 import javax.jms.Topic;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
+
 /**
  * ActiveMQ Artemis implementation of a JMS Topic.
  * <br>
@@ -33,13 +35,17 @@ public class ActiveMQTopic extends ActiveMQDestination implements Topic {
 
    // Constructors --------------------------------------------------
    public ActiveMQTopic() {
-      this(null);
+      this((SimpleString) null);
    }
 
    public ActiveMQTopic(final String address) {
       this(address, false);
    }
 
+   public ActiveMQTopic(final SimpleString address) {
+      super(address, TYPE.TOPIC, null);
+   }
+
    public ActiveMQTopic(final String address, boolean temporary) {
       super(address, TYPE.TOPIC, null);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 2bdd88a..cdab412 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.RefCountMessage;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper;
@@ -70,7 +71,7 @@ public class AMQPMessage extends RefCountMessage {
    boolean bufferValid;
    Boolean durable;
    long messageID;
-   String address;
+   SimpleString address;
    MessageImpl protonMessage;
    private volatile int memoryEstimate = -1;
    private long expiration = 0;
@@ -90,6 +91,7 @@ public class AMQPMessage extends RefCountMessage {
    private ApplicationProperties applicationProperties;
    private long scheduledTime = -1;
    private String connectionID;
+   private final CoreMessageObjectPools coreMessageObjectPools;
 
    Set<Object> rejectedConsumers;
 
@@ -98,9 +100,14 @@ public class AMQPMessage extends RefCountMessage {
    private volatile TypedProperties extraProperties;
 
    public AMQPMessage(long messageFormat, byte[] data) {
+      this(messageFormat, data, null);
+   }
+
+   public AMQPMessage(long messageFormat, byte[] data, CoreMessageObjectPools coreMessageObjectPools) {
       this.data = Unpooled.wrappedBuffer(data);
       this.messageFormat = messageFormat;
       this.bufferValid = true;
+      this.coreMessageObjectPools = coreMessageObjectPools;
       parseHeaders();
    }
 
@@ -108,12 +115,14 @@ public class AMQPMessage extends RefCountMessage {
    public AMQPMessage(long messageFormat) {
       this.messageFormat = messageFormat;
       this.bufferValid = false;
+      this.coreMessageObjectPools = null;
    }
 
    public AMQPMessage(long messageFormat, Message message) {
       this.messageFormat = messageFormat;
       this.protonMessage = (MessageImpl) message;
       this.bufferValid = false;
+      this.coreMessageObjectPools = null;
    }
 
    public AMQPMessage(Message message) {
@@ -301,7 +310,7 @@ public class AMQPMessage extends RefCountMessage {
       parseHeaders();
 
       if (_properties != null && _properties.getGroupId() != null) {
-         return SimpleString.toSimpleString(_properties.getGroupId());
+         return SimpleString.toSimpleString(_properties.getGroupId(), coreMessageObjectPools == null ? null : coreMessageObjectPools.getGroupIdStringSimpleStringPool());
       } else {
          return null;
       }
@@ -588,36 +597,33 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public String getAddress() {
-      if (address == null) {
-         Properties properties = getProtonMessage().getProperties();
-         if (properties != null) {
-            return properties.getTo();
-         } else {
-            return null;
-         }
-      } else {
-         return address;
-      }
+      SimpleString addressSimpleString = getAddressSimpleString();
+      return addressSimpleString == null ? null : addressSimpleString.toString();
    }
 
    @Override
    public AMQPMessage setAddress(String address) {
-      this.address = address;
+      this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool());
       return this;
    }
 
    @Override
    public AMQPMessage setAddress(SimpleString address) {
-      if (address != null) {
-         return setAddress(address.toString());
-      } else {
-         return setAddress((String) null);
-      }
+      this.address = address;
+      return this;
    }
 
    @Override
    public SimpleString getAddressSimpleString() {
-      return SimpleString.toSimpleString(getAddress());
+      if (address == null) {
+         Properties properties = getProtonMessage().getProperties();
+         if (properties != null) {
+            setAddress(properties.getTo());
+         } else {
+            return null;
+         }
+      }
+      return address;
    }
 
    @Override
@@ -977,7 +983,7 @@ public class AMQPMessage extends RefCountMessage {
       if (applicationProperties != null) getProtonMessage().setApplicationProperties(applicationProperties);
       if (_properties != null) {
          if (address != null) {
-            _properties.setTo(address);
+            _properties.setTo(address.toString());
          }
          getProtonMessage().setProperties(this._properties);
       }
@@ -987,7 +993,7 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
-      return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key));
+      return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key), getPropertyValuesPool());
    }
 
    @Override
@@ -1066,10 +1072,15 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
+   public org.apache.activemq.artemis.api.core.Message putStringProperty(SimpleString key, String value) {
+      return putStringProperty(key.toString(), value);
+   }
+
+   @Override
    public Set<SimpleString> getPropertyNames() {
       HashSet<SimpleString> values = new HashSet<>();
       for (Object k : getApplicationPropertiesMap().keySet()) {
-         values.add(SimpleString.toSimpleString(k.toString()));
+         values.add(SimpleString.toSimpleString(k.toString(), getPropertyKeysPool()));
       }
       return values;
    }
@@ -1084,17 +1095,22 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
-   public ICoreMessage toCore() {
+   public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
       try {
-         return AMQPConverter.getInstance().toCore(this);
+         return AMQPConverter.getInstance().toCore(this, coreMessageObjectPools);
       } catch (Exception e) {
          throw new RuntimeException(e.getMessage(), e);
       }
    }
 
    @Override
+   public ICoreMessage toCore() {
+      return toCore(null);
+   }
+
+   @Override
    public SimpleString getLastValueProperty() {
-      return getSimpleStringProperty(HDR_LAST_VALUE_NAME.toString());
+      return getSimpleStringProperty(HDR_LAST_VALUE_NAME);
    }
 
    @Override
@@ -1155,4 +1171,12 @@ public class AMQPMessage extends RefCountMessage {
          ", address=" + getAddress() +
          "]";
    }
+
+   private SimpleString.StringSimpleStringPool getPropertyKeysPool() {
+      return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyKeysPool();
+   }
+
+   private SimpleString.StringSimpleStringPool getPropertyValuesPool() {
+      return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 19348f4..7134d3b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -101,6 +102,7 @@ public class AMQPSessionCallback implements SessionCallback {
 
    private final AtomicBoolean draining = new AtomicBoolean(false);
 
+   private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
 
    private final AddressQueryCache<AddressQueryResult> addressQueryCache = new AddressQueryCache<>();
 
@@ -210,14 +212,14 @@ public class AMQPSessionCallback implements SessionCallback {
    }
 
    public Object createSender(ProtonServerSenderContext protonSender,
-                              String queue,
+                              SimpleString queue,
                               String filter,
                               boolean browserOnly) throws Exception {
       long consumerID = consumerIDGenerator.generateID();
 
       filter = SelectorTranslator.convertToActiveMQFilterString(filter);
 
-      ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filter), browserOnly, false, null);
+      ServerConsumer consumer = serverSession.createConsumer(consumerID, queue, SimpleString.toSimpleString(filter), browserOnly, false, null);
 
       // AMQP handles its own flow control for when it's started
       consumer.setStarted(true);
@@ -233,48 +235,48 @@ public class AMQPSessionCallback implements SessionCallback {
       serverConsumer.receiveCredits(-1);
    }
 
-   public void createTemporaryQueue(String queueName, RoutingType routingType) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), routingType, null, true, false);
+   public void createTemporaryQueue(SimpleString queueName, RoutingType routingType) throws Exception {
+      serverSession.createQueue(queueName, queueName, routingType, null, true, false);
    }
 
-   public void createTemporaryQueue(String address,
-                                    String queueName,
+   public void createTemporaryQueue(SimpleString address,
+                                    SimpleString queueName,
                                     RoutingType routingType,
-                                    String filter) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), true, false);
+                                    SimpleString filter) throws Exception {
+      serverSession.createQueue(address, queueName, routingType, filter, true, false);
    }
 
-   public void createUnsharedDurableQueue(String address,
+   public void createUnsharedDurableQueue(SimpleString address,
                                           RoutingType routingType,
-                                          String queueName,
-                                          String filter) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, 1, false, false);
+                                          SimpleString queueName,
+                                          SimpleString filter) throws Exception {
+      serverSession.createQueue(address, queueName, routingType, filter, false, true, 1, false, false);
    }
 
-   public void createSharedDurableQueue(String address,
+   public void createSharedDurableQueue(SimpleString address,
                                         RoutingType routingType,
-                                        String queueName,
-                                        String filter) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, -1, false, false);
+                                        SimpleString queueName,
+                                        SimpleString filter) throws Exception {
+      serverSession.createQueue(address, queueName, routingType, filter, false, true, -1, false, false);
    }
 
-   public void createSharedVolatileQueue(String address,
+   public void createSharedVolatileQueue(SimpleString address,
                                          RoutingType routingType,
-                                         String queueName,
-                                         String filter) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, false, -1, true, true);
+                                         SimpleString queueName,
+                                         SimpleString filter) throws Exception {
+      serverSession.createQueue(address, queueName, routingType, filter, false, false, -1, true, true);
    }
 
-   public QueueQueryResult queueQuery(String queueName, RoutingType routingType, boolean autoCreate) throws Exception {
-      QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
+   public QueueQueryResult queueQuery(SimpleString queueName, RoutingType routingType, boolean autoCreate) throws Exception {
+      QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(queueName);
 
       if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateQueues() && autoCreate) {
          try {
-            serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), routingType, null, false, true, true);
+            serverSession.createQueue(queueName, queueName, routingType, null, false, true, true);
          } catch (ActiveMQQueueExistsException e) {
             // The queue may have been created by another thread in the mean time.  Catch and do nothing.
          }
-         queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
+         queueQueryResult = serverSession.executeQueueQuery(queueName);
       }
 
       // if auto-create we will return whatever type was used before
@@ -287,32 +289,31 @@ public class AMQPSessionCallback implements SessionCallback {
 
 
 
-   public boolean bindingQuery(String address, RoutingType routingType) throws Exception {
+   public boolean bindingQuery(SimpleString address, RoutingType routingType) throws Exception {
       BindingQueryResult bindingQueryResult = bindingQueryCache.getResult(address);
 
       if (bindingQueryResult != null) {
          return bindingQueryResult.isExists();
       }
 
-      SimpleString simpleAddress = SimpleString.toSimpleString(address);
-      bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
+      bindingQueryResult = serverSession.executeBindingQuery(address);
       if (routingType == RoutingType.MULTICAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateAddresses()) {
          try {
-            serverSession.createAddress(simpleAddress, routingType, true);
+            serverSession.createAddress(address, routingType, true);
          } catch (ActiveMQAddressExistsException e) {
             // The address may have been created by another thread in the mean time.  Catch and do nothing.
          }
-         bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
+         bindingQueryResult = serverSession.executeBindingQuery(address);
       } else if (routingType == RoutingType.ANYCAST && bindingQueryResult.isAutoCreateQueues()) {
-         QueueQueryResult queueBinding = serverSession.executeQueueQuery(simpleAddress);
+         QueueQueryResult queueBinding = serverSession.executeQueueQuery(address);
          if (!queueBinding.isExists()) {
             try {
-               serverSession.createQueue(simpleAddress, simpleAddress, routingType, null, false, true, true);
+               serverSession.createQueue(address, address, routingType, null, false, true, true);
             } catch (ActiveMQQueueExistsException e) {
                // The queue may have been created by another thread in the mean time.  Catch and do nothing.
             }
          }
-         bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
+         bindingQueryResult = serverSession.executeBindingQuery(address);
       }
 
       bindingQueryCache.setResult(address, bindingQueryResult);
@@ -320,7 +321,7 @@ public class AMQPSessionCallback implements SessionCallback {
    }
 
 
-   public AddressQueryResult addressQuery(String addressName,
+   public AddressQueryResult addressQuery(SimpleString addressName,
                                           RoutingType routingType,
                                           boolean autoCreate) throws Exception {
 
@@ -329,15 +330,15 @@ public class AMQPSessionCallback implements SessionCallback {
          return addressQueryResult;
       }
 
-      addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName));
+      addressQueryResult = serverSession.executeAddressQuery(addressName);
 
       if (!addressQueryResult.isExists() && addressQueryResult.isAutoCreateAddresses() && autoCreate) {
          try {
-            serverSession.createAddress(SimpleString.toSimpleString(addressName), routingType, true);
+            serverSession.createAddress(addressName, routingType, true);
          } catch (ActiveMQQueueExistsException e) {
             // The queue may have been created by another thread in the mean time.  Catch and do nothing.
          }
-         addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName));
+         addressQueryResult = serverSession.executeAddressQuery(addressName);
       }
 
       addressQueryCache.setResult(addressName, addressQueryResult);
@@ -438,15 +439,15 @@ public class AMQPSessionCallback implements SessionCallback {
                           final Transaction transaction,
                           final Receiver receiver,
                           final Delivery delivery,
-                          String address,
+                          SimpleString address,
                           int messageFormat,
                           byte[] data) throws Exception {
-      AMQPMessage message = new AMQPMessage(messageFormat, data);
+      AMQPMessage message = new AMQPMessage(messageFormat, data, coreMessageObjectPools);
       if (address != null) {
-         message.setAddress(new SimpleString(address));
+         message.setAddress(address);
       } else {
          // Anonymous relay must set a To value
-         address = message.getAddress();
+         address = message.getAddressSimpleString();
          if (address == null) {
             rejectMessage(delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer");
             return;
@@ -552,7 +553,7 @@ public class AMQPSessionCallback implements SessionCallback {
       });
    }
 
-   public void offerProducerCredit(final String address,
+   public void offerProducerCredit(final SimpleString address,
                                    final int credits,
                                    final int threshold,
                                    final Receiver receiver) {
@@ -567,7 +568,7 @@ public class AMQPSessionCallback implements SessionCallback {
             connection.flush();
             return;
          }
-         final PagingStore store = manager.getServer().getPagingManager().getPageStore(new SimpleString(address));
+         final PagingStore store = manager.getServer().getPagingManager().getPageStore(address);
          store.checkMemory(new Runnable() {
             @Override
             public void run() {
@@ -587,8 +588,8 @@ public class AMQPSessionCallback implements SessionCallback {
       }
    }
 
-   public void deleteQueue(String queueName) throws Exception {
-      manager.getServer().destroyQueue(new SimpleString(queueName));
+   public void deleteQueue(SimpleString queueName) throws Exception {
+      manager.getServer().destroyQueue(queueName);
    }
 
    public void resetContext(OperationContext oldContext) {
@@ -657,7 +658,7 @@ public class AMQPSessionCallback implements SessionCallback {
    }
 
    @Override
-   public void disconnect(ServerConsumer consumer, String queueName) {
+   public void disconnect(ServerConsumer consumer, SimpleString queueName) {
       ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + queueName);
       connection.lock();
       try {
@@ -703,12 +704,12 @@ public class AMQPSessionCallback implements SessionCallback {
       return serverSession.getAddress(address);
    }
 
-   public void removeTemporaryQueue(String address) throws Exception {
-      serverSession.deleteQueue(SimpleString.toSimpleString(address));
+   public void removeTemporaryQueue(SimpleString address) throws Exception {
+      serverSession.deleteQueue(address);
    }
 
-   public RoutingType getDefaultRoutingType(String address) {
-      return manager.getServer().getAddressSettingsRepository().getMatch(address).getDefaultAddressRoutingType();
+   public RoutingType getDefaultRoutingType(SimpleString address) {
+      return manager.getServer().getAddressSettingsRepository().getMatch(address.toString()).getDefaultAddressRoutingType();
    }
 
    public void check(SimpleString address, CheckType checkType, SecurityAuth session) throws Exception {
@@ -733,10 +734,10 @@ public class AMQPSessionCallback implements SessionCallback {
 
 
    class AddressQueryCache<T> {
-      String address;
+      SimpleString address;
       T result;
 
-      public synchronized T getResult(String parameterAddress) {
+      public synchronized T getResult(SimpleString parameterAddress) {
          if (address != null && address.equals(parameterAddress)) {
             return result;
          } else {
@@ -746,7 +747,7 @@ public class AMQPSessionCallback implements SessionCallback {
          }
       }
 
-      public synchronized void setResult(String parameterAddress, T result) {
+      public synchronized void setResult(SimpleString parameterAddress, T result) {
          this.address = parameterAddress;
          this.result = result;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java
index 724474b..e67fc67 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.protocol.amqp.converter;
 
 import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
 
@@ -38,7 +39,7 @@ public class AMQPConverter implements MessageConverter<AMQPMessage> {
    }
 
    @Override
-   public ICoreMessage toCore(AMQPMessage messageSource) throws Exception {
-      return AmqpCoreConverter.toCore(messageSource);
+   public ICoreMessage toCore(AMQPMessage messageSource, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
+      return AmqpCoreConverter.toCore(messageSource, coreMessageObjectPools);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
index da2f4e0..1bac1e5 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
@@ -242,56 +243,56 @@ public final class AMQPMessageSupport {
       return null;
    }
 
-   public static ServerJMSBytesMessage createBytesMessage(long id) {
-      return new ServerJMSBytesMessage(newMessage(id, BYTES_TYPE));
+   public static ServerJMSBytesMessage createBytesMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
+      return new ServerJMSBytesMessage(newMessage(id, BYTES_TYPE, coreMessageObjectPools));
    }
 
-   public static ServerJMSBytesMessage createBytesMessage(long id, byte[] array, int arrayOffset, int length) throws JMSException {
-      ServerJMSBytesMessage message = createBytesMessage(id);
+   public static ServerJMSBytesMessage createBytesMessage(long id, byte[] array, int arrayOffset, int length, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
+      ServerJMSBytesMessage message = createBytesMessage(id, coreMessageObjectPools);
       message.writeBytes(array, arrayOffset, length);
       return message;
    }
 
-   public static ServerJMSStreamMessage createStreamMessage(long id) {
-      return new ServerJMSStreamMessage(newMessage(id, STREAM_TYPE));
+   public static ServerJMSStreamMessage createStreamMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
+      return new ServerJMSStreamMessage(newMessage(id, STREAM_TYPE, coreMessageObjectPools));
    }
 
-   public static ServerJMSMessage createMessage(long id) {
-      return new ServerJMSMessage(newMessage(id, DEFAULT_TYPE));
+   public static ServerJMSMessage createMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
+      return new ServerJMSMessage(newMessage(id, DEFAULT_TYPE, coreMessageObjectPools));
    }
 
-   public static ServerJMSTextMessage createTextMessage(long id) {
-      return new ServerJMSTextMessage(newMessage(id, TEXT_TYPE));
+   public static ServerJMSTextMessage createTextMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
+      return new ServerJMSTextMessage(newMessage(id, TEXT_TYPE, coreMessageObjectPools));
    }
 
-   public static ServerJMSTextMessage createTextMessage(long id, String text) throws JMSException {
-      ServerJMSTextMessage message = createTextMessage(id);
+   public static ServerJMSTextMessage createTextMessage(long id, String text, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
+      ServerJMSTextMessage message = createTextMessage(id, coreMessageObjectPools);
       message.setText(text);
       return message;
    }
 
-   public static ServerJMSObjectMessage createObjectMessage(long id) {
-      return new ServerJMSObjectMessage(newMessage(id, OBJECT_TYPE));
+   public static ServerJMSObjectMessage createObjectMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
+      return new ServerJMSObjectMessage(newMessage(id, OBJECT_TYPE, coreMessageObjectPools));
    }
 
-   public static ServerJMSMessage createObjectMessage(long id, Binary serializedForm) throws JMSException {
-      ServerJMSObjectMessage message = createObjectMessage(id);
+   public static ServerJMSMessage createObjectMessage(long id, Binary serializedForm, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
+      ServerJMSObjectMessage message = createObjectMessage(id, coreMessageObjectPools);
       message.setSerializedForm(serializedForm);
       return message;
    }
 
-   public static ServerJMSMessage createObjectMessage(long id, byte[] array, int offset, int length) throws JMSException {
-      ServerJMSObjectMessage message = createObjectMessage(id);
+   public static ServerJMSMessage createObjectMessage(long id, byte[] array, int offset, int length, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
+      ServerJMSObjectMessage message = createObjectMessage(id, coreMessageObjectPools);
       message.setSerializedForm(new Binary(array, offset, length));
       return message;
    }
 
-   public static ServerJMSMapMessage createMapMessage(long id) {
-      return new ServerJMSMapMessage(newMessage(id, MAP_TYPE));
+   public static ServerJMSMapMessage createMapMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
+      return new ServerJMSMapMessage(newMessage(id, MAP_TYPE, coreMessageObjectPools));
    }
 
-   public static ServerJMSMapMessage createMapMessage(long id, Map<String, Object> content) throws JMSException {
-      ServerJMSMapMessage message = createMapMessage(id);
+   public static ServerJMSMapMessage createMapMessage(long id, Map<String, Object> content, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
+      ServerJMSMapMessage message = createMapMessage(id, coreMessageObjectPools);
       final Set<Map.Entry<String, Object>> set = content.entrySet();
       for (Map.Entry<String, Object> entry : set) {
          Object value = entry.getValue();
@@ -304,8 +305,8 @@ public final class AMQPMessageSupport {
       return message;
    }
 
-   private static CoreMessage newMessage(long id, byte messageType) {
-      CoreMessage message = new CoreMessage(id, 512);
+   private static CoreMessage newMessage(long id, byte messageType, CoreMessageObjectPools coreMessageObjectPools) {
+      CoreMessage message = new CoreMessage(id, 512, coreMessageObjectPools);
       message.setType(messageType);
 //      ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null);
       return message;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
index fbaf0ef..80969f6 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -52,6 +52,7 @@ import javax.jms.JMSException;
 
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
@@ -89,31 +90,31 @@ import io.netty.buffer.PooledByteBufAllocator;
 public class AmqpCoreConverter {
 
    @SuppressWarnings("unchecked")
-   public static ICoreMessage toCore(AMQPMessage message) throws Exception {
+   public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
 
       Section body = message.getProtonMessage().getBody();
       ServerJMSMessage result;
 
       if (body == null) {
          if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
-            result = createObjectMessage(message.getMessageID());
+            result = createObjectMessage(message.getMessageID(), coreMessageObjectPools);
          } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage()) || isContentType(null, message.getProtonMessage())) {
-            result = createBytesMessage(message.getMessageID());
+            result = createBytesMessage(message.getMessageID(), coreMessageObjectPools);
          } else {
             Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
             if (charset != null) {
-               result = createTextMessage(message.getMessageID());
+               result = createTextMessage(message.getMessageID(), coreMessageObjectPools);
             } else {
-               result = createMessage(message.getMessageID());
+               result = createMessage(message.getMessageID(), coreMessageObjectPools);
             }
          }
       } else if (body instanceof Data) {
          Binary payload = ((Data) body).getValue();
 
          if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
-            result = createObjectMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+            result = createObjectMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
          } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage())) {
-            result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+            result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
          } else {
             Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
             if (StandardCharsets.UTF_8.equals(charset)) {
@@ -121,18 +122,18 @@ public class AmqpCoreConverter {
 
                try {
                   CharBuffer chars = charset.newDecoder().decode(buf);
-                  result = createTextMessage(message.getMessageID(), String.valueOf(chars));
+                  result = createTextMessage(message.getMessageID(), String.valueOf(chars), coreMessageObjectPools);
                } catch (CharacterCodingException e) {
-                  result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+                  result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
                }
             } else {
-               result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+               result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
             }
          }
 
       } else if (body instanceof AmqpSequence) {
          AmqpSequence sequence = (AmqpSequence) body;
-         ServerJMSStreamMessage m = createStreamMessage(message.getMessageID());
+         ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools);
          for (Object item : sequence.getValue()) {
             m.writeObject(item);
          }
@@ -141,31 +142,31 @@ public class AmqpCoreConverter {
       } else if (body instanceof AmqpValue) {
          Object value = ((AmqpValue) body).getValue();
          if (value == null || value instanceof String) {
-            result = createTextMessage(message.getMessageID(), (String) value);
+            result = createTextMessage(message.getMessageID(), (String) value, coreMessageObjectPools);
 
          } else if (value instanceof Binary) {
             Binary payload = (Binary) value;
 
             if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
-               result = createObjectMessage(message.getMessageID(), payload);
+               result = createObjectMessage(message.getMessageID(), payload, coreMessageObjectPools);
             } else {
-               result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+               result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
             }
 
          } else if (value instanceof List) {
-            ServerJMSStreamMessage m = createStreamMessage(message.getMessageID());
+            ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools);
             for (Object item : (List<Object>) value) {
                m.writeObject(item);
             }
             result = m;
          } else if (value instanceof Map) {
-            result = createMapMessage(message.getMessageID(), (Map<String, Object>) value);
+            result = createMapMessage(message.getMessageID(), (Map<String, Object>) value, coreMessageObjectPools);
          } else {
             ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
             try {
                TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf));
                TLSEncode.getEncoder().writeObject(body);
-               result = createBytesMessage(message.getMessageID(), buf.array(), 0, buf.writerIndex());
+               result = createBytesMessage(message.getMessageID(), buf.array(), 0, buf.writerIndex(), coreMessageObjectPools);
             } finally {
                buf.release();
                TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null);
@@ -186,7 +187,7 @@ public class AmqpCoreConverter {
       result.getInnerMessage().setReplyTo(message.getReplyTo());
       result.getInnerMessage().setDurable(message.isDurable());
       result.getInnerMessage().setPriority(message.getPriority());
-      result.getInnerMessage().setAddress(message.getAddress());
+      result.getInnerMessage().setAddress(message.getAddressSimpleString());
 
       result.encode();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 3e1c0fe..3c35d76 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -54,7 +54,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
 
    protected final Receiver receiver;
 
-   protected String address;
+   protected SimpleString address;
 
    protected final AMQPSessionCallback sessionSPI;
 
@@ -102,7 +102,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
          if (target.getDynamic()) {
             // if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
             // will be deleted on closing of the session
-            address = sessionSPI.tempQueueName();
+            address = SimpleString.toSimpleString(sessionSPI.tempQueueName());
             defRoutingType = getRoutingType(target.getCapabilities(), address);
 
             try {
@@ -113,12 +113,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
                throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
             }
             expiryPolicy = target.getExpiryPolicy() != null ? target.getExpiryPolicy() : TerminusExpiryPolicy.LINK_DETACH;
-            target.setAddress(address);
+            target.setAddress(address.toString());
          } else {
             // the target will have an address unless the remote is requesting an anonymous
             // relay in which case the address in the incoming message's to field will be
             // matched on receive of the message.
-            address = target.getAddress();
+            address = SimpleString.toSimpleString(target.getAddress());
 
             if (address != null && !address.isEmpty()) {
                defRoutingType = getRoutingType(target.getCapabilities(), address);
@@ -134,7 +134,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
                }
 
                try {
-                  sessionSPI.check(SimpleString.toSimpleString(address), CheckType.SEND, new SecurityAuth() {
+                  sessionSPI.check(address, CheckType.SEND, new SecurityAuth() {
                      @Override
                      public String getUsername() {
                         String username = null;
@@ -181,12 +181,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
       flow(amqpCredits, minCreditRefresh);
    }
 
-   public RoutingType getRoutingType(Receiver receiver, String address) {
+   public RoutingType getRoutingType(Receiver receiver, SimpleString address) {
       org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
       return target != null ? getRoutingType(target.getCapabilities(), address) : getRoutingType((Symbol[]) null, address);
    }
 
-   private RoutingType getRoutingType(Symbol[] symbols, String address) {
+   private RoutingType getRoutingType(Symbol[] symbols, SimpleString address) {
       if (symbols != null) {
          for (Symbol symbol : symbols) {
             if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) {
@@ -264,7 +264,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
       org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
       if (target != null && target.getDynamic() && (target.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || target.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
          try {
-            sessionSPI.removeTemporaryQueue(target.getAddress());
+            sessionSPI.removeTemporaryQueue(SimpleString.toSimpleString(target.getAddress()));
          } catch (Exception e) {
             //ignore on close, its temp anyway and will be removed later
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index fbaae8a..1823168 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -102,7 +102,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    private boolean shared = false;
    private boolean global = false;
    private boolean isVolatile = false;
-   private String tempQueueName;
+   private SimpleString tempQueueName;
 
    public ProtonServerSenderContext(AMQPConnectionContext connection,
                                     Sender sender,
@@ -157,7 +157,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       super.initialise();
 
       Source source = (Source) sender.getRemoteSource();
-      String queue = null;
+      SimpleString queue = null;
       String selector = null;
       final Map<Symbol, Object> supportedFilters = new HashMap<>();
 
@@ -199,7 +199,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          // the lifetime policy and capabilities of the new subscription.
          if (result.isExists()) {
             source = new org.apache.qpid.proton.amqp.messaging.Source();
-            source.setAddress(queue);
+            source.setAddress(queue.toString());
             source.setDurable(TerminusDurability.UNSETTLED_STATE);
             source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
             source.setDistributionMode(COPY);
@@ -240,7 +240,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       } else if (source.getDynamic()) {
          // if dynamic we have to create the node (queue) and set the address on the target, the
          // node is temporary and  will be deleted on closing of the session
-         queue = java.util.UUID.randomUUID().toString();
+         queue = SimpleString.toSimpleString(java.util.UUID.randomUUID().toString());
          tempQueueName = queue;
          try {
             sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST);
@@ -248,7 +248,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          } catch (Exception e) {
             throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
          }
-         source.setAddress(queue);
+         source.setAddress(queue.toString());
       } else {
          SimpleString addressToUse;
          SimpleString queueNameToUse = null;
@@ -269,7 +269,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             multicast = hasCapabilities(TOPIC, source);
             AddressQueryResult addressQueryResult = null;
             try {
-               addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true);
+               addressQueryResult = sessionSPI.addressQuery(addressToUse, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true);
             } catch (ActiveMQSecurityException e) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
             } catch (ActiveMQAMQPException e) {
@@ -294,7 +294,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             // if not we look up the address
             AddressQueryResult addressQueryResult = null;
             try {
-               addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), defaultRoutingType, true);
+               addressQueryResult = sessionSPI.addressQuery(addressToUse, defaultRoutingType, true);
             } catch (ActiveMQSecurityException e) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
             } catch (ActiveMQAMQPException e) {
@@ -333,6 +333,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             }
 
             queue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.MULTICAST);
+            SimpleString simpleStringSelector = SimpleString.toSimpleString(selector);
 
             //if the address specifies a broker configured queue then we always use this, treat it as a queue
             if (queue != null) {
@@ -345,24 +346,23 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                String pubId = sender.getName();
                queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, false);
                QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false);
-
                if (result.isExists()) {
                   // If a client reattaches to a durable subscription with a different no-local
                   // filter value, selector or address then we must recreate the queue (JMS semantics).
-                  if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
+                  if (!Objects.equals(result.getFilterString(), simpleStringSelector) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
 
                      if (result.getConsumerCount() == 0) {
                         sessionSPI.deleteQueue(queue);
-                        sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
+                        sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                      } else {
                         throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
                      }
                   }
                } else {
                   if (shared) {
-                     sessionSPI.createSharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
+                     sessionSPI.createSharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                   } else {
-                     sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
+                     sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                   }
                }
             } else {
@@ -371,15 +371,15 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                if (shared && sender.getName() != null) {
                   queue = createQueueName(connection.isUseCoreSubscriptionNaming(), getClientId(), sender.getName(), shared, global, isVolatile);
                   try {
-                     sessionSPI.createSharedVolatileQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
+                     sessionSPI.createSharedVolatileQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                   } catch (ActiveMQQueueExistsException e) {
                      //this is ok, just means its shared
                   }
                } else {
-                  queue = java.util.UUID.randomUUID().toString();
+                  queue = SimpleString.toSimpleString(java.util.UUID.randomUUID().toString());
                   tempQueueName = queue;
                   try {
-                     sessionSPI.createTemporaryQueue(source.getAddress(), queue, RoutingType.MULTICAST, selector);
+                     sessionSPI.createTemporaryQueue(addressToUse, queue, RoutingType.MULTICAST, simpleStringSelector);
                   } catch (Exception e) {
                      throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
                   }
@@ -387,18 +387,18 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             }
          } else {
             if (queueNameToUse != null) {
-               SimpleString matchingAnycastQueue = SimpleString.toSimpleString(getMatchingQueue(queueNameToUse, addressToUse, RoutingType.ANYCAST));
+               SimpleString matchingAnycastQueue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.ANYCAST);
                if (matchingAnycastQueue != null) {
-                  queue = matchingAnycastQueue.toString();
+                  queue = matchingAnycastQueue;
                } else {
                   throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
                }
             } else {
                SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, RoutingType.ANYCAST);
                if (matchingAnycastQueue != null) {
-                  queue = matchingAnycastQueue.toString();
+                  queue = matchingAnycastQueue;
                } else {
-                  queue = addressToUse.toString();
+                  queue = addressToUse;
                }
             }
 
@@ -437,16 +437,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       }
    }
 
-   private String getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType) throws Exception {
+   private SimpleString getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType) throws Exception {
       if (queueName != null) {
-         QueueQueryResult result = sessionSPI.queueQuery(queueName.toString(), routingType, false);
+         QueueQueryResult result = sessionSPI.queueQuery(queueName, routingType, false);
          if (!result.isExists()) {
             throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist");
          } else {
             if (!result.getAddress().equals(address)) {
                throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist for address '" + address + "'");
             }
-            return sessionSPI.getMatchingQueue(address, queueName, routingType).toString();
+            return sessionSPI.getMatchingQueue(address, queueName, routingType);
          }
       }
       return null;
@@ -495,7 +495,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          if (remoteLinkClose) {
             Source source = (Source) sender.getSource();
             if (source != null && source.getAddress() != null && multicast) {
-               String queueName = source.getAddress();
+               SimpleString queueName = SimpleString.toSimpleString(source.getAddress());
                QueueQueryResult result = sessionSPI.queueQuery(queueName, routingTypeToUse, false);
                if (result.isExists() && source.getDynamic()) {
                   sessionSPI.deleteQueue(queueName);
@@ -508,7 +508,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                      if (pubId.contains("|")) {
                         pubId = pubId.split("\\|")[0];
                      }
-                     String queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, isVolatile);
+                     SimpleString queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, isVolatile);
                      result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false);
                      //only delete if it isn't volatile and has no consumers
                      if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) {
@@ -518,7 +518,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                }
             } else if (source != null && source.getDynamic() && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
                try {
-                  sessionSPI.removeTemporaryQueue(source.getAddress());
+                  sessionSPI.removeTemporaryQueue(SimpleString.toSimpleString(source.getAddress()));
                } catch (Exception e) {
                   //ignore on close, its temp anyway and will be removed later
                }
@@ -760,7 +760,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       return false;
    }
 
-   private static String createQueueName(boolean useCoreSubscriptionNaming,
+   private static SimpleString createQueueName(boolean useCoreSubscriptionNaming,
                                          String clientId,
                                          String pubId,
                                          boolean shared,
@@ -784,7 +784,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                queue += ":global";
             }
          }
-         return queue;
+         return SimpleString.toSimpleString(queue);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
index d06464f..ccafd37 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
@@ -620,7 +620,7 @@ public class JMSMappingOutboundTransformerTest {
    }
 
    private ServerJMSObjectMessage createObjectMessage(Serializable payload, boolean compression) {
-      ServerJMSObjectMessage result = AMQPMessageSupport.createObjectMessage(0);
+      ServerJMSObjectMessage result = AMQPMessageSupport.createObjectMessage(0, null);
 
       if (compression) {
          // TODO
@@ -647,7 +647,7 @@ public class JMSMappingOutboundTransformerTest {
    }
 
    private ServerJMSTextMessage createTextMessage(String text, boolean compression) {
-      ServerJMSTextMessage result = AMQPMessageSupport.createTextMessage(0);
+      ServerJMSTextMessage result = AMQPMessageSupport.createTextMessage(0, null);
 
       if (compression) {
          // TODO

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index 73dbeaa..da10f47 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -22,6 +22,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -56,11 +57,12 @@ public class MQTTSession {
 
    private MQTTProtocolManager protocolManager;
 
-
    private boolean isClean;
 
    private WildcardConfiguration wildcardConfiguration;
 
+   private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
+
    public MQTTSession(MQTTProtocolHandler protocolHandler,
                       MQTTConnection connection,
                       MQTTProtocolManager protocolManager,
@@ -195,4 +197,8 @@ public class MQTTSession {
    public void setWildcardConfiguration(WildcardConfiguration wildcardConfiguration) {
       this.wildcardConfiguration = wildcardConfiguration;
    }
+
+   public CoreMessageObjectPools getCoreMessageObjectPools() {
+      return coreMessageObjectPools;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 21b1f2b..39e2ba9 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -78,7 +78,7 @@ public class MQTTSessionCallback implements SessionCallback {
    }
 
    @Override
-   public void disconnect(ServerConsumer consumer, String queueName) {
+   public void disconnect(ServerConsumer consumer, SimpleString queueName) {
       try {
          consumer.removeItself();
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index 2cb1f7e..2667f81 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -64,13 +64,13 @@ public class MQTTUtil {
 
    public static final String MQTT_RETAIN_ADDRESS_PREFIX = "$sys.mqtt.retain.";
 
-   public static final String MQTT_QOS_LEVEL_KEY = "mqtt.qos.level";
+   public static final SimpleString MQTT_QOS_LEVEL_KEY = SimpleString.toSimpleString("mqtt.qos.level");
 
-   public static final String MQTT_MESSAGE_ID_KEY = "mqtt.message.id";
+   public static final SimpleString MQTT_MESSAGE_ID_KEY = SimpleString.toSimpleString("mqtt.message.id");
 
-   public static final String MQTT_MESSAGE_TYPE_KEY = "mqtt.message.type";
+   public static final SimpleString MQTT_MESSAGE_TYPE_KEY = SimpleString.toSimpleString("mqtt.message.type");
 
-   public static final SimpleString MQTT_MESSAGE_RETAIN_KEY = new SimpleString("mqtt.message.retain");
+   public static final SimpleString MQTT_MESSAGE_RETAIN_KEY = SimpleString.toSimpleString("mqtt.message.retain");
 
    public static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2.";
 
@@ -113,10 +113,10 @@ public class MQTTUtil {
                                                     int qos) {
       long id = session.getServer().getStorageManager().generateID();
 
-      CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE);
+      CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE, session.getCoreMessageObjectPools());
       message.setAddress(address);
       message.putBooleanProperty(MQTT_MESSAGE_RETAIN_KEY, retain);
-      message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos);
+      message.putIntProperty(MQTT_QOS_LEVEL_KEY, qos);
       message.setType(Message.BYTES_TYPE);
       return message;
    }
@@ -127,7 +127,8 @@ public class MQTTUtil {
                                                               int qos,
                                                               ByteBuf payload) {
       String coreAddress = convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration());
-      ICoreMessage message = createServerMessage(session, new SimpleString(coreAddress), retain, qos);
+      SimpleString address = SimpleString.toSimpleString(coreAddress, session.getCoreMessageObjectPools().getAddressStringSimpleStringPool());
+      ICoreMessage message = createServerMessage(session, address, retain, qos);
 
       message.getBodyBuffer().writeBytes(payload, 0, payload.readableBytes());
       return message;
@@ -135,8 +136,8 @@ public class MQTTUtil {
 
    public static Message createPubRelMessage(MQTTSession session, SimpleString address, int messageId) {
       Message message = createServerMessage(session, address, false, 1);
-      message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_ID_KEY), messageId);
-      message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_TYPE_KEY), MqttMessageType.PUBREL.value());
+      message.putIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY, messageId);
+      message.putIntProperty(MQTTUtil.MQTT_MESSAGE_TYPE_KEY, MqttMessageType.PUBREL.value());
       return message;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 9923953..86a95db 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1121,7 +1121,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
       @Override
       public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception {
-         SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName()));
+         SimpleString subQueueName = org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName());
          server.destroyQueue(subQueueName);
 
          return null;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 6af9997..83ff6d6 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
 import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -108,10 +109,11 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
    }
 
    @Override
-   public ICoreMessage toCore(OpenwireMessage pureMessage) throws Exception {
+   public ICoreMessage toCore(OpenwireMessage pureMessage, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
       return null;
    }
 
+
    //   @Override
    public Object outbound(org.apache.activemq.artemis.api.core.Message message, int deliveryCount) {
       // TODO: implement this
@@ -119,10 +121,10 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
    }
 
 //   @Override
-   public org.apache.activemq.artemis.api.core.Message inbound(Object message) throws Exception {
+   public org.apache.activemq.artemis.api.core.Message inbound(Object message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
 
       Message messageSend = (Message) message;
-      CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize());
+      CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize(), coreMessageObjectPools);
 
       String type = messageSend.getType();
       if (type != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
index d28eda4..c63fe19 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RefCountMessageListener;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.persistence.Persister;
 
 // TODO: Implement this
@@ -442,6 +443,11 @@ public class OpenwireMessage implements Message {
    }
 
    @Override
+   public Message putStringProperty(SimpleString key, String value) {
+      return null;
+   }
+
+   @Override
    public int getEncodeSize() {
       return 0;
    }
@@ -478,6 +484,11 @@ public class OpenwireMessage implements Message {
 
    @Override
    public ICoreMessage toCore() {
+      return toCore(null);
+   }
+
+   @Override
+   public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
       return null;
    }
 


Mime
View raw message