activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [5/5] activemq-artemis git commit: Fixing converters part I
Date Fri, 03 Mar 2017 00:55:26 GMT
Fixing converters part I


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b557f2df
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b557f2df
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b557f2df

Branch: refs/heads/artemis-1009
Commit: b557f2df6d0287a21adfaf67f6e93875ba174b5b
Parents: 9d3260b
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Thu Mar 2 18:57:23 2017 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Mar 2 19:38:06 2017 -0500

----------------------------------------------------------------------
 .../cli/commands/tools/XmlDataExporter.java     |  32 +-
 .../cli/commands/tools/XmlDataExporterUtil.java |   9 +-
 .../cli/commands/tools/XmlDataImporter.java     |   7 +-
 .../activemq/artemis/api/core/ICoreMessage.java |  87 +++
 .../activemq/artemis/api/core/Message.java      |  96 +--
 .../artemis/api/core/client/ClientMessage.java  |  61 +-
 .../api/core/management/ManagementHelper.java   |  18 +-
 .../core/client/impl/ClientConsumerImpl.java    |   2 +-
 .../core/client/impl/ClientProducerImpl.java    |  18 +-
 .../artemis/core/message/impl/CoreMessage.java  |  73 +--
 .../core/impl/ActiveMQSessionContext.java       |   3 +-
 .../core/impl/wireformat/MessagePacket.java     |   9 +-
 .../impl/wireformat/SessionReceiveMessage.java  |   8 +-
 .../impl/wireformat/SessionSendMessage.java     |   4 +-
 .../spi/core/remoting/SessionContext.java       |   5 +-
 .../api/jms/management/JMSManagementHelper.java |   3 +-
 .../jms/transaction/JMSTransactionDetail.java   |   6 +-
 .../protocol/amqp/broker/AMQPMessage.java       |  91 +--
 .../amqp/broker/AMQPSessionCallback.java        |  16 +-
 .../amqp/broker/ProtonProtocolManager.java      |  12 +-
 .../protocol/amqp/converter/AMQPConverter.java  |  44 ++
 .../amqp/converter/AmqpCoreConverter.java       | 366 +++++++++++
 .../amqp/converter/CoreAmqpConverter.java       | 565 +++++++++++++++++
 .../amqp/converter/ProtonMessageConverter.java  | 101 ----
 .../converter/jms/ServerJMSBytesMessage.java    |   6 +-
 .../amqp/converter/jms/ServerJMSMapMessage.java |   5 +-
 .../amqp/converter/jms/ServerJMSMessage.java    |  64 +-
 .../converter/jms/ServerJMSObjectMessage.java   |   8 +-
 .../converter/jms/ServerJMSStreamMessage.java   |   5 +-
 .../converter/jms/ServerJMSTextMessage.java     |   5 +-
 .../converter/message/AMQPMessageSupport.java   |  76 +--
 .../converter/message/AMQPMessageTypes.java     |  30 -
 .../message/AMQPNativeOutboundTransformer.java  |  80 ---
 .../amqp/converter/message/EncodedMessage.java  |  67 ---
 .../converter/message/InboundTransformer.java   | 240 --------
 .../message/JMSMappingInboundTransformer.java   | 182 ------
 .../message/JMSMappingOutboundTransformer.java  | 574 ------------------
 .../converter/message/OutboundTransformer.java  |  53 --
 .../amqp/proton/ProtonServerSenderContext.java  |  32 +-
 .../amqp/converter/TestConversions.java         | 599 +------------------
 .../JMSMappingInboundTransformerTest.java       | 166 ++---
 .../JMSMappingOutboundTransformerTest.java      | 204 ++-----
 .../JMSTransformationSpeedComparisonTest.java   |  86 ++-
 .../message/MessageTransformationTest.java      | 116 +---
 .../protocol/amqp/message/AMQPMessageTest.java  |   2 +-
 .../core/protocol/mqtt/MQTTProtocolManager.java |   6 -
 .../artemis/core/protocol/mqtt/MQTTUtil.java    |   5 +-
 .../openwire/OpenWireMessageConverter.java      |  19 +-
 .../openwire/OpenWireProtocolManager.java       |   8 +-
 .../core/protocol/openwire/OpenwireMessage.java | 473 +++++++++++++++
 .../core/protocol/openwire/amq/AMQConsumer.java |   5 +-
 .../core/protocol/openwire/amq/AMQSession.java  |   5 +-
 .../core/protocol/stomp/StompConnection.java    |   6 +-
 .../protocol/stomp/StompProtocolManager.java    |   8 -
 .../core/protocol/stomp/StompSession.java       |   4 +-
 .../stomp/VersionedStompFrameHandler.java       |   5 +-
 .../stomp/v12/StompFrameHandlerV12.java         |   4 +-
 .../impl/openmbean/OpenTypeSupport.java         |  12 +-
 .../activemq/artemis/core/paging/impl/Page.java |   3 +-
 .../core/paging/impl/PagedMessageImpl.java      |   5 +-
 .../core/ServerSessionPacketHandler.java        |   3 +-
 .../protocol/core/impl/CoreProtocolManager.java |  13 +-
 .../protocol/core/impl/CoreSessionCallback.java |   5 +-
 .../artemis/core/server/LargeServerMessage.java |   4 +-
 .../artemis/core/server/ServerConsumer.java     |   3 +-
 .../core/server/impl/ServerConsumerImpl.java    |  11 +-
 .../server/management/ManagementService.java    |   6 +-
 .../management/impl/ManagementServiceImpl.java  |   5 +-
 .../transaction/impl/CoreTransactionDetail.java |   7 +-
 .../spi/core/protocol/MessageConverter.java     |   7 +-
 .../spi/core/protocol/ProtocolManager.java      |  10 +-
 .../group/impl/ClusteredResetMockTest.java      |   6 +-
 .../impl/ScheduledDeliveryHandlerTest.java      |  56 +-
 .../artemis/tests/util/ActiveMQTestBase.java    |   3 +-
 .../integration/client/AcknowledgeTest.java     |  62 +-
 .../tests/integration/client/ConsumerTest.java  |  43 +-
 .../integration/client/LargeMessageTest.java    |   3 +-
 .../integration/clientcrash/ClientExitTest.java |   4 +-
 .../integration/journal/MessageJournalTest.java |   5 +-
 .../management/ManagementHelperTest.java        |   8 +-
 .../management/ManagementServiceImplTest.java   |  19 +-
 .../integration/paging/PagingSendTest.java      |   3 +-
 .../tests/integration/paging/PagingTest.java    |   4 +-
 .../tests/integration/server/ScaleDownTest.java |   4 +-
 .../ssl/CoreClientOverOneWaySSLTest.java        |   4 +-
 .../ssl/CoreClientOverTwoWaySSLTest.java        |   5 +-
 .../unit/core/message/impl/MessageImplTest.java |   5 +-
 .../tests/unit/core/paging/impl/PageTest.java   |   4 +-
 .../core/paging/impl/PagingManagerImplTest.java |  14 +-
 .../core/paging/impl/PagingStoreImplTest.java   |  24 +-
 90 files changed, 2136 insertions(+), 3018 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
index b57b5c5..b53db48 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.artemis.cli.commands.tools;
 
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamWriter;
 import java.io.File;
 import java.io.OutputStream;
 import java.lang.reflect.InvocationHandler;
@@ -33,14 +36,13 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
-import javax.xml.stream.XMLOutputFactory;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.XMLStreamWriter;
-
+import io.airlift.airline.Command;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.cli.commands.ActionContext;
 import org.apache.activemq.artemis.core.config.Configuration;
@@ -74,8 +76,6 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.Persisten
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
-import org.apache.activemq.artemis.api.core.RoutingType;
-
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
@@ -83,8 +83,6 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
 
-import io.airlift.airline.Command;
-
 @Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.")
 public final class XmlDataExporter extends OptionalLocking {
 
@@ -361,13 +359,13 @@ public final class XmlDataExporter extends OptionalLocking {
       xmlWriter.writeEndElement(); // end BINDINGS_PARENT
    }
 
-   private void printAllMessagesAsXML() throws XMLStreamException {
+   private void printAllMessagesAsXML() throws Exception {
       xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_PARENT);
 
       // Order here is important.  We must process the messages from the journal before we process those from the page
       // files in order to get the messages in the right order.
       for (Map.Entry<Long, Message> messageMapEntry : messages.entrySet()) {
-         printSingleMessageAsXML(messageMapEntry.getValue(), extractQueueNames(messageRefs.get(messageMapEntry.getKey())));
+         printSingleMessageAsXML(messageMapEntry.getValue().toCore(), extractQueueNames(messageRefs.get(messageMapEntry.getKey())));
       }
 
       printPagedMessagesAsXML();
@@ -441,7 +439,7 @@ public final class XmlDataExporter extends OptionalLocking {
                      }
 
                      if (queueNames.size() > 0 && (message.getTransactionID() == -1 || pgTXs.contains(message.getTransactionID()))) {
-                        printSingleMessageAsXML(message.getMessage(), queueNames);
+                        printSingleMessageAsXML(message.getMessage().toCore(), queueNames);
                      }
 
                      messageId++;
@@ -458,20 +456,20 @@ public final class XmlDataExporter extends OptionalLocking {
       }
    }
 
-   private void printSingleMessageAsXML(Message message, List<String> queues) throws XMLStreamException {
+   private void printSingleMessageAsXML(ICoreMessage message, List<String> queues) throws Exception {
       xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD);
       printMessageAttributes(message);
       printMessageProperties(message);
       printMessageQueues(queues);
-      printMessageBody(message);
+      printMessageBody(message.toCore());
       xmlWriter.writeEndElement(); // end MESSAGES_CHILD
       messagesPrinted++;
    }
 
-   private void printMessageBody(Message message) throws XMLStreamException {
+   private void printMessageBody(Message message) throws Exception {
       xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY);
 
-      if (message.isLargeMessage()) {
+      if (message.toCore().isLargeMessage()) {
          printLargeMessageBody((LargeServerMessage) message);
       } else {
          xmlWriter.writeCData(XmlDataExporterUtil.encodeMessageBody(message));
@@ -484,7 +482,7 @@ public final class XmlDataExporter extends OptionalLocking {
       LargeBodyEncoder encoder = null;
 
       try {
-         encoder = message.getBodyEncoder();
+         encoder = message.toCore().getBodyEncoder();
          encoder.open();
          long totalBytesWritten = 0;
          Long bufferSize;
@@ -541,7 +539,7 @@ public final class XmlDataExporter extends OptionalLocking {
       xmlWriter.writeEndElement(); // end PROPERTIES_PARENT
    }
 
-   private void printMessageAttributes(Message message) throws XMLStreamException {
+   private void printMessageAttributes(ICoreMessage message) throws XMLStreamException {
       xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID()));
       xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority()));
       xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration()));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java
index a3807bd..ca7f1a8 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java
@@ -17,9 +17,9 @@
 package org.apache.activemq.artemis.cli.commands.tools;
 
 import com.google.common.base.Preconditions;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.utils.Base64;
 
 /**
@@ -91,12 +91,13 @@ public class XmlDataExporterUtil {
     * @param message
     * @return
     */
-   public static String encodeMessageBody(final Message message) {
+   public static String encodeMessageBody(final Message message) throws Exception {
       Preconditions.checkNotNull(message, "ServerMessage can not be null");
 
-      int size = ((CoreMessage)message.toCore()).getEndOfBodyPosition() - message.getBodyBuffer().readerIndex();
+      ICoreMessage coreMessage = message.toCore();
+      int size = coreMessage.getEndOfBodyPosition() - coreMessage.getBodyBuffer().readerIndex();
       byte[] buffer = new byte[size];
-      message.getBodyBuffer().readBytes(buffer);
+      message.toCore().getBodyBuffer().readBytes(buffer);
 
       return XmlDataExporterUtil.encode(buffer);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java
index 0f06738..518d231 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java
@@ -45,7 +45,9 @@ import java.util.UUID;
 
 import io.airlift.airline.Command;
 import io.airlift.airline.Option;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@@ -62,7 +64,6 @@ import org.apache.activemq.artemis.cli.commands.ActionContext;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.utils.Base64;
 import org.apache.activemq.artemis.utils.ClassloadingUtil;
 import org.apache.activemq.artemis.utils.ListUtil;
@@ -297,7 +298,7 @@ public final class XmlDataImporter extends ActionAbstract {
          switch (eventType) {
             case XMLStreamConstants.START_ELEMENT:
                if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName())) {
-                  processMessageBody(message);
+                  processMessageBody(message.toCore());
                } else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName())) {
                   processMessageProperties(message);
                } else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName())) {
@@ -468,7 +469,7 @@ public final class XmlDataImporter extends ActionAbstract {
       }
    }
 
-   private void processMessageBody(final Message message) throws XMLStreamException, IOException {
+   private void processMessageBody(final ICoreMessage message) throws XMLStreamException, IOException {
       boolean isLarge = false;
 
       for (int i = 0; i < reader.getAttributeCount(); i++) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
new file mode 100644
index 0000000..9a58819
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core;
+
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+
+/**
+ * This interface is only to determine the API of methods required for Core Messages
+ */
+public interface ICoreMessage extends Message {
+
+   LargeBodyEncoder getBodyEncoder() throws ActiveMQException;
+
+   int getHeadersAndPropertiesEncodeSize();
+
+   @Override
+   InputStream getBodyInputStream();
+
+   /** Returns a new Buffer slicing the current Body. */
+   ActiveMQBuffer getReadOnlyBodyBuffer();
+
+   /** Return the type of the message */
+   @Override
+   byte getType();
+
+   /** the type of the message */
+   @Override
+   CoreMessage setType(byte type);
+
+   /**
+    * We are really interested if this is a LargeServerMessage.
+    * @return
+    */
+   boolean isServerMessage();
+
+   /**
+    * The body used for this message.
+    * @return
+    */
+   @Override
+   ActiveMQBuffer getBodyBuffer();
+
+   int getEndOfBodyPosition();
+
+
+   /**
+    * @return Returns the message in Map form, useful when encoding to JSON
+    */
+   @Override
+   default Map<String, Object> toMap() {
+      Map map = toPropertyMap();
+      map.put("messageID", getMessageID());
+      Object userID = getUserID();
+      if (getUserID() != null) {
+         map.put("userID", "ID:" + userID.toString());
+      }
+
+      map.put("address", getAddress());
+      map.put("type", getType());
+      map.put("durable", isDurable());
+      map.put("expiration", getExpiration());
+      map.put("timestamp", getTimestamp());
+      map.put("priority", (int)getPriority());
+
+      return map;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index b08202d..73ee856 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -22,8 +22,6 @@ import java.util.Map;
 import java.util.Set;
 
 import io.netty.buffer.ByteBuf;
-import org.apache.activemq.artemis.api.core.encode.BodyType;
-import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
 import org.apache.activemq.artemis.core.persistence.Persister;
 
 /**
@@ -166,36 +164,44 @@ public interface Message {
 
    byte STREAM_TYPE = 6;
 
-
-   void messageChanged();
-
-   /** Used to calculate what is the delivery time.
-    *  Return null if not scheduled. */
-   Long getScheduledDeliveryTime();
-
-   /** Used for Large messages on Core.
-    *  Do not use this, it will go away
-    *  @deprecated  use it directly from core message, as it doesn't make sense on other protocols */
+   /**
+    * @deprecated do not use this, use through ICoreMessage or ClientMessage
+    */
    @Deprecated
    default InputStream getBodyInputStream() {
       return null;
    }
 
    /**
-    * Careful: Unless you are changing the body of the message, prefer getReadOnlyBodyBuffer
-    *  @deprecated  use it directly from core message, as it doesn't make sense on other protocols */
+    * @deprecated do not use this, use through ICoreMessage or ClientMessage
+    */
    @Deprecated
-   ActiveMQBuffer getBodyBuffer();
+   default ActiveMQBuffer getBodyBuffer() {
+      return null;
+   }
 
-   /**
-    *  @deprecated  use it directly from core message, as it doesn't make sense on other protocols */
+      /**
+       * @deprecated do not use this, use through ICoreMessage or ClientMessage
+       */
    @Deprecated
-   ActiveMQBuffer getReadOnlyBodyBuffer();
+   default byte getType() {
+      return (byte)0;
+   }
 
-   /** Used in the cases of large messages
-    *  @deprecated  use it directly from core message, as it doesn't make sense on other protocols */
+   /**
+    * @deprecated do not use this, use through ICoreMessage or ClientMessage
+    */
    @Deprecated
-   LargeBodyEncoder getBodyEncoder() throws ActiveMQException;
+   default Message setType(byte type) {
+      return this;
+   }
+
+
+   void messageChanged();
+
+   /** Used to calculate what is the delivery time.
+    *  Return null if not scheduled. */
+   Long getScheduledDeliveryTime();
 
    /** Context can be used by the application server to inject extra control, like a protocol specific on the server.
     * There is only one per Object, use it wisely!
@@ -209,27 +215,6 @@ public interface Message {
    /** The buffer will belong to this message, until release is called. */
    Message setBuffer(ByteBuf buffer);
 
-   // TODO-now: Do we need this?
-   byte getType();
-
-   // TODO-now: Do we need this?
-   Message setType(byte type);
-
-   /**
-    * Returns whether this message is a <em>large message</em> or a regular message.
-    */
-   boolean isLargeMessage();
-
-   /**
-    * TODO: There's currently some treatment on LargeMessage that is done for server's side large message
-    *       This needs to be refactored, this Method shouldn't be used at all.
-    * @Deprecated do not use this, internal use only. *It will* be removed for sure even on minor releases.
-    * */
-   @Deprecated
-   default boolean isServerMessage() {
-      return false;
-   }
-
    ByteBuf getBuffer();
 
    /** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */
@@ -247,6 +232,10 @@ public interface Message {
 
    Message setMessageID(long id);
 
+   default boolean isLargeMessage() {
+      return false;
+   }
+
    /**
     * Returns the expiration time of this message.
     */
@@ -297,16 +286,6 @@ public interface Message {
 
    Persister<Message> getPersister();
 
-   Object getProtocol();
-
-   Message setProtocol(Object protocol);
-
-   Object getBody();
-
-   BodyType getBodyType();
-
-   Message setBody(BodyType type, Object body);
-
    String getAddress();
 
    Message setAddress(String address);
@@ -356,16 +335,6 @@ public interface Message {
       }
       setBuffer(null);
    }
-
-   default String getText() {
-      if (getBodyType() == BodyType.Text) {
-         return getBody().toString();
-      } else {
-         return null;
-      }
-   }
-
-   // TODO-now: move this to some utility class
    default void referenceOriginalMessage(final Message original, String originalQueue) {
       String queueOnMessage = original.getStringProperty(Message.HDR_ORIGINAL_QUEUE.toString());
 
@@ -559,7 +528,6 @@ public interface Message {
       }
 
       map.put("address", getAddress());
-      map.put("type", getBodyType().toString());
       map.put("durable", isDurable());
       map.put("expiration", getExpiration());
       map.put("timestamp", getTimestamp());
@@ -581,7 +549,7 @@ public interface Message {
 
 
    /** This should make you convert your message into Core format. */
-   Message toCore();
+   ICoreMessage toCore();
 
    int getMemoryEstimate();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java
index daded00..67f2150 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java
@@ -19,14 +19,15 @@ package org.apache.activemq.artemis.api.core.client;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
 
 /**
  * A ClientMessage represents a message sent and/or received by ActiveMQ Artemis.
  */
-public interface ClientMessage extends Message {
+public interface ClientMessage extends ICoreMessage {
 
    /**
     * Returns the number of times this message was delivered.
@@ -123,126 +124,140 @@ public interface ClientMessage extends Message {
    ClientMessage setBodyInputStream(InputStream bodyInputStream);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Return the bodyInputStream for large messages
+    * @return
+    */
+   @Override
+   InputStream getBodyInputStream();
+
+   /**
+    * The buffer to write the body.
+    * @return
+    */
+   @Override
+   ActiveMQBuffer getBodyBuffer();
+
+   /**
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putBooleanProperty(SimpleString key, boolean value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putBooleanProperty(String key, boolean value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putByteProperty(SimpleString key, byte value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putByteProperty(String key, byte value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putBytesProperty(SimpleString key, byte[] value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putBytesProperty(String key, byte[] value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putShortProperty(SimpleString key, short value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putShortProperty(String key, short value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putCharProperty(SimpleString key, char value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putCharProperty(String key, char value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putIntProperty(SimpleString key, int value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putIntProperty(String key, int value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putLongProperty(SimpleString key, long value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putLongProperty(String key, long value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putFloatProperty(SimpleString key, float value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putFloatProperty(String key, float value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putDoubleProperty(SimpleString key, double value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putDoubleProperty(String key, double value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putStringProperty(String key, String value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    ClientMessage writeBodyBufferBytes(byte[] bytes);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    ClientMessage writeBodyBufferString(String string);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
index 40211c1..946285d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
@@ -18,9 +18,11 @@ package org.apache.activemq.artemis.api.core.management;
 
 import javax.json.JsonArray;
 
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.JsonUtil;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 
 /**
  * Helper class to use ActiveMQ Artemis Core messages to manage server resources.
@@ -86,7 +88,7 @@ public final class ManagementHelper {
     * @param attribute    the name of the attribute
     * @see ResourceNames
     */
-   public static void putAttribute(final Message message, final String resourceName, final String attribute) {
+   public static void putAttribute(final ICoreMessage message, final String resourceName, final String attribute) {
       message.putStringProperty(ManagementHelper.HDR_RESOURCE_NAME, new SimpleString(resourceName));
       message.putStringProperty(ManagementHelper.HDR_ATTRIBUTE, new SimpleString(attribute));
    }
@@ -99,7 +101,7 @@ public final class ManagementHelper {
     * @param operationName the name of the operation to invoke on the resource
     * @see ResourceNames
     */
-   public static void putOperationInvocation(final Message message,
+   public static void putOperationInvocation(final ICoreMessage message,
                                              final String resourceName,
                                              final String operationName) throws Exception {
       ManagementHelper.putOperationInvocation(message, resourceName, operationName, (Object[]) null);
@@ -114,7 +116,7 @@ public final class ManagementHelper {
     * @param parameters    the parameters to use to invoke the server resource
     * @see ResourceNames
     */
-   public static void putOperationInvocation(final Message message,
+   public static void putOperationInvocation(final ICoreMessage message,
                                              final String resourceName,
                                              final String operationName,
                                              final Object... parameters) throws Exception {
@@ -141,7 +143,7 @@ public final class ManagementHelper {
     * Used by ActiveMQ Artemis management service.
     */
    public static Object[] retrieveOperationParameters(final Message message) throws Exception {
-      SimpleString sstring = message.getBodyBuffer().readNullableSimpleString();
+      SimpleString sstring = message.toCore().getReadOnlyBodyBuffer().readNullableSimpleString();
       String jsonString = (sstring == null) ? null : sstring.toString();
 
       if (jsonString != null) {
@@ -170,7 +172,7 @@ public final class ManagementHelper {
    /**
     * Used by ActiveMQ Artemis management service.
     */
-   public static void storeResult(final Message message, final Object result) throws Exception {
+   public static void storeResult(final CoreMessage message, final Object result) throws Exception {
       String resultString;
 
       if (result != null) {
@@ -192,7 +194,7 @@ public final class ManagementHelper {
     * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@code false}.
     * and the result will be a String corresponding to the server exception.
     */
-   public static Object[] getResults(final Message message) throws Exception {
+   public static Object[] getResults(final ICoreMessage message) throws Exception {
       SimpleString sstring = message.getBodyBuffer().readNullableSimpleString();
       String jsonString = (sstring == null) ? null : sstring.toString();
 
@@ -210,7 +212,7 @@ public final class ManagementHelper {
     * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@code false}.
     * and the result will be a String corresponding to the server exception.
     */
-   public static Object getResult(final Message message) throws Exception {
+   public static Object getResult(final ICoreMessage message) throws Exception {
       return getResult(message, null);
    }
 
@@ -220,7 +222,7 @@ public final class ManagementHelper {
     * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@code false}.
     * and the result will be a String corresponding to the server exception.
     */
-   public static Object getResult(final Message message, Class desiredType) throws Exception {
+   public static Object getResult(final ICoreMessage message, Class desiredType) throws Exception {
       Object[] res = ManagementHelper.getResults(message);
 
       if (res != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
index d95aeba..82af968 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
@@ -625,7 +625,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
       currentLargeMessageController.setLocal(true);
 
       //sets the packet
-      ActiveMQBuffer qbuff = clMessage.getBodyBuffer();
+      ActiveMQBuffer qbuff = clMessage.toCore().getBodyBuffer();
       int bytesToRead = qbuff.writerIndex() - qbuff.readerIndex();
       final byte[] body = ByteUtil.getActiveArray(qbuff.readBytes(bytesToRead).toByteBuffer());
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index 1704de0..ce4a8a1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -23,12 +23,12 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
 import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
 import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
-import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
 import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream;
 import org.apache.activemq.artemis.utils.DeflaterReader;
@@ -218,7 +218,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
 
       try {
          // In case we received message from another protocol, we first need to convert it to core as the ClientProducer only understands core
-         Message msg = msgToSend.toCore();
+         ICoreMessage msg = msgToSend.toCore();
 
          ClientProducerCredits theCredits;
 
@@ -259,7 +259,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
          session.workDone();
 
          if (isLarge) {
-            largeMessageSend(sendBlocking, (CoreMessage)msg, theCredits, handler);
+            largeMessageSend(sendBlocking, msg, theCredits, handler);
          } else {
             sendRegularMessage(sendingAddress, msg, sendBlocking, theCredits, handler);
          }
@@ -268,12 +268,12 @@ public class ClientProducerImpl implements ClientProducerInternal {
       }
    }
 
-   private InputStream getBodyInputStream(Message msgI) {
+   private InputStream getBodyInputStream(ICoreMessage msgI) {
       return msgI.getBodyInputStream();
    }
 
    private void sendRegularMessage(final SimpleString sendingAddress,
-                                   final Message msgI,
+                                   final ICoreMessage msgI,
                                    final boolean sendBlocking,
                                    final ClientProducerCredits theCredits,
                                    final SendAcknowledgementHandler handler) throws ActiveMQException {
@@ -306,7 +306,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
     * @throws ActiveMQException
     */
    private void largeMessageSend(final boolean sendBlocking,
-                                 final CoreMessage msgI,
+                                 final ICoreMessage msgI,
                                  final ClientProducerCredits credits,
                                  SendAcknowledgementHandler handler) throws ActiveMQException {
       logger.tracef("largeMessageSend::%s, Blocking=%s", msgI, sendBlocking);
@@ -353,7 +353,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
     * @throws ActiveMQException
     */
    private void largeMessageSendServer(final boolean sendBlocking,
-                                       final Message msgI,
+                                       final ICoreMessage msgI,
                                        final ClientProducerCredits credits,
                                        SendAcknowledgementHandler handler) throws ActiveMQException {
       sendInitialLargeMessageHeader(msgI, credits);
@@ -394,7 +394,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
     * @throws ActiveMQException
     */
    private void largeMessageSendBuffered(final boolean sendBlocking,
-                                         final Message msgI,
+                                         final ICoreMessage msgI,
                                          final ClientProducerCredits credits,
                                          SendAcknowledgementHandler handler) throws ActiveMQException {
       msgI.getBodyBuffer().readerIndex(0);
@@ -409,7 +409,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
     * @throws ActiveMQException
     */
    private void largeMessageSendStreamed(final boolean sendBlocking,
-                                         final Message msgI,
+                                         final ICoreMessage msgI,
                                          final InputStream inputStreamParameter,
                                          final ClientProducerCredits credits,
                                          SendAcknowledgementHandler handler) throws ActiveMQException {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index f620a1d..bf642e0 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -17,6 +17,7 @@
 
 package org.apache.activemq.artemis.core.message.impl;
 
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.Set;
 
@@ -25,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RefCountMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -41,7 +43,7 @@ import org.jboss.logging.Logger;
 
 /** Note: you shouldn't change properties using multi-threads. Change your properties before you can send it to multiple
  *  consumers */
-public class CoreMessage extends RefCountMessage {
+public class CoreMessage extends RefCountMessage implements ICoreMessage {
 
    public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
 
@@ -89,23 +91,10 @@ public class CoreMessage extends RefCountMessage {
 
    protected volatile TypedProperties properties;
 
-   private Object protocol;
-
    public CoreMessage() {
    }
 
    @Override
-   public CoreMessage setProtocol(Object protocol) {
-      this.protocol = protocol;
-      return this;
-   }
-
-   @Override
-   public Object getProtocol() {
-      return protocol;
-   }
-
-   @Override
    public Persister<Message> getPersister() {
       return CoreMessagePersister.getInstance();
    }
@@ -164,6 +153,11 @@ public class CoreMessage extends RefCountMessage {
       return null;
    }
 
+   @Override
+   public InputStream getBodyInputStream() {
+      return null;
+   }
+
    /**
     * {@inheritDoc}
     */
@@ -187,6 +181,7 @@ public class CoreMessage extends RefCountMessage {
       }
    }
 
+   @Override
    public int getEndOfBodyPosition() {
       if (endOfBodyPosition < 0) {
          endOfBodyPosition = getBodyBuffer().writerIndex();
@@ -238,7 +233,7 @@ public class CoreMessage extends RefCountMessage {
       messageID = msg.getMessageID();
       address = msg.getAddressSimpleString();
       userID = (UUID)msg.getUserID();
-      type = msg.getType();
+      type = msg.toCore().getType();
       durable = msg.isDurable();
       expiration = msg.getExpiration();
       timestamp = msg.getTimestamp();
@@ -369,6 +364,17 @@ public class CoreMessage extends RefCountMessage {
    }
 
    @Override
+   public boolean isServerMessage() {
+      // even though CoreMessage is used both on server and client
+      // callers are interested in knowing if this is a server large message
+      // as it will be used to send the body from the files.
+      //
+      // this may need further refactoring when we improve large messages
+      // and expose that functionality to other protocols.
+      return false;
+   }
+
+   @Override
    public byte getType() {
       return type;
    }
@@ -467,27 +473,6 @@ public class CoreMessage extends RefCountMessage {
    }
 
    @Override
-   public Object getBody() {
-
-      if (body == null) {
-         decodeBody();
-      }
-
-      return body;
-   }
-
-   private void decodeBody() {
-      buffer.readerIndex(DataConstants.SIZE_INT);
-      switch (getBodyType()) {
-         case Text:
-            body = SimpleString.readNullableSimpleString(buffer);
-            break;
-
-         default:
-            break;
-      }
-   }
-
    public int getHeadersAndPropertiesEncodeSize() {
       return DataConstants.SIZE_LONG + // Message ID
          DataConstants.SIZE_BYTE + // user id null?
@@ -501,10 +486,6 @@ public class CoreMessage extends RefCountMessage {
              /* PropertySize and Properties */checkProperties().getEncodeSize();
    }
 
-   @Override
-   public BodyType getBodyType() {
-      return getBodyType(type);
-   }
 
    public static BodyType getBodyType(byte type) {
       switch (type) {
@@ -540,16 +521,6 @@ public class CoreMessage extends RefCountMessage {
    }
 
    @Override
-   public CoreMessage setBody(final BodyType bodyType, Object body) {
-      messageChanged();
-
-      this.type = Message.TEXT_TYPE;
-      this.body = body;
-
-      return this;
-   }
-
-   @Override
    public boolean isLargeMessage() {
       return false;
    }
@@ -1016,7 +987,7 @@ public class CoreMessage extends RefCountMessage {
    }
 
    @Override
-   public Message toCore() {
+   public CoreMessage toCore() {
       return this;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 9975a5b..38cc177 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -427,7 +428,7 @@ public class ActiveMQSessionContext extends SessionContext {
    }
 
    @Override
-   public void sendFullMessage(Message msgI,
+   public void sendFullMessage(ICoreMessage msgI,
                                boolean sendBlocking,
                                SendAcknowledgementHandler handler,
                                SimpleString defaultAddress) throws ActiveMQException {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
index ec2520a..49989d3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
@@ -16,26 +16,25 @@
  */
 package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
-
 import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 
 public abstract class MessagePacket extends PacketImpl implements MessagePacketI {
 
-   protected Message message;
+   protected ICoreMessage message;
 
-   public MessagePacket(final byte type, final Message message) {
+   public MessagePacket(final byte type, final ICoreMessage message) {
       super(type);
 
       this.message = message;
    }
 
    @Override
-   public Message getMessage() {
+   public ICoreMessage getMessage() {
       return message;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
index c03d3c8..b0ab52b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
@@ -17,8 +17,8 @@
 package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-
-import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.utils.DataConstants;
 
@@ -31,7 +31,7 @@ public class SessionReceiveMessage extends MessagePacket {
 
    private int deliveryCount;
 
-   public SessionReceiveMessage(final long consumerID, final Message message, final int deliveryCount) {
+   public SessionReceiveMessage(final long consumerID, final ICoreMessage message, final int deliveryCount) {
       super(SESS_RECEIVE_MSG, message);
 
       this.consumerID = consumerID;
@@ -39,7 +39,7 @@ public class SessionReceiveMessage extends MessagePacket {
       this.deliveryCount = deliveryCount;
    }
 
-   public SessionReceiveMessage(final Message message) {
+   public SessionReceiveMessage(final CoreMessage message) {
       super(SESS_RECEIVE_MSG, message);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
index 8182b90..43bb0be 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
@@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -38,7 +38,7 @@ public class SessionSendMessage extends MessagePacket {
    private final transient SendAcknowledgementHandler handler;
 
    /** This will be using the CoreMessage because it is meant for the core-protocol */
-   public SessionSendMessage(final Message message,
+   public SessionSendMessage(final ICoreMessage message,
                              final boolean requiresResponse,
                              final SendAcknowledgementHandler handler) {
       super(SESS_SEND, message);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index 8bb0081..3fddb8e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -23,7 +23,9 @@ import java.util.Set;
 import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
@@ -33,7 +35,6 @@ import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditsImpl;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
@@ -129,7 +130,7 @@ public abstract class SessionContext {
 
    public abstract int getCreditsOnSendingFull(Message msgI);
 
-   public abstract void sendFullMessage(Message msgI,
+   public abstract void sendFullMessage(ICoreMessage msgI,
                                         boolean sendBlocking,
                                         SendAcknowledgementHandler handler,
                                         SimpleString defaultAddress) throws ActiveMQException;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java
index 0e99106..4d0306b 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.api.jms.management;
 import javax.jms.JMSException;
 import javax.jms.Message;
 
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
 
@@ -27,7 +28,7 @@ import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
  */
 public class JMSManagementHelper {
 
-   private static org.apache.activemq.artemis.api.core.Message getCoreMessage(final Message jmsMessage) {
+   private static ClientMessage getCoreMessage(final Message jmsMessage) {
       if (jmsMessage instanceof ActiveMQMessage == false) {
          throw new IllegalArgumentException("Cannot send a foreign message as a management message " + jmsMessage.getClass().getName());
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java
index 289f88c..ecb4ccb 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.jms.transaction;
 import javax.transaction.xa.Xid;
 import java.util.Map;
 
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionDetail;
@@ -37,7 +38,10 @@ public class JMSTransactionDetail extends TransactionDetail {
 
    @Override
    public String decodeMessageType(Message msg) {
-      int type = msg.getType();
+      if (!(msg instanceof ICoreMessage)) {
+         return "N/A";
+      }
+      int type = ((ICoreMessage) msg).getType();
       switch (type) {
          case ActiveMQMessage.TYPE: // 0
             return "Default";

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 772f2cd..456d281 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -27,13 +27,12 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.RefCountMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.encode.BodyType;
-import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
 import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
 import org.apache.activemq.artemis.utils.DataConstants;
@@ -60,7 +59,6 @@ public class AMQPMessage extends RefCountMessage {
    String address;
    MessageImpl protonMessage;
    private volatile int memoryEstimate = -1;
-   private ProtonProtocolManager protocolManager;
    private long expiration = 0;
    // this can be used to encode the header again and the rest of the message buffer
    private int headerEnd = -1;
@@ -71,8 +69,7 @@ public class AMQPMessage extends RefCountMessage {
    private Properties _properties;
    private ApplicationProperties applicationProperties;
 
-   public AMQPMessage(long messageFormat, byte[] data, ProtonProtocolManager protocolManager) {
-      this.protocolManager = protocolManager;
+   public AMQPMessage(long messageFormat, byte[] data) {
       this.data = Unpooled.wrappedBuffer(data);
       this.messageFormat = messageFormat;
       this.bufferValid = true;
@@ -86,15 +83,14 @@ public class AMQPMessage extends RefCountMessage {
 
    }
 
-   public AMQPMessage(long messageFormat, Message message, ProtonProtocolManager protocolManager) {
-      this.protocolManager = protocolManager;
-      this.protonMessage = (MessageImpl)message;
+   public AMQPMessage(long messageFormat, Message message) {
       this.messageFormat = messageFormat;
+      this.protonMessage = (MessageImpl)message;
 
    }
 
-   public AMQPMessage(Message message, ProtonProtocolManager protocolManager) {
-      this(0, message, protocolManager);
+   public AMQPMessage(Message message) {
+      this(0, message);
    }
 
    public MessageImpl getProtonMessage() {
@@ -292,40 +288,6 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
-   public ActiveMQBuffer getBodyBuffer() {
-      // NO-IMPL
-      return null;
-   }
-
-   @Override
-   public ActiveMQBuffer getReadOnlyBodyBuffer() {
-      // NO-IMPL
-      return null;
-   }
-
-   @Override
-   public LargeBodyEncoder getBodyEncoder() throws ActiveMQException {
-      // NO-IMPL
-      return null;
-   }
-
-   @Override
-   public byte getType() {
-      return type;
-   }
-
-   @Override
-   public AMQPMessage setType(byte type) {
-      this.type = type;
-      return this;
-   }
-
-   @Override
-   public boolean isLargeMessage() {
-      return false;
-   }
-
-   @Override
    public ByteBuf getBuffer() {
       if (data == null) {
          return null;
@@ -342,12 +304,14 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public org.apache.activemq.artemis.api.core.Message copy() {
-      AMQPMessage newEncode = new AMQPMessage(this.messageFormat, data.array(), protocolManager);
+      checkBuffer();
+      AMQPMessage newEncode = new AMQPMessage(this.messageFormat, data.array());
       return newEncode;
    }
 
    @Override
    public org.apache.activemq.artemis.api.core.Message copy(long newID) {
+      checkBuffer();
       return copy().setMessageID(newID);
    }
 
@@ -403,32 +367,6 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
-   public Object getProtocol() {
-      return protocolManager;
-   }
-
-   @Override
-   public AMQPMessage setProtocol(Object protocol) {
-      this.protocolManager = (ProtonProtocolManager)protocol;
-      return this;
-   }
-
-   @Override
-   public Object getBody() {
-      return null;
-   }
-
-   @Override
-   public BodyType getBodyType() {
-      return null;
-   }
-
-   @Override
-   public org.apache.activemq.artemis.api.core.Message setBody(BodyType type, Object body) {
-      return null;
-   }
-
-   @Override
    public String getAddress() {
       if (address == null) {
          Properties properties = getProtonMessage().getProperties();
@@ -794,9 +732,12 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message toCore() {
-      MessageImpl protonMessage = getProtonMessage();
-      throw new IllegalStateException("conversion between AMQP and Core not implemented yet!");
+   public ICoreMessage toCore() {
+      try {
+         return AMQPConverter.getInstance().toCore(this);
+      } catch (Exception e) {
+         throw new RuntimeException(e.getMessage(), e);
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index f34298c..5931afe 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.io.IOCallback;
@@ -33,14 +34,13 @@ import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
-import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter;
+import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
@@ -64,7 +64,6 @@ import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Receiver;
@@ -296,13 +295,6 @@ public class AMQPSessionCallback implements SessionCallback {
       }
    }
 
-   public long encodeMessage(Message message, int deliveryCount, WritableBuffer buffer) throws Exception {
-      ProtonMessageConverter converter = (ProtonMessageConverter) manager.getConverter();
-
-      // The Proton variant accepts a WritableBuffer to allow for a faster more direct encode.
-      return (long) converter.outbound(message, deliveryCount, buffer);
-   }
-
    public String tempQueueName() {
       return UUIDGenerator.getInstance().generateStringUUID();
    }
@@ -350,7 +342,7 @@ public class AMQPSessionCallback implements SessionCallback {
                           String address,
                           int messageFormat,
                           byte[] data) throws Exception {
-      AMQPMessage message = new AMQPMessage(messageFormat, data, manager);
+      AMQPMessage message = new AMQPMessage(messageFormat, data);
       if (address != null) {
          message.setAddress(new SimpleString(address));
       } else {
@@ -494,7 +486,7 @@ public class AMQPSessionCallback implements SessionCallback {
       ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext();
 
       try {
-         return plugSender.deliverMessage(message, deliveryCount);
+         return plugSender.deliverMessage(CoreAmqpConverter.checkAMQP(message), deliveryCount);
       } catch (Exception e) {
          synchronized (connection.getLock()) {
             plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage()));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index 754172a..9c7d24d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -26,19 +26,17 @@ import io.netty.channel.ChannelPipeline;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.server.management.NotificationListener;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
-import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
-import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -54,8 +52,6 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
 
    private final ActiveMQServer server;
 
-   private MessageConverter protonConverter;
-
    private final ProtonProtocolManagerFactory factory;
 
    private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
@@ -72,7 +68,6 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
    public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server) {
       this.factory = factory;
       this.server = server;
-      this.protonConverter = new ProtonMessageConverter(server.getStorageManager());
    }
 
    public ActiveMQServer getServer() {
@@ -80,11 +75,6 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
    }
 
    @Override
-   public MessageConverter getConverter() {
-      return protonConverter;
-   }
-
-   @Override
    public void onNotification(Notification notification) {
 
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java
new file mode 100644
index 0000000..724474b
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter;
+
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
+
+
+public class AMQPConverter implements MessageConverter<AMQPMessage> {
+
+   private static final AMQPConverter theInstance = new AMQPConverter();
+
+   private AMQPConverter() {
+   }
+
+   public static AMQPConverter getInstance() {
+      return theInstance;
+   }
+
+   @Override
+   public AMQPMessage fromCore(ICoreMessage coreMessage) throws Exception {
+      return CoreAmqpConverter.fromCore(coreMessage);
+   }
+
+   @Override
+   public ICoreMessage toCore(AMQPMessage messageSource) throws Exception {
+      return AmqpCoreConverter.toCore(messageSource);
+   }
+}


Mime
View raw message