activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [21/21] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding.
Date Mon, 06 Mar 2017 03:58:39 GMT
ARTEMIS-1009 Pure Message Encoding.

With this we can send and receive messages in their raw format,
without requiring conversion to Core.

- MessageImpl and ServerMessage are removed as part of this
- AMQPMessage and CoreMessage will have the specialized message format for each protocol
- The protocol manager is now responsible for sending the message
- The message will provide an encoder for journal and paging


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/669e7cf2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/669e7cf2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/669e7cf2

Branch: refs/heads/artemis-1009
Commit: 669e7cf2a5d460bdda1d8522b129d37ae4517288
Parents: c1fa5d0
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Mon Feb 20 15:55:15 2017 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Sun Mar 5 22:58:02 2017 -0500

----------------------------------------------------------------------
 .../artemis/cli/commands/tools/PrintData.java   |    7 +
 .../cli/commands/tools/XmlDataExporter.java     |   42 +-
 .../cli/commands/tools/XmlDataExporterUtil.java |   11 +-
 .../cli/commands/tools/XmlDataImporter.java     |   10 +-
 .../org/apache/activemq/artemis/Closeable.java  |   22 +
 .../artemis/api/core/ActiveMQBuffer.java        |   13 +
 .../artemis/api/core/ActiveMQBuffers.java       |   15 +
 .../activemq/artemis/api/core/SimpleString.java |   34 +
 .../core/buffers/impl/ChannelBufferWrapper.java |   84 +-
 .../artemis/core/persistence/Persister.java     |   30 +
 .../apache/activemq/artemis/utils/ByteUtil.java |    8 +
 .../activemq/artemis/utils/TypedProperties.java |   62 +-
 .../apache/activemq/artemis/utils/UTF8Util.java |   36 +-
 .../artemis/utils/TypedPropertiesTest.java      |   10 +-
 .../config/ActiveMQDefaultConfiguration.java    |   20 -
 .../activemq/artemis/api/core/ICoreMessage.java |   90 ++
 .../activemq/artemis/api/core/Message.java      |  637 +++++-----
 .../artemis/api/core/RefCountMessage.java       |   81 ++
 .../api/core/RefCountMessageListener.java       |   31 +
 .../artemis/api/core/client/ClientMessage.java  |   69 +-
 .../artemis/api/core/encode/BodyType.java       |   22 +
 .../artemis/api/core/encode/MessageBody.java    |   28 +
 .../api/core/management/ManagementHelper.java   |   18 +-
 .../impl/ResetLimitWrappedActiveMQBuffer.java   |   24 +-
 .../core/client/impl/ClientConsumerImpl.java    |    4 +-
 .../client/impl/ClientLargeMessageImpl.java     |   22 +-
 .../core/client/impl/ClientMessageImpl.java     |   69 +-
 .../core/client/impl/ClientMessageInternal.java |    4 +-
 .../core/client/impl/ClientProducerImpl.java    |   50 +-
 .../CompressedLargeMessageControllerImpl.java   |    6 +
 .../client/impl/LargeMessageControllerImpl.java |   15 +
 .../artemis/core/message/BodyEncoder.java       |   55 -
 .../artemis/core/message/LargeBodyEncoder.java  |   55 +
 .../artemis/core/message/impl/CoreMessage.java  | 1096 ++++++++++++++++++
 .../core/message/impl/CoreMessagePersister.java |   66 ++
 .../artemis/core/message/impl/MessageImpl.java  | 1059 -----------------
 .../core/message/impl/MessageInternal.java      |   57 -
 .../core/impl/ActiveMQSessionContext.java       |   17 +-
 .../core/protocol/core/impl/ChannelImpl.java    |    1 +
 .../core/protocol/core/impl/PacketImpl.java     |   30 +-
 .../core/impl/RemotingConnectionImpl.java       |    1 +
 .../core/impl/wireformat/MessagePacket.java     |   21 +-
 .../SessionReceiveClientLargeMessage.java       |    5 +-
 .../wireformat/SessionReceiveLargeMessage.java  |   14 +-
 .../impl/wireformat/SessionReceiveMessage.java  |   60 +-
 .../SessionSendContinuationMessage.java         |    8 +-
 .../wireformat/SessionSendLargeMessage.java     |   12 +-
 .../impl/wireformat/SessionSendMessage.java     |   54 +-
 .../activemq/artemis/reader/MapMessageUtil.java |    4 +-
 .../spi/core/remoting/SessionContext.java       |   14 +-
 .../artemis/message/CoreMessageTest.java        |  365 ++++++
 .../jdbc/store/journal/JDBCJournalImpl.java     |   36 +-
 .../jdbc/store/journal/JDBCJournalRecord.java   |    7 +-
 .../api/jms/management/JMSManagementHelper.java |    3 +-
 .../jms/client/ActiveMQBytesMessage.java        |    4 +-
 .../artemis/jms/client/ActiveMQMessage.java     |    8 +-
 .../jms/transaction/JMSTransactionDetail.java   |   12 +-
 .../artemis/core/journal/EncoderPersister.java  |   51 +
 .../activemq/artemis/core/journal/Journal.java  |   55 +-
 .../journal/impl/AbstractJournalUpdateTask.java |    3 +-
 .../core/journal/impl/FileWrapperJournal.java   |   26 +-
 .../artemis/core/journal/impl/JournalBase.java  |   63 +-
 .../core/journal/impl/JournalCompactor.java     |    9 +-
 .../artemis/core/journal/impl/JournalImpl.java  |   62 +-
 .../impl/dataformat/JournalAddRecord.java       |   20 +-
 .../impl/dataformat/JournalAddRecordTX.java     |   17 +-
 .../protocol/amqp/broker/AMQPMessage.java       |  872 ++++++++++++++
 .../amqp/broker/AMQPMessagePersister.java       |   75 ++
 .../amqp/broker/AMQPSessionCallback.java        |   45 +-
 .../amqp/broker/ProtonProtocolManager.java      |   12 +-
 .../broker/ProtonProtocolManagerFactory.java    |   14 +
 .../amqp/converter/AMQPContentTypeSupport.java  |  146 +++
 .../protocol/amqp/converter/AMQPConverter.java  |   44 +
 .../amqp/converter/AMQPMessageIdHelper.java     |  252 ++++
 .../amqp/converter/AMQPMessageSupport.java      |  308 +++++
 .../amqp/converter/AmqpCoreConverter.java       |  351 ++++++
 .../amqp/converter/CoreAmqpConverter.java       |  461 ++++++++
 .../amqp/converter/ProtonMessageConverter.java  |  101 --
 .../converter/jms/ServerJMSBytesMessage.java    |   10 +-
 .../amqp/converter/jms/ServerJMSMapMessage.java |    6 +-
 .../amqp/converter/jms/ServerJMSMessage.java    |   71 +-
 .../converter/jms/ServerJMSObjectMessage.java   |    9 +-
 .../converter/jms/ServerJMSStreamMessage.java   |    8 +-
 .../converter/jms/ServerJMSTextMessage.java     |    6 +-
 .../message/AMQPContentTypeSupport.java         |  146 ---
 .../converter/message/AMQPMessageIdHelper.java  |  252 ----
 .../converter/message/AMQPMessageSupport.java   |  276 -----
 .../converter/message/AMQPMessageTypes.java     |   30 -
 .../message/AMQPNativeInboundTransformer.java   |   44 -
 .../message/AMQPNativeOutboundTransformer.java  |   80 --
 .../message/AMQPRawInboundTransformer.java      |   62 -
 .../amqp/converter/message/EncodedMessage.java  |   67 --
 .../converter/message/InboundTransformer.java   |  243 ----
 .../message/JMSMappingInboundTransformer.java   |  196 ----
 .../message/JMSMappingOutboundTransformer.java  |  592 ----------
 .../converter/message/OutboundTransformer.java  |   53 -
 .../amqp/proton/AMQPConnectionContext.java      |    4 +
 .../proton/ProtonServerReceiverContext.java     |   39 +-
 .../amqp/proton/ProtonServerSenderContext.java  |   30 +-
 .../amqp/proton/ProtonTransactionHandler.java   |    3 +-
 .../amqp/proton/handler/ProtonHandler.java      |    2 +-
 .../protocol/amqp/util/NettyReadable.java       |  139 +++
 .../artemis/protocol/amqp/util/TLSEncode.java   |   52 +
 .../amqp/converter/TestConversions.java         |  619 +---------
 .../message/AMQPContentTypeSupportTest.java     |   10 +-
 .../message/AMQPMessageIdHelperTest.java        |   11 +-
 .../message/AMQPMessageSupportTest.java         |   11 +-
 .../JMSMappingInboundTransformerTest.java       |  234 +---
 .../JMSMappingOutboundTransformerTest.java      |  387 +------
 .../JMSTransformationSpeedComparisonTest.java   |   94 +-
 .../message/MessageTransformationTest.java      |  150 +--
 .../protocol/amqp/message/AMQPMessageTest.java  |   63 +
 .../core/protocol/mqtt/MQTTProtocolManager.java |    6 -
 .../core/protocol/mqtt/MQTTPublishManager.java  |   31 +-
 .../protocol/mqtt/MQTTRetainMessageManager.java |    8 +-
 .../core/protocol/mqtt/MQTTSessionCallback.java |   12 +-
 .../artemis/core/protocol/mqtt/MQTTUtil.java    |   20 +-
 .../protocol/openwire/OpenWireConnection.java   |    3 -
 .../openwire/OpenWireMessageConverter.java      |   34 +-
 .../openwire/OpenWireProtocolManager.java       |    9 +-
 .../core/protocol/openwire/OpenwireMessage.java |  499 ++++++++
 .../core/protocol/openwire/amq/AMQConsumer.java |    9 +-
 .../core/protocol/openwire/amq/AMQSession.java  |   14 +-
 .../protocol/openwire/util/OpenWireUtil.java    |   12 +-
 .../ActiveMQStompProtocolMessageBundle.java     |    3 +-
 .../core/protocol/stomp/StompConnection.java    |   17 +-
 .../protocol/stomp/StompProtocolManager.java    |   14 +-
 .../core/protocol/stomp/StompSession.java       |   51 +-
 .../artemis/core/protocol/stomp/StompUtils.java |    6 +-
 .../stomp/VersionedStompFrameHandler.java       |   23 +-
 .../stomp/v12/StompFrameHandlerV12.java         |    8 +-
 .../artemis/core/config/Configuration.java      |    8 -
 .../core/config/impl/ConfigurationImpl.java     |   32 -
 .../deployers/impl/FileConfigurationParser.java |    4 -
 .../activemq/artemis/core/filter/Filter.java    |    4 +-
 .../artemis/core/filter/impl/FilterImpl.java    |   19 +-
 .../management/impl/AddressControlImpl.java     |    6 +-
 .../core/management/impl/QueueControlImpl.java  |   10 +-
 .../impl/openmbean/OpenTypeSupport.java         |   21 +-
 .../artemis/core/paging/PagedMessage.java       |    4 +-
 .../artemis/core/paging/PagingStore.java        |    7 +-
 .../core/paging/cursor/PagedReferenceImpl.java  |   16 +-
 .../cursor/impl/PageSubscriptionImpl.java       |    4 +-
 .../activemq/artemis/core/paging/impl/Page.java |    5 +-
 .../core/paging/impl/PagedMessageImpl.java      |   70 +-
 .../core/paging/impl/PagingStoreImpl.java       |   46 +-
 .../core/persistence/StorageManager.java        |   16 +-
 .../journal/AbstractJournalStorageManager.java  |   60 +-
 .../impl/journal/AddMessageRecord.java          |    8 +-
 .../impl/journal/DescribeJournal.java           |   17 +-
 .../impl/journal/JournalRecordIds.java          |    3 +
 .../impl/journal/JournalStorageManager.java     |   14 +-
 .../journal/LargeMessageTXFailureCallback.java  |    6 +-
 .../impl/journal/LargeServerMessageImpl.java    |  108 +-
 .../journal/codec/LargeMessageEncoding.java     |   55 -
 .../journal/codec/LargeMessagePersister.java    |   62 +
 .../nullpm/NullStorageLargeServerMessage.java   |   16 +-
 .../impl/nullpm/NullStorageManager.java         |   15 +-
 .../artemis/core/postoffice/Binding.java        |    9 +-
 .../artemis/core/postoffice/Bindings.java       |    6 +-
 .../artemis/core/postoffice/PostOffice.java     |   18 +-
 .../core/postoffice/impl/BindingsImpl.java      |   26 +-
 .../core/postoffice/impl/DivertBinding.java     |    8 +-
 .../core/postoffice/impl/LocalQueueBinding.java |    8 +-
 .../core/postoffice/impl/PostOfficeImpl.java    |  121 +-
 .../core/protocol/ServerPacketDecoder.java      |    6 +-
 .../core/ServerSessionPacketHandler.java        |   83 +-
 .../core/impl/ActiveMQPacketHandler.java        |    2 +-
 .../protocol/core/impl/CoreProtocolManager.java |   13 +-
 .../core/impl/CoreProtocolManagerFactory.java   |   14 +
 .../protocol/core/impl/CoreSessionCallback.java |    9 +-
 .../impl/wireformat/ReplicationAddMessage.java  |   14 +-
 .../wireformat/ReplicationAddTXMessage.java     |   14 +-
 .../wireformat/ReplicationPageWriteMessage.java |    2 +-
 .../core/remoting/server/RemotingService.java   |    4 +
 .../server/impl/RemotingServiceImpl.java        |   11 +-
 .../core/replication/ReplicatedJournal.java     |   52 +-
 .../core/replication/ReplicationEndpoint.java   |    7 +-
 .../core/replication/ReplicationManager.java    |   11 +-
 .../core/server/ActiveMQServerLogger.java       |    8 +-
 .../activemq/artemis/core/server/Bindable.java  |    6 +-
 .../artemis/core/server/LargeServerMessage.java |    3 +-
 .../artemis/core/server/MessageReference.java   |   10 +-
 .../activemq/artemis/core/server/Queue.java     |    3 +-
 .../artemis/core/server/ServerMessage.java      |   78 --
 .../artemis/core/server/ServerSession.java      |   23 +-
 .../core/server/cluster/Transformer.java        |    4 +-
 .../core/server/cluster/impl/BridgeImpl.java    |   14 +-
 .../cluster/impl/ClusterConnectionBridge.java   |   13 +-
 .../core/server/cluster/impl/Redistributor.java |    3 +-
 .../cluster/impl/RemoteQueueBindingImpl.java    |   13 +-
 .../core/server/impl/ActiveMQServerImpl.java    |    2 +-
 .../artemis/core/server/impl/DivertImpl.java    |    9 +-
 .../artemis/core/server/impl/JournalLoader.java |    6 +-
 .../core/server/impl/LastValueQueue.java        |    9 +-
 .../core/server/impl/MessageReferenceImpl.java  |   24 +-
 .../server/impl/PostOfficeJournalLoader.java    |    7 +-
 .../artemis/core/server/impl/QueueImpl.java     |   60 +-
 .../artemis/core/server/impl/RefsOperation.java |    4 +-
 .../core/server/impl/ScaleDownHandler.java      |   37 +-
 .../core/server/impl/ServerConsumerImpl.java    |   30 +-
 .../core/server/impl/ServerMessageImpl.java     |  341 ------
 .../core/server/impl/ServerSessionImpl.java     |  156 +--
 .../server/management/ManagementService.java    |    7 +-
 .../management/impl/ManagementServiceImpl.java  |   15 +-
 .../core/transaction/TransactionDetail.java     |    8 +-
 .../transaction/impl/CoreTransactionDetail.java |   11 +-
 .../spi/core/protocol/MessageConverter.java     |   10 +-
 .../spi/core/protocol/MessagePersister.java     |   88 ++
 .../spi/core/protocol/ProtocolManager.java      |   12 +-
 .../core/protocol/ProtocolManagerFactory.java   |   15 +
 .../spi/core/protocol/SessionCallback.java      |    6 +-
 .../resources/schema/artemis-configuration.xsd  |   16 -
 .../core/config/impl/ConfigurationImplTest.java |    9 -
 .../artemis/core/filter/impl/FilterTest.java    |   12 +-
 .../group/impl/ClusteredResetMockTest.java      |    7 +-
 .../impl/ScheduledDeliveryHandlerTest.java      |  200 ++--
 .../transaction/impl/TransactionImplTest.java   |   16 +-
 .../artemis/tests/util/ActiveMQTestBase.java    |   14 +-
 .../resources/ConfigurationTest-full-config.xml |    2 -
 .../test/resources/artemis-configuration.xsd    |   16 -
 .../jms/example/HatColourChangeTransformer.java |    4 +-
 .../example/AddForwardingTimeTransformer.java   |    7 +-
 pom.xml                                         |    5 +-
 .../PartialPooledByteBufAllocator.java          |    5 +
 .../amqp/client/util/UnmodifiableDelivery.java  |    6 +
 .../journal/gcfree/EncodersBench.java           |    5 +-
 .../byteman/JMSBridgeReconnectionTest.java      |    4 +-
 .../tests/extras/byteman/MessageCopyTest.java   |  163 ---
 .../integration/DuplicateDetectionTest.java     |    6 +-
 .../amqp/AmqpDescribedTypePayloadTest.java      |    6 +-
 .../integration/amqp/AmqpSendReceiveTest.java   |   21 -
 .../tests/integration/amqp/ProtonTest.java      |   73 +-
 .../integration/client/AckBatchSizeTest.java    |   14 +-
 .../integration/client/AcknowledgeTest.java     |  172 ++-
 .../tests/integration/client/ConsumerTest.java  |  265 ++++-
 .../integration/client/HangConsumerTest.java    |    7 +-
 .../InVMNonPersistentMessageBufferTest.java     |   36 +-
 .../client/InterruptedLargeMessageTest.java     |   10 +-
 .../integration/client/LargeMessageTest.java    |    5 +-
 .../integration/clientcrash/ClientExitTest.java |    4 +-
 .../integration/cluster/bridge/BridgeTest.java  |   10 +-
 .../cluster/bridge/SimpleTransformer.java       |    6 +-
 .../distribution/ClusterHeadersRemovedTest.java |    5 +-
 .../distribution/MessageRedistributionTest.java |    4 +-
 .../tests/integration/divert/DivertTest.java    |    5 +-
 .../interceptors/InterceptorTest.java           |    8 +-
 .../integration/journal/MessageJournalTest.java |  130 +++
 .../journal/NIOJournalCompactTest.java          |    6 +-
 .../integration/karaf/ArtemisFeatureTest.java   |    2 +
 .../management/ManagementHelperTest.java        |    8 +-
 .../management/ManagementServiceImplTest.java   |   25 +-
 .../integration/paging/PagingSendTest.java      |    3 +-
 .../tests/integration/paging/PagingTest.java    |    4 +-
 .../DeleteMessagesOnStartupTest.java            |   10 +-
 .../persistence/ExportFormatTest.java           |   28 +-
 .../replication/ReplicationTest.java            |   71 +-
 .../integration/server/FakeStorageManager.java  |    6 +-
 .../tests/integration/server/ScaleDownTest.java |    4 +-
 .../ssl/CoreClientOverOneWaySSLTest.java        |    4 +-
 .../ssl/CoreClientOverTwoWaySSLTest.java        |    5 +-
 .../storage/PersistMultiThreadTest.java         |   30 +-
 .../stress/paging/PageCursorStressTest.java     |   24 +-
 .../core/server/impl/QueueConcurrentTest.java   |    6 +-
 tests/unit-tests/pom.xml                        |    6 +
 .../core/journal/impl/JournalImplTestUnit.java  |    2 +-
 .../unit/core/message/impl/MessageImplTest.java |   14 +-
 .../tests/unit/core/paging/impl/PageTest.java   |   42 +-
 .../core/paging/impl/PagingManagerImplTest.java |   16 +-
 .../core/paging/impl/PagingStoreImplTest.java   |   64 +-
 .../core/postoffice/impl/BindingsImplTest.java  |   16 +-
 .../unit/core/postoffice/impl/FakeQueue.java    |    9 +-
 .../impl/WildcardAddressManagerUnitTest.java    |   12 +-
 .../unit/core/server/impl/QueueImplTest.java    |    4 +-
 .../unit/core/server/impl/fakes/FakeFilter.java |    7 +-
 .../server/impl/fakes/FakeJournalLoader.java    |    6 +-
 .../core/server/impl/fakes/FakePostOffice.java  |   22 +-
 .../tests/unit/util/FakePagingManager.java      |    7 +-
 .../artemis/tests/unit/util/MemorySizeTest.java |    4 +-
 .../artemis/tests/unit/util/UTF8Test.java       |   10 +-
 280 files changed, 8530 insertions(+), 7737 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
index 408aef5..2816aaf 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
@@ -34,6 +34,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.cli.Artemis;
 import org.apache.activemq.artemis.cli.commands.ActionContext;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
@@ -50,16 +51,22 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordId
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
 import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
 import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
+import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 
 @Command(name = "print", description = "Print data records information (WARNING: don't use while a production server is running)")
 public class PrintData extends OptionalLocking {
 
+   static {
+      MessagePersister.registerPersister(CoreProtocolManagerFactory.ID, CoreMessagePersister.getInstance());
+   }
+
    @Override
    public Object execute(ActionContext context) throws Exception {
       super.execute(context);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
index 4f99181..d2f6204 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.artemis.cli.commands.tools;
 
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamWriter;
 import java.io.File;
 import java.io.OutputStream;
 import java.lang.reflect.InvocationHandler;
@@ -33,14 +36,13 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
-import javax.xml.stream.XMLOutputFactory;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.XMLStreamWriter;
-
+import io.airlift.airline.Command;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.cli.commands.ActionContext;
 import org.apache.activemq.artemis.core.config.Configuration;
@@ -50,7 +52,7 @@ import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
@@ -74,8 +76,6 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.Persisten
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
@@ -83,8 +83,6 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
 
-import io.airlift.airline.Command;
-
 @Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.")
 public final class XmlDataExporter extends OptionalLocking {
 
@@ -220,7 +218,9 @@ public final class XmlDataExporter extends OptionalLocking {
 
          Object o = DescribeJournal.newObjectEncoding(info, storageManager);
          if (info.getUserRecordType() == JournalRecordIds.ADD_MESSAGE) {
-            messages.put(info.id, ((MessageDescribe) o).getMsg());
+            messages.put(info.id, ((MessageDescribe) o).getMsg().toCore());
+         } else if (info.getUserRecordType() == JournalRecordIds.ADD_MESSAGE_PROTOCOL) {
+            messages.put(info.id, ((MessageDescribe) o).getMsg().toCore());
          } else if (info.getUserRecordType() == JournalRecordIds.ADD_LARGE_MESSAGE) {
             messages.put(info.id, ((MessageDescribe) o).getMsg());
          } else if (info.getUserRecordType() == JournalRecordIds.ADD_REF) {
@@ -361,13 +361,13 @@ public final class XmlDataExporter extends OptionalLocking {
       xmlWriter.writeEndElement(); // end BINDINGS_PARENT
    }
 
-   private void printAllMessagesAsXML() throws XMLStreamException {
+   private void printAllMessagesAsXML() throws Exception {
       xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_PARENT);
 
       // Order here is important.  We must process the messages from the journal before we process those from the page
       // files in order to get the messages in the right order.
       for (Map.Entry<Long, Message> messageMapEntry : messages.entrySet()) {
-         printSingleMessageAsXML((ServerMessage) messageMapEntry.getValue(), extractQueueNames(messageRefs.get(messageMapEntry.getKey())));
+         printSingleMessageAsXML(messageMapEntry.getValue().toCore(), extractQueueNames(messageRefs.get(messageMapEntry.getKey())));
       }
 
       printPagedMessagesAsXML();
@@ -439,7 +439,7 @@ public final class XmlDataExporter extends OptionalLocking {
                      }
 
                      if (queueNames.size() > 0 && (message.getTransactionID() == -1 || pgTXs.contains(message.getTransactionID()))) {
-                        printSingleMessageAsXML(message.getMessage(), queueNames);
+                        printSingleMessageAsXML(message.getMessage().toCore(), queueNames);
                      }
 
                      messageId++;
@@ -456,20 +456,20 @@ public final class XmlDataExporter extends OptionalLocking {
       }
    }
 
-   private void printSingleMessageAsXML(ServerMessage message, List<String> queues) throws XMLStreamException {
+   private void printSingleMessageAsXML(ICoreMessage message, List<String> queues) throws Exception {
       xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD);
       printMessageAttributes(message);
       printMessageProperties(message);
       printMessageQueues(queues);
-      printMessageBody(message);
+      printMessageBody(message.toCore());
       xmlWriter.writeEndElement(); // end MESSAGES_CHILD
       messagesPrinted++;
    }
 
-   private void printMessageBody(ServerMessage message) throws XMLStreamException {
+   private void printMessageBody(Message message) throws Exception {
       xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY);
 
-      if (message.isLargeMessage()) {
+      if (message.toCore().isLargeMessage()) {
          printLargeMessageBody((LargeServerMessage) message);
       } else {
          xmlWriter.writeCData(XmlDataExporterUtil.encodeMessageBody(message));
@@ -479,10 +479,10 @@ public final class XmlDataExporter extends OptionalLocking {
 
    private void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException {
       xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
-      BodyEncoder encoder = null;
+      LargeBodyEncoder encoder = null;
 
       try {
-         encoder = message.getBodyEncoder();
+         encoder = message.toCore().getBodyEncoder();
          encoder.open();
          long totalBytesWritten = 0;
          Long bufferSize;
@@ -522,7 +522,7 @@ public final class XmlDataExporter extends OptionalLocking {
       xmlWriter.writeEndElement(); // end QUEUES_PARENT
    }
 
-   private void printMessageProperties(ServerMessage message) throws XMLStreamException {
+   private void printMessageProperties(Message message) throws XMLStreamException {
       xmlWriter.writeStartElement(XmlDataConstants.PROPERTIES_PARENT);
       for (SimpleString key : message.getPropertyNames()) {
          Object value = message.getObjectProperty(key);
@@ -539,7 +539,7 @@ public final class XmlDataExporter extends OptionalLocking {
       xmlWriter.writeEndElement(); // end PROPERTIES_PARENT
    }
 
-   private void printMessageAttributes(ServerMessage message) throws XMLStreamException {
+   private void printMessageAttributes(ICoreMessage message) throws XMLStreamException {
       xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID()));
       xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority()));
       xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration()));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java
index 8ee7678..7711648 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java
@@ -17,10 +17,9 @@
 package org.apache.activemq.artemis.cli.commands.tools;
 
 import com.google.common.base.Preconditions;
-
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.utils.Base64;
 
 /**
@@ -92,12 +91,12 @@ public class XmlDataExporterUtil {
     * @param message
     * @return
     */
-   public static String encodeMessageBody(final ServerMessage message) {
+   public static String encodeMessageBody(final Message message) throws Exception {
       Preconditions.checkNotNull(message, "ServerMessage can not be null");
 
-      int size = message.getEndOfBodyPosition() - message.getBodyBuffer().readerIndex();
-      byte[] buffer = new byte[size];
-      message.getBodyBuffer().readBytes(buffer);
+      ActiveMQBuffer byteBuffer = message.toCore().getReadOnlyBodyBuffer();
+      byte[] buffer = new byte[byteBuffer.writerIndex()];
+      byteBuffer.readBytes(buffer);
 
       return XmlDataExporterUtil.encode(buffer);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java
index 8e2bb9f..518d231 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java
@@ -45,7 +45,9 @@ import java.util.UUID;
 
 import io.airlift.airline.Command;
 import io.airlift.airline.Option;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@@ -59,11 +61,9 @@ import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.cli.commands.ActionAbstract;
 import org.apache.activemq.artemis.cli.commands.ActionContext;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.utils.Base64;
 import org.apache.activemq.artemis.utils.ClassloadingUtil;
 import org.apache.activemq.artemis.utils.ListUtil;
@@ -298,7 +298,7 @@ public final class XmlDataImporter extends ActionAbstract {
          switch (eventType) {
             case XMLStreamConstants.START_ELEMENT:
                if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName())) {
-                  processMessageBody(message);
+                  processMessageBody(message.toCore());
                } else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName())) {
                   processMessageProperties(message);
                } else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName())) {
@@ -387,7 +387,7 @@ public final class XmlDataImporter extends ActionAbstract {
          logger.debug(logMessage);
       }
 
-      message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
+      message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array());
       try (ClientProducer producer = session.createProducer(destination)) {
          producer.send(message);
       }
@@ -469,7 +469,7 @@ public final class XmlDataImporter extends ActionAbstract {
       }
    }
 
-   private void processMessageBody(final Message message) throws XMLStreamException, IOException {
+   private void processMessageBody(final ICoreMessage message) throws XMLStreamException, IOException {
       boolean isLarge = false;
 
       for (int i = 0; i < reader.getAttributeCount(); i++) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-commons/src/main/java/org/apache/activemq/artemis/Closeable.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/Closeable.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/Closeable.java
new file mode 100644
index 0000000..2f00c5d
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/Closeable.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis;
+
+public interface Closeable {
+   void close(boolean failed);
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
index 5446f3f..3a208a6 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
@@ -1065,6 +1065,19 @@ public interface ActiveMQBuffer extends DataInput {
     */
    void writeBytes(ByteBuffer src);
 
+
+   /**
+    * Transfers {@code length} bytes of the specified source buffer, starting at {@code srcIndex},
+    * to this buffer at the current {@code writerIndex}, and increases the {@code writerIndex} by {@code length}.
+    *
+    * @param src      The source buffer
+    * @param srcIndex The first index of the source to read from
+    * @param length   The number of bytes to transfer
+    * @throws IndexOutOfBoundsException if the requested range is outside {@code src}, or
+    *                                   {@code length} bytes cannot be written to this buffer
+    */
+   void writeBytes(ByteBuf src, int srcIndex, int length);
+
    /**
     * Returns a copy of this buffer's readable bytes.  Modifying the content
     * of the returned buffer or this buffer does not affect each other at all.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java
index 32f9279..25fcfea 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.api.core;
 
 import java.nio.ByteBuffer;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
@@ -76,6 +77,20 @@ public final class ActiveMQBuffers {
    }
 
    /**
+    * Creates an ActiveMQBuffer wrapping an underlying ByteBuf
+    *
+    * The position on this buffer won't affect the position on the inner buffer
+    *
+    * @param underlying the underlying Netty ByteBuf
+    * @return an ActiveMQBuffer wrapping a duplicate of the underlying ByteBuf
+    */
+   public static ActiveMQBuffer wrappedBuffer(final ByteBuf underlying) {
+      ActiveMQBuffer buff = new ChannelBufferWrapper(underlying.duplicate());
+
+      return buff;
+   }
+
+   /**
     * Creates an ActiveMQBuffer wrapping an underlying byte array
     *
     * @param underlying the underlying byte array

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
index b7f70c6..e8530e6 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
@@ -20,6 +20,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.utils.DataConstants;
 
 /**
@@ -134,6 +135,39 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
    }
 
 
+   public static SimpleString readNullableSimpleString(ByteBuf buffer) {
+      int b = buffer.readByte();
+      if (b == DataConstants.NULL) {
+         return null;
+      }
+      return readSimpleString(buffer);
+   }
+
+
+   public static SimpleString readSimpleString(ByteBuf buffer) {
+      int len = buffer.readInt();
+      byte[] data = new byte[len];
+      buffer.readBytes(data);
+      return new SimpleString(data);
+   }
+
+   public static void writeNullableSimpleString(ByteBuf buffer, SimpleString val) {
+      if (val == null) {
+         buffer.writeByte(DataConstants.NULL);
+      } else {
+         buffer.writeByte(DataConstants.NOT_NULL);
+         writeSimpleString(buffer, val);
+      }
+   }
+
+   public static void writeSimpleString(ByteBuf buffer, SimpleString val) {
+      byte[] data = val.getData();
+      buffer.writeInt(data.length);
+      buffer.writeBytes(data);
+   }
+
+
+
    public SimpleString subSeq(final int start, final int end) {
       int len = data.length >> 1;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
index 690dbd7..92314e2 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
@@ -66,11 +66,7 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
 
    @Override
    public SimpleString readNullableSimpleString() {
-      int b = buffer.readByte();
-      if (b == DataConstants.NULL) {
-         return null;
-      }
-      return readSimpleStringInternal();
+      return SimpleString.readNullableSimpleString(buffer);
    }
 
    @Override
@@ -84,14 +80,7 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
 
    @Override
    public SimpleString readSimpleString() {
-      return readSimpleStringInternal();
-   }
-
-   private SimpleString readSimpleStringInternal() {
-      int len = buffer.readInt();
-      byte[] data = new byte[len];
-      buffer.readBytes(data);
-      return new SimpleString(data);
+      return SimpleString.readSimpleString(buffer);
    }
 
    @Override
@@ -111,11 +100,22 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
       } else if (len < 0xfff) {
          return readUTF();
       } else {
-         return readSimpleStringInternal().toString();
+         return SimpleString.readSimpleString(buffer).toString();
+
       }
    }
 
    @Override
+   public void writeNullableString(String val) {
+      UTF8Util.writeNullableString(buffer, val);
+   }
+
+   @Override
+   public void writeUTF(String utf) {
+      UTF8Util.saveUTF(buffer, utf);
+   }
+
+   @Override
    public String readUTF() {
       return UTF8Util.readUTF(this);
    }
@@ -127,62 +127,17 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
 
    @Override
    public void writeNullableSimpleString(final SimpleString val) {
-      if (val == null) {
-         buffer.writeByte(DataConstants.NULL);
-      } else {
-         buffer.writeByte(DataConstants.NOT_NULL);
-         writeSimpleStringInternal(val);
-      }
-   }
-
-   @Override
-   public void writeNullableString(final String val) {
-      if (val == null) {
-         buffer.writeByte(DataConstants.NULL);
-      } else {
-         buffer.writeByte(DataConstants.NOT_NULL);
-         writeStringInternal(val);
-      }
+      SimpleString.writeNullableSimpleString(buffer, val);
    }
 
    @Override
    public void writeSimpleString(final SimpleString val) {
-      writeSimpleStringInternal(val);
-   }
-
-   private void writeSimpleStringInternal(final SimpleString val) {
-      byte[] data = val.getData();
-      buffer.writeInt(data.length);
-      buffer.writeBytes(data);
+      SimpleString.writeSimpleString(buffer, val);
    }
 
    @Override
    public void writeString(final String val) {
-      writeStringInternal(val);
-   }
-
-   private void writeStringInternal(final String val) {
-      int length = val.length();
-
-      buffer.writeInt(length);
-
-      if (length < 9) {
-         // If very small it's more performant to store char by char
-         for (int i = 0; i < val.length(); i++) {
-            buffer.writeShort((short) val.charAt(i));
-         }
-      } else if (length < 0xfff) {
-         // Store as UTF - this is quicker than char by char for most strings
-         writeUTF(val);
-      } else {
-         // Store as SimpleString, since can't store utf > 0xffff in length
-         writeSimpleStringInternal(new SimpleString(val));
-      }
-   }
-
-   @Override
-   public void writeUTF(final String utf) {
-      UTF8Util.saveUTF(this, utf);
+      UTF8Util.writeString(buffer, val);
    }
 
    @Override
@@ -576,6 +531,11 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
    }
 
    @Override
+   public void writeBytes(ByteBuf src, int srcIndex, int length) {
+      buffer.writeBytes(src, srcIndex, length);
+   }
+
+   @Override
    public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) {
       buffer.writeBytes(src.byteBuf(), srcIndex, length);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java
new file mode 100644
index 0000000..fd68a77
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.persistence;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+
+public interface Persister<T extends Object> {
+
+   int getEncodeSize(T record);
+
+   void encode(ActiveMQBuffer buffer, T record);
+
+   T decode(ActiveMQBuffer buffer, T record);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
index bee8790..e70891d 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
@@ -101,6 +101,14 @@ public class ByteUtil {
    }
 
    public static String bytesToHex(byte[] bytes, int groupSize) {
+      if (bytes == null) {
+         return "NULL";
+      }
+
+      if (bytes.length == 0) {
+         return "[]";
+      }
+
       char[] hexChars = new char[bytes.length * 2 + numberOfGroups(bytes, groupSize)];
       int outPos = 0;
       for (int j = 0; j < bytes.length; j++) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java
index 56cec48..fda135b 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java
@@ -24,7 +24,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
@@ -47,7 +47,6 @@ import static org.apache.activemq.artemis.utils.DataConstants.STRING;
  * This implementation follows section 3.5.4 of the <i>Java Message Service</i> specification
  * (Version 1.1 April 12, 2002).
  * <p>
- * TODO - should have typed property getters and do conversions herein
  */
 public final class TypedProperties {
 
@@ -62,6 +61,13 @@ public final class TypedProperties {
    public TypedProperties() {
    }
 
+   /**
+    * Returns the number of properties. NOTE(review): encode() checks for a null map; confirm it cannot be null here.
+    */
+   public int size() {
+      return properties.size();
+   }
+
    public int getMemoryOffset() {
       // The estimate is basically the encode size + 2 object references for each entry in the map
       // Note we don't include the attributes or anything else since they already included in the memory estimate
@@ -321,7 +327,7 @@ public final class TypedProperties {
       }
    }
 
-   public synchronized void decode(final ActiveMQBuffer buffer) {
+   public synchronized void decode(final ByteBuf buffer) {
       byte b = buffer.readByte();
 
       if (b == DataConstants.NULL) {
@@ -406,7 +412,7 @@ public final class TypedProperties {
       }
    }
 
-   public synchronized void encode(final ActiveMQBuffer buffer) {
+   public synchronized void encode(final ByteBuf buffer) {
       if (properties == null) {
          buffer.writeByte(DataConstants.NULL);
       } else {
@@ -547,7 +553,7 @@ public final class TypedProperties {
 
       abstract Object getValue();
 
-      abstract void write(ActiveMQBuffer buffer);
+      abstract void write(ByteBuf buffer);
 
       abstract int encodeSize();
 
@@ -568,7 +574,7 @@ public final class TypedProperties {
       }
 
       @Override
-      public void write(final ActiveMQBuffer buffer) {
+      public void write(final ByteBuf buffer) {
          buffer.writeByte(DataConstants.NULL);
       }
 
@@ -587,7 +593,7 @@ public final class TypedProperties {
          this.val = val;
       }
 
-      private BooleanValue(final ActiveMQBuffer buffer) {
+      private BooleanValue(final ByteBuf buffer) {
          val = buffer.readBoolean();
       }
 
@@ -597,7 +603,7 @@ public final class TypedProperties {
       }
 
       @Override
-      public void write(final ActiveMQBuffer buffer) {
+      public void write(final ByteBuf buffer) {
          buffer.writeByte(DataConstants.BOOLEAN);
          buffer.writeBoolean(val);
       }
@@ -617,7 +623,7 @@ public final class TypedProperties {
          this.val = val;
       }
 
-      private ByteValue(final ActiveMQBuffer buffer) {
+      private ByteValue(final ByteBuf buffer) {
          val = buffer.readByte();
       }
 
@@ -627,7 +633,7 @@ public final class TypedProperties {
       }
 
       @Override
-      public void write(final ActiveMQBuffer buffer) {
+      public void write(final ByteBuf buffer) {
          buffer.writeByte(DataConstants.BYTE);
          buffer.writeByte(val);
       }
@@ -646,7 +652,7 @@ public final class TypedProperties {
          this.val = val;
       }
 
-      private BytesValue(final ActiveMQBuffer buffer) {
+      private BytesValue(final ByteBuf buffer) {
          int len = buffer.readInt();
          val = new byte[len];
          buffer.readBytes(val);
@@ -658,7 +664,7 @@ public final class TypedProperties {
       }
 
       @Override
-      public void write(final ActiveMQBuffer buffer) {
+      public void write(final ByteBuf buffer) {
          buffer.writeByte(DataConstants.BYTES);
          buffer.writeInt(val.length);
          buffer.writeBytes(val);
@@ -679,7 +685,7 @@ public final class TypedProperties {
          this.val = val;
       }
 
-      private ShortValue(final ActiveMQBuffer buffer) {
+      private ShortValue(final ByteBuf buffer) {
          val = buffer.readShort();
       }
 
@@ -689,7 +695,7 @@ public final class TypedProperties {
       }
 
       @Override
-      public void write(final ActiveMQBuffer buffer) {
+      public void write(final ByteBuf buffer) {
          buffer.writeByte(DataConstants.SHORT);
          buffer.writeShort(val);
       }
@@ -708,7 +714,7 @@ public final class TypedProperties {
          this.val = val;
       }
 
-      private IntValue(final ActiveMQBuffer buffer) {
+      private IntValue(final ByteBuf buffer) {
          val = buffer.readInt();
       }
 
@@ -718,7 +724,7 @@ public final class TypedProperties {
       }
 
       @Override
-      public void write(final ActiveMQBuffer buffer) {
+      public void write(final ByteBuf buffer) {
          buffer.writeByte(DataConstants.INT);
          buffer.writeInt(val);
       }
@@ -737,7 +743,7 @@ public final class TypedProperties {
          this.val = val;
       }
 
-      private LongValue(final ActiveMQBuffer buffer) {
+      private LongValue(final ByteBuf buffer) {
          val = buffer.readLong();
       }
 
@@ -747,7 +753,7 @@ public final class TypedProperties {
       }
 
       @Override
-      public void write(final ActiveMQBuffer buffer) {
+      public void write(final ByteBuf buffer) {
          buffer.writeByte(DataConstants.LONG);
          buffer.writeLong(val);
       }
@@ -766,7 +772,7 @@ public final class TypedProperties {
          this.val = val;
       }
 
-      private FloatValue(final ActiveMQBuffer buffer) {
+      private FloatValue(final ByteBuf buffer) {
          val = Float.intBitsToFloat(buffer.readInt());
       }
 
@@ -776,7 +782,7 @@ public final class TypedProperties {
       }
 
       @Override
-      public void write(final ActiveMQBuffer buffer) {
+      public void write(final ByteBuf buffer) {
          buffer.writeByte(DataConstants.FLOAT);
          buffer.writeInt(Float.floatToIntBits(val));
       }
@@ -796,7 +802,7 @@ public final class TypedProperties {
          this.val = val;
       }
 
-      private DoubleValue(final ActiveMQBuffer buffer) {
+      private DoubleValue(final ByteBuf buffer) {
          val = Double.longBitsToDouble(buffer.readLong());
       }
 
@@ -806,7 +812,7 @@ public final class TypedProperties {
       }
 
       @Override
-      public void write(final ActiveMQBuffer buffer) {
+      public void write(final ByteBuf buffer) {
          buffer.writeByte(DataConstants.DOUBLE);
          buffer.writeLong(Double.doubleToLongBits(val));
       }
@@ -825,7 +831,7 @@ public final class TypedProperties {
          this.val = val;
       }
 
-      private CharValue(final ActiveMQBuffer buffer) {
+      private CharValue(final ByteBuf buffer) {
          val = (char) buffer.readShort();
       }
 
@@ -835,7 +841,7 @@ public final class TypedProperties {
       }
 
       @Override
-      public void write(final ActiveMQBuffer buffer) {
+      public void write(final ByteBuf buffer) {
          buffer.writeByte(DataConstants.CHAR);
          buffer.writeShort((short) val);
       }
@@ -854,8 +860,8 @@ public final class TypedProperties {
          this.val = val;
       }
 
-      private StringValue(final ActiveMQBuffer buffer) {
-         val = buffer.readSimpleString();
+      private StringValue(final ByteBuf buffer) {
+         val = SimpleString.readSimpleString(buffer);
       }
 
       @Override
@@ -864,9 +870,9 @@ public final class TypedProperties {
       }
 
       @Override
-      public void write(final ActiveMQBuffer buffer) {
+      public void write(final ByteBuf buffer) {
          buffer.writeByte(DataConstants.STRING);
-         buffer.writeSimpleString(val);
+         SimpleString.writeSimpleString(buffer, val);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java
index e75395b..84e1557 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java
@@ -18,7 +18,9 @@ package org.apache.activemq.artemis.utils;
 
 import java.lang.ref.SoftReference;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
 import org.apache.activemq.artemis.logs.ActiveMQUtilLogger;
 
@@ -29,15 +31,43 @@ import org.apache.activemq.artemis.logs.ActiveMQUtilLogger;
  */
 public final class UTF8Util {
 
+
+   private static final boolean isTrace = ActiveMQUtilLogger.LOGGER.isTraceEnabled();
+
+   private static final ThreadLocal<SoftReference<StringUtilBuffer>> currenBuffer = new ThreadLocal<>();
+
    private UTF8Util() {
       // utility class
    }
+   public static void writeNullableString(ByteBuf buffer, final String val) {
+      if (val == null) {
+         buffer.writeByte(DataConstants.NULL);
+      } else {
+         buffer.writeByte(DataConstants.NOT_NULL);
+         writeString(buffer, val);
+      }
+   }
 
-   private static final boolean isTrace = ActiveMQUtilLogger.LOGGER.isTraceEnabled();
+   public static void writeString(final ByteBuf buffer, final String val) {
+      int length = val.length();
 
-   private static final ThreadLocal<SoftReference<StringUtilBuffer>> currenBuffer = new ThreadLocal<>();
+      buffer.writeInt(length);
+
+      if (length < 9) {
+         // If very small it's more performant to store char by char
+         for (int i = 0; i < val.length(); i++) {
+            buffer.writeShort((short) val.charAt(i));
+         }
+      } else if (length < 0xfff) {
+         // Store as UTF - this is quicker than char by char for most strings
+         saveUTF(buffer, val);
+      } else {
+         // Store as SimpleString, since can't store utf > 0xffff in length
+         SimpleString.writeSimpleString(buffer, new SimpleString(val));
+      }
+   }
 
-   public static void saveUTF(final ActiveMQBuffer out, final String str) {
+   public static void saveUTF(final ByteBuf out, final String str) {
       StringUtilBuffer buffer = UTF8Util.getThreadLocalBuffer();
 
       if (str.length() > 0xffff) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
index 8013e96..cb6c8fe 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
@@ -187,12 +187,12 @@ public class TypedPropertiesTest {
       props.putSimpleStringProperty(keyToRemove, RandomUtil.randomSimpleString());
 
       ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(1024);
-      props.encode(buffer);
+      props.encode(buffer.byteBuf());
 
       Assert.assertEquals(props.getEncodeSize(), buffer.writerIndex());
 
       TypedProperties decodedProps = new TypedProperties();
-      decodedProps.decode(buffer);
+      decodedProps.decode(buffer.byteBuf());
 
       TypedPropertiesTest.assertEqualsTypeProperties(props, decodedProps);
 
@@ -200,7 +200,7 @@ public class TypedPropertiesTest {
 
       // After removing a property, you should still be able to encode the Property
       props.removeProperty(keyToRemove);
-      props.encode(buffer);
+      props.encode(buffer.byteBuf());
 
       Assert.assertEquals(props.getEncodeSize(), buffer.writerIndex());
    }
@@ -210,12 +210,12 @@ public class TypedPropertiesTest {
       TypedProperties emptyProps = new TypedProperties();
 
       ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(1024);
-      emptyProps.encode(buffer);
+      emptyProps.encode(buffer.byteBuf());
 
       Assert.assertEquals(props.getEncodeSize(), buffer.writerIndex());
 
       TypedProperties decodedProps = new TypedProperties();
-      decodedProps.decode(buffer);
+      decodedProps.decode(buffer.byteBuf());
 
       TypedPropertiesTest.assertEqualsTypeProperties(emptyProps, decodedProps);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 38ec105..c0d9db6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -262,12 +262,6 @@ public final class ActiveMQDefaultConfiguration {
    // The minimal number of data files before we can start compacting
    private static int DEFAULT_JOURNAL_COMPACT_MIN_FILES = 10;
 
-   // XXX Only meant to be used by project developers
-   private static int DEFAULT_JOURNAL_PERF_BLAST_PAGES = -1;
-
-   // XXX Only meant to be used by project developers
-   private static boolean DEFAULT_RUN_SYNC_SPEED_TEST = false;
-
    // Interval to log server specific information (e.g. memory usage etc)
    private static long DEFAULT_SERVER_DUMP_INTERVAL = -1;
 
@@ -801,20 +795,6 @@ public final class ActiveMQDefaultConfiguration {
    }
 
    /**
-    * XXX Only meant to be used by project developers
-    */
-   public static int getDefaultJournalPerfBlastPages() {
-      return DEFAULT_JOURNAL_PERF_BLAST_PAGES;
-   }
-
-   /**
-    * XXX Only meant to be used by project developers
-    */
-   public static boolean isDefaultRunSyncSpeedTest() {
-      return DEFAULT_RUN_SYNC_SPEED_TEST;
-   }
-
-   /**
     * Interval to log server specific information (e.g. memory usage etc)
     */
    public static long getDefaultServerDumpInterval() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
new file mode 100644
index 0000000..779470e
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core;
+
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+
+/**
+ * API of the methods specifically required for Core messages, in addition to those of {@link Message}.
+ */
+public interface ICoreMessage extends Message {
+
+   /** Returns the {@link LargeBodyEncoder} of this message; may fail with an {@link ActiveMQException}. */
+   LargeBodyEncoder getBodyEncoder() throws ActiveMQException;
+
+   /** Returns the encode size of the headers and properties (presumably in bytes -- TODO confirm). */
+   int getHeadersAndPropertiesEncodeSize();
+
+   /** Returns an {@link InputStream} for the body, as declared by {@link Message}. */
+   @Override
+   InputStream getBodyInputStream();
+
+   /** Returns a new Buffer slicing the current Body. */
+   ActiveMQBuffer getReadOnlyBodyBuffer();
+
+   /** Returns the type of the message. */
+   @Override
+   byte getType();
+
+   /** Sets the type of the message; returns a {@link CoreMessage} (presumably this one -- TODO confirm). */
+   @Override
+   CoreMessage setType(byte type);
+
+   /** Returns whether this is a server message; we are really interested if this is a LargeServerMessage. */
+   boolean isServerMessage();
+
+   /** Returns the body used for this message. */
+   @Override
+   ActiveMQBuffer getBodyBuffer();
+
+   /** Returns the end-of-body position (presumably an index into the message buffer -- TODO confirm). */
+   int getEndOfBodyPosition();
+
+   /** Used on large messages treatment; presumably copies them from {@code msg} into this -- TODO confirm. */
+   void copyHeadersAndProperties(final Message msg);
+
+   /**
+    * Returns the message in Map form, useful when encoding to JSON. Starts from {@link #toPropertyMap()}
+    * and adds the entries {@code messageID}, {@code address}, {@code type}, {@code durable},
+    * {@code expiration}, {@code timestamp} and {@code priority} (widened to {@code int}).
+    * A {@code userID} entry (the user ID prefixed with {@code "ID:"}) is added only when it is not null.
+    * @return the map obtained from {@code toPropertyMap()} with the entries above put into it
+    */
+   @Override
+   default Map<String, Object> toMap() {
+      Map<String, Object> map = toPropertyMap();
+      map.put("messageID", getMessageID());
+      // Read the user ID once so the null check and the stored value come from the same call.
+      Object userID = getUserID();
+      if (userID != null) {
+         map.put("userID", "ID:" + userID);
+      }
+      map.put("address", getAddress());
+      map.put("type", getType());
+      map.put("durable", isDurable());
+      map.put("expiration", getExpiration());
+      map.put("timestamp", getTimestamp());
+      map.put("priority", (int) getPriority());
+      return map;
+   }
+
+}


Mime
View raw message