activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-1297 Load balance or redistribution of AMQP Messages
Date Tue, 29 Aug 2017 17:56:42 GMT
ARTEMIS-1297 Load balance or redistribution of AMQP Messages


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/6fda75a9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/6fda75a9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/6fda75a9

Branch: refs/heads/master
Commit: 6fda75a9fcb60f8f5630dbf558cee47d05418e30
Parents: b19637a
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Thu Jul 27 17:25:27 2017 -0400
Committer: Justin Bertram <jbertram@apache.org>
Committed: Tue Aug 29 12:56:15 2017 -0500

----------------------------------------------------------------------
 .../artemis/cli/commands/tools/PrintData.java   |   3 +-
 .../artemis/core/persistence/Persister.java     |   8 +
 .../activemq/artemis/api/core/Message.java      |  16 ++
 .../core/message/impl/CoreMessagePersister.java |  10 +-
 .../protocol/amqp/broker/AMQPMessage.java       |  53 +++++-
 .../amqp/broker/AMQPMessagePersister.java       |  13 +-
 .../amqp/broker/AMQPMessagePersisterV2.java     |  85 ++++++++++
 .../broker/ProtonProtocolManagerFactory.java    |  11 +-
 .../amqp/converter/AmqpCoreConverter.java       |   9 ++
 .../protocol/amqp/message/AMQPMessageTest.java  |  44 +++++
 .../core/postoffice/impl/BindingsImpl.java      |   4 +-
 .../core/postoffice/impl/PostOfficeImpl.java    |   2 +-
 .../core/ServerSessionPacketHandler.java        |   3 +-
 .../core/impl/CoreProtocolManagerFactory.java   |  10 +-
 .../core/server/cluster/impl/BridgeImpl.java    |   9 +-
 .../cluster/impl/ClusterConnectionBridge.java   |   4 +-
 .../cluster/impl/RemoteQueueBindingImpl.java    |   4 +-
 .../spi/core/protocol/EmbedMessageUtil.java     |  74 +++++++++
 .../spi/core/protocol/MessagePersister.java     |  41 +++--
 .../core/protocol/ProtocolManagerFactory.java   |  12 +-
 .../AMQPMessageLoadBalancingTest.java           | 160 +++++++++++++++++++
 .../tests/unit/core/paging/impl/PageTest.java   |   5 +-
 .../core/paging/impl/PagingStoreImplTest.java   |   6 +-
 23 files changed, 515 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fda75a9/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
index 9a93c27..cf9c9c4 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
@@ -50,7 +50,6 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordId
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
 import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
-import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
 import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -64,7 +63,7 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 public class PrintData extends OptionalLocking {
 
    static {
-      MessagePersister.registerPersister(CoreProtocolManagerFactory.ID, CoreMessagePersister.getInstance());
+      MessagePersister.registerPersister(CoreMessagePersister.getInstance());
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fda75a9/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java
index fd68a77..124dfcf 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java
@@ -21,6 +21,14 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 
 public interface Persister<T extends Object> {
 
+   /** This is to be used to store the protocol-id on Messages.
+    *  Messages are stored in their bare format.
+    *  The protocol manager is responsible for encoding and decoding messages.
+    *  The caveat here is that the first byte of the encoded record needs to be this constant. */
+   default byte getID() {
+      return (byte)0;
+   }
+
    int getEncodeSize(T record);
 
    void encode(ActiveMQBuffer buffer, T record);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fda75a9/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 488fff0..58433ce 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -164,6 +164,9 @@ public interface Message {
 
    byte STREAM_TYPE = 6;
 
+   /** The message will contain another message persisted through {@link org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil}. */
+   byte EMBEDDED_TYPE = 7;
+
    default void cleanupInternalProperties() {
       // only on core
    }
@@ -438,6 +441,19 @@ public interface Message {
       }
    }
 
+
+   default org.apache.activemq.artemis.api.core.Message putExtraBytesProperty(SimpleString key, byte[] value) {
+      return putBytesProperty(key, value);
+   }
+
+   default byte[] getExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return getBytesProperty(key);
+   }
+
+   default byte[] removeExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      return (byte[])removeProperty(key);
+   }
+
    default Object getDuplicateProperty() {
       return null;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fda75a9/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java
index ddf39d2..115b29b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java
@@ -24,16 +24,24 @@ import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.utils.DataConstants;
 
 public class CoreMessagePersister implements Persister<Message> {
+   public static final byte ID = 1;
 
-   public static CoreMessagePersister theInstance = new CoreMessagePersister();
+   public static CoreMessagePersister theInstance;
 
    public static CoreMessagePersister getInstance() {
+      if (theInstance == null) {
+         theInstance = new CoreMessagePersister();
+      }
       return theInstance;
    }
 
    protected CoreMessagePersister() {
    }
 
+   @Override
+   public byte getID() {
+      return ID;
+   }
 
    @Override
    public int getEncodeSize(Message record) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fda75a9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 93b6145..9a03934 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -38,6 +38,7 @@ import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedByte;
@@ -94,6 +95,10 @@ public class AMQPMessage extends RefCountMessage {
 
    Set<Object> rejectedConsumers;
 
+   /** These are properties set at the broker level.
+    *  They are created by the broker only. */
+   private volatile TypedProperties extraProperties;
+
    public AMQPMessage(long messageFormat, byte[] data) {
       this.data = Unpooled.wrappedBuffer(data);
       this.messageFormat = messageFormat;
@@ -331,7 +336,7 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public Persister<org.apache.activemq.artemis.api.core.Message> getPersister() {
-      return AMQPMessagePersister.getInstance();
+      return AMQPMessagePersisterV2.getInstance();
    }
 
    @Override
@@ -483,7 +488,7 @@ public class AMQPMessage extends RefCountMessage {
       System.arraycopy(origin, messagePaylodStart, newData, headerEnds, data.array().length - messagePaylodStart);
 
       AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData);
-      newEncode.setDurable(isDurable());
+      newEncode.setDurable(isDurable()).setMessageID(this.getMessageID());
       return newEncode;
    }
 
@@ -698,6 +703,50 @@ public class AMQPMessage extends RefCountMessage {
       buffer.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart);
    }
 
+   public TypedProperties createExtraProperties() {
+      if (extraProperties == null) {
+         extraProperties = new TypedProperties();
+      }
+      return extraProperties;
+   }
+
+   public TypedProperties getExtraProperties() {
+      return extraProperties;
+   }
+
+   public AMQPMessage setExtraProperties(TypedProperties extraProperties) {
+      this.extraProperties = extraProperties;
+      return this;
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putExtraBytesProperty(SimpleString key, byte[] value) {
+      createExtraProperties().putBytesProperty(key, value);
+      return this;
+   }
+
+
+   @Override
+   public byte[] getExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      if (extraProperties == null) {
+         return null;
+      } else {
+         return extraProperties.getBytesProperty(key);
+      }
+   }
+
+
+   @Override
+   public byte[] removeExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      if (extraProperties == null) {
+         return null;
+      } else {
+         return (byte[])extraProperties.removeProperty(key);
+      }
+   }
+
+
+
    @Override
    public org.apache.activemq.artemis.api.core.Message putBooleanProperty(String key, boolean value) {
       getApplicationPropertiesMap().put(key, Boolean.valueOf(value));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fda75a9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
index 3b5bdda..bec0beb 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
@@ -25,18 +25,23 @@ import org.apache.activemq.artemis.utils.DataConstants;
 
 public class AMQPMessagePersister extends MessagePersister {
 
-   public static AMQPMessagePersister theInstance = new AMQPMessagePersister();
+   public static final byte ID = 2;
+
+   public static AMQPMessagePersister theInstance;
 
    public static AMQPMessagePersister getInstance() {
+      if (theInstance == null) {
+         theInstance = new AMQPMessagePersister();
+      }
       return theInstance;
    }
 
-   private AMQPMessagePersister() {
+   protected AMQPMessagePersister() {
    }
 
    @Override
-   protected byte getID() {
-      return ProtonProtocolManagerFactory.ID;
+   public byte getID() {
+      return ID;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fda75a9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java
new file mode 100644
index 0000000..50e8618
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.activemq.artemis.utils.collections.TypedProperties;
+
+public class AMQPMessagePersisterV2 extends AMQPMessagePersister {
+   public static final byte ID = 3;
+
+   public static AMQPMessagePersisterV2 theInstance;
+
+   public static AMQPMessagePersisterV2 getInstance() {
+      if (theInstance == null) {
+         theInstance = new AMQPMessagePersisterV2();
+      }
+      return theInstance;
+   }
+
+   @Override
+   public byte getID() {
+      return ID;
+   }
+
+   public AMQPMessagePersisterV2() {
+      super();
+   }
+
+
+   @Override
+   public int getEncodeSize(Message record) {
+      int encodeSize = super.getEncodeSize(record) + DataConstants.SIZE_INT;
+
+      TypedProperties properties = ((AMQPMessage)record).getExtraProperties();
+
+      return encodeSize + (properties != null ? properties.getEncodeSize() : 0);
+   }
+
+
+   /** Subclasses must write the protocol-id as the first byte */
+   @Override
+   public void encode(ActiveMQBuffer buffer, Message record) {
+      super.encode(buffer, record);
+
+      TypedProperties properties = ((AMQPMessage)record).getExtraProperties();
+      if (properties == null) {
+         buffer.writeInt(0);
+      } else {
+         buffer.writeInt(properties.getEncodeSize());
+         properties.encode(buffer.byteBuf());
+      }
+   }
+
+
+   @Override
+   public Message decode(ActiveMQBuffer buffer, Message record) {
+      AMQPMessage message = (AMQPMessage)super.decode(buffer, record);
+      int size = buffer.readInt();
+
+      if (size != 0) {
+         TypedProperties properties = new TypedProperties();
+         properties.decode(buffer.byteBuf());
+         message.setExtraProperties(properties);
+      }
+      return message;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fda75a9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
index 3d5f694..84e7af4 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
@@ -32,8 +32,6 @@ import org.osgi.service.component.annotations.Component;
 @Component(service = ProtocolManagerFactory.class)
 public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory<AmqpInterceptor> {
 
-   public static final byte ID = 2;
-
    public static final String AMQP_PROTOCOL_NAME = "AMQP";
 
    private static final String MODULE_NAME = "artemis-amqp-protocol";
@@ -41,13 +39,10 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory
    private static String[] SUPPORTED_PROTOCOLS = {AMQP_PROTOCOL_NAME};
 
    @Override
-   public byte getStoreID() {
-      return ID;
-   }
+   public Persister<Message>[] getPersister() {
 
-   @Override
-   public Persister<Message> getPersister() {
-      return AMQPMessagePersister.getInstance();
+      Persister[] persisters = new Persister[]{AMQPMessagePersister.getInstance(), AMQPMessagePersisterV2.getInstance()};
+      return persisters;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fda75a9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
index 030a7a0..215c77f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -31,12 +31,14 @@ import java.util.Set;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Decimal128;
 import org.apache.qpid.proton.amqp.Decimal32;
@@ -170,6 +172,13 @@ public class AmqpCoreConverter {
          throw new RuntimeException("Unexpected body type: " + body.getClass());
       }
 
+      TypedProperties properties = message.getExtraProperties();
+      if (properties != null) {
+         for (SimpleString str : properties.getPropertyNames()) {
+            result.getInnerMessage().putBytesProperty(str, properties.getBytesProperty(str));
+         }
+      }
+
       populateMessage(result, message.getProtonMessage());
       result.getInnerMessage().setReplyTo(message.getReplyTo());
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fda75a9/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
index 8b379a3..9dcf7c9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
@@ -25,8 +25,15 @@ import static org.junit.Assert.assertTrue;
 import java.nio.charset.StandardCharsets;
 import java.util.Date;
 
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersisterV2;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.commons.collections.map.HashedMap;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
@@ -34,6 +41,7 @@ import org.apache.qpid.proton.amqp.messaging.Header;
 import org.apache.qpid.proton.amqp.messaging.Properties;
 import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.junit.Assert;
 import org.junit.Test;
 
 import io.netty.buffer.ByteBuf;
@@ -207,6 +215,42 @@ public class AMQPMessageTest {
       assertEquals(0L, decoded.getTimestamp());
    }
 
+   @Test
+   public void testExtraProperty() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+
+      byte[] original = RandomUtil.randomBytes();
+      SimpleString name = SimpleString.toSimpleString("myProperty");
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      decoded.setAddress("someAddress");
+      decoded.setMessageID(33);
+      decoded.putExtraBytesProperty(name, original);
+
+      ICoreMessage coreMessage = decoded.toCore();
+      Assert.assertEquals(original, coreMessage.getBytesProperty(name));
+
+      ActiveMQBuffer buffer = ActiveMQBuffers.pooledBuffer(10 * 1024);
+      try {
+         decoded.getPersister().encode(buffer, decoded);
+         Assert.assertEquals(AMQPMessagePersisterV2.getInstance().getID(), buffer.readByte()); // the journal reader will read 1 byte to find the persister
+         AMQPMessage readMessage = (AMQPMessage)decoded.getPersister().decode(buffer, null);
+         Assert.assertEquals(33, readMessage.getMessageID());
+         Assert.assertEquals("someAddress", readMessage.getAddress());
+         Assert.assertArrayEquals(original, readMessage.getExtraBytesProperty(name));
+      } finally {
+         buffer.release();
+      }
+
+      {
+         ICoreMessage embeddedMessage = EmbedMessageUtil.embedAsCoreMessage(decoded);
+         AMQPMessage readMessage = (AMQPMessage) EmbedMessageUtil.extractEmbedded(embeddedMessage);
+         Assert.assertEquals(33, readMessage.getMessageID());
+         Assert.assertEquals("someAddress", readMessage.getAddress());
+         Assert.assertArrayEquals(original, readMessage.getExtraBytesProperty(name));
+      }
+
+   }
+
    private AMQPMessage encodeAndDecodeMessage(MessageImpl message) {
       ByteBuf nettyBuffer = Unpooled.buffer(1500);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fda75a9/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index 377223b..c3d2f0f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -238,7 +238,7 @@ public final class BindingsImpl implements Bindings {
       /* This is a special treatment for scaled-down messages involving SnF queues.
        * See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property
        */
-      byte[] ids = (byte[]) message.removeAnnotation(Message.HDR_SCALEDOWN_TO_IDS);
+      byte[] ids = (byte[]) message.removeExtraBytesProperty(Message.HDR_SCALEDOWN_TO_IDS);
 
       if (ids != null) {
          ByteBuffer buffer = ByteBuffer.wrap(ids);
@@ -268,7 +268,7 @@ public final class BindingsImpl implements Bindings {
 
       if (!routed) {
          // Remove the ids now, in order to avoid double check
-         ids = (byte[]) message.removeAnnotation(Message.HDR_ROUTE_TO_IDS);
+         ids = message.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS);
 
          // Fetch the groupId now, in order to avoid double checking
          SimpleString groupId = message.getGroupID();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fda75a9/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 4627325..a64394a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1227,7 +1227,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                                     AtomicBoolean startedTX) throws Exception {
       // Check the DuplicateCache for the Bridge first
 
-      Object bridgeDup = message.removeAnnotation(Message.HDR_BRIDGE_DUPLICATE_ID);
+      Object bridgeDup = message.removeExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID);
       if (bridgeDup != null) {
          // if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one
          byte[] bridgeDupBytes = (byte[]) bridgeDup;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fda75a9/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 7deb47e..e89dee0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -90,6 +90,7 @@ import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.utils.SimpleFuture;
 import org.apache.activemq.artemis.utils.SimpleFutureImpl;
@@ -686,7 +687,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
          try {
             final SessionSendMessage message = (SessionSendMessage) packet;
             requiresResponse = message.isRequiresResponse();
-            this.session.send(message.getMessage(), this.direct);
+            this.session.send(EmbedMessageUtil.extractEmbedded(message.getMessage()), this.direct);
             if (requiresResponse) {
                response = new NullResponseMessage();
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fda75a9/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
index 7560917..c442231 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
@@ -32,19 +32,13 @@ import org.apache.activemq.artemis.utils.uri.BeanSupport;
 
 public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor> {
 
-   public static final byte ID = 1;
    private static String[] SUPPORTED_PROTOCOLS = {ActiveMQClient.DEFAULT_CORE_PROTOCOL};
 
    private static final String MODULE_NAME = "artemis-server";
 
    @Override
-   public byte getStoreID() {
-      return ID;
-   }
-
-   @Override
-   public Persister<Message> getPersister() {
-      return CoreMessagePersister.getInstance();
+   public Persister<Message>[] getPersister() {
+      return new Persister[]{CoreMessagePersister.getInstance()};
    }
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fda75a9/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index f0f5b3b..277cacf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -57,6 +57,7 @@ import org.apache.activemq.artemis.core.server.cluster.Transformer;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.server.management.NotificationService;
+import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.FutureLatch;
@@ -515,7 +516,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
          // We keep our own DuplicateID for the Bridge, so bouncing back and forth will work fine
          byte[] bytes = getDuplicateBytes(nodeUUID, message.getMessageID());
 
-         message.putBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, bytes);
+         message.putExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, bytes);
       }
 
       if (transformer != null) {
@@ -528,9 +529,9 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
                                " as transformedMessage");
             }
          }
-         return transformedMessage;
+         return EmbedMessageUtil.embedAsCoreMessage(transformedMessage);
       } else {
-         return message;
+         return EmbedMessageUtil.embedAsCoreMessage(message);
       }
    }
 
@@ -575,7 +576,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
             dest = forwardingAddress;
          } else {
             // Preserve the original address
-            dest = message.getAddressSimpleString();
+            dest = ref.getMessage().getAddressSimpleString();
          }
 
          pendingAcks.countUp();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fda75a9/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index fe39eb5..77066c1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -166,7 +166,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
 
       Set<SimpleString> propNames = new HashSet<>(messageCopy.getPropertyNames());
 
-      byte[] queueIds = message.getBytesProperty(idsHeaderName);
+      byte[] queueIds = message.getExtraBytesProperty(idsHeaderName);
 
       if (queueIds == null) {
          // Sanity check only
@@ -180,7 +180,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
          }
       }
 
-      messageCopy.putBytesProperty(Message.HDR_ROUTE_TO_IDS, queueIds);
+      messageCopy.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, queueIds);
 
       messageCopy = super.beforeForward(messageCopy);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fda75a9/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
index 02a7671..2161f72 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
@@ -315,7 +315,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
     * @param message
     */
    private void addRouteContextToMessage(final Message message) {
-      byte[] ids = message.getBytesProperty(idsHeaderName);
+      byte[] ids = message.getExtraBytesProperty(idsHeaderName);
 
       if (ids == null) {
          ids = new byte[8];
@@ -331,7 +331,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
 
       buff.putLong(remoteQueueID);
 
-      message.putBytesProperty(idsHeaderName, ids);
+      message.putExtraBytesProperty(idsHeaderName, ids);
 
       if (logger.isTraceEnabled()) {
          logger.trace("Adding remoteQueue ID = " + remoteQueueID + " into message=" + message + " store-forward-queue=" + storeAndForwardQueue);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fda75a9/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java
new file mode 100644
index 0000000..8f4e102
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.spi.core.protocol;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.jboss.logging.Logger;
+
+public class EmbedMessageUtil {
+
+   private static final byte[] signature = new byte[]{(byte) 'E', (byte) 'M', (byte) 'B'};
+
+   private static final Logger logger = Logger.getLogger(EmbedMessageUtil.class);
+
+   public static ICoreMessage embedAsCoreMessage(Message source) {
+
+      if (source instanceof ICoreMessage) {
+         return (ICoreMessage) source;
+      } else {
+         Persister persister = source.getPersister();
+
+         CoreMessage message = new CoreMessage(source.getMessageID(), persister.getEncodeSize(source) + signature.length + CoreMessage.BODY_OFFSET).setType(Message.EMBEDDED_TYPE);
+
+         ActiveMQBuffer buffer = message.getBodyBuffer();
+         buffer.writeBytes(signature);
+         persister.encode(buffer, source);
+         return message;
+      }
+   }
+
+   public static Message extractEmbedded(ICoreMessage message) {
+      if (message.getType() == Message.EMBEDDED_TYPE) {
+         ActiveMQBuffer buffer = message.getReadOnlyBodyBuffer();
+         // A body that does not start with the signature was not written by embedAsCoreMessage: return it untouched.
+         if (buffer.readableBytes() < signature.length || !checkSignature(buffer)) {
+            if (logger.isTraceEnabled()) {
+               logger.trace("Message type " + Message.EMBEDDED_TYPE + " was used for something other than embed messages, ignoring content and treating as a regular message");
+            }
+            return message;
+         }
+         // Signature matched: the remaining bytes are a persisted message; on decode failure fall back to the wrapper.
+         try {
+            return MessagePersister.getInstance().decode(buffer, null);
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+            return message;
+         }
+      } else {
+         return message;
+      }
+   }
+
+   private static boolean checkSignature(final ActiveMQBuffer buffer) {
+      return buffer.readByte() == signature[0] && buffer.readByte() == signature[1] && buffer.readByte() == signature[2];
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fda75a9/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
index 0f45874..ad1317f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
@@ -17,15 +17,12 @@
 
 package org.apache.activemq.artemis.spi.core.protocol;
 
-import java.util.Map;
 import java.util.ServiceLoader;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
 import org.apache.activemq.artemis.core.persistence.Persister;
-import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
 import org.jboss.logging.Logger;
 
 public class MessagePersister implements Persister<Message> {
@@ -35,33 +32,47 @@ public class MessagePersister implements Persister<Message> {
    private static final MessagePersister theInstance = new MessagePersister();
 
    /** This will be used for reading messages */
-   private static Map<Byte, Persister<Message>> protocols = new ConcurrentHashMap<>();
+   private static final int MAX_PERSISTERS = 3;
+   private static final Persister<Message>[] persisters = new Persister[MAX_PERSISTERS];
 
    static {
-      MessagePersister.registerPersister(CoreProtocolManagerFactory.ID, CoreMessagePersister.getInstance());
+      CoreMessagePersister persister = CoreMessagePersister.getInstance();
+      MessagePersister.registerPersister(persister);
 
       Iterable<ProtocolManagerFactory> protocols  = ServiceLoader.load(ProtocolManagerFactory.class, MessagePersister.class.getClassLoader());
       for (ProtocolManagerFactory next : protocols) {
-         MessagePersister.registerPersister(next.getStoreID(), next.getPersister());
+         registerProtocol(next);
       }
    }
 
    public static void registerProtocol(ProtocolManagerFactory manager) {
-      Persister<Message> messagePersister = manager.getPersister();
-      if (messagePersister == null) {
+      Persister<Message>[] messagePersisters = manager.getPersister();
+      if (messagePersisters == null || messagePersisters.length == 0) {
          logger.debug("Cannot find persister for " + manager);
       } else {
-         registerPersister(manager.getStoreID(), manager.getPersister());
+         for (Persister p : messagePersisters) {
+            registerPersister(p);
+         }
       }
    }
 
    public static void clearPersisters() {
-      protocols.clear();
+      for (int i = 0; i < persisters.length; i++) {
+         persisters[i] = null;
+      }
+   }
+
+   public static Persister getPersister(byte id) {
+      if (id <= 0 || id > MAX_PERSISTERS) { // byte is signed: a negative id would index below the array start
+         return null;
+      }
+      return persisters[id - 1];
    }
 
-   public static void registerPersister(byte recordType, Persister<Message> persister) {
+   public static void registerPersister(Persister<Message> persister) {
       if (persister != null) {
-         protocols.put(recordType, persister);
+         assert persister.getID() <= MAX_PERSISTERS : "You must update MessagePersister::MAX_PERSISTERS to a higher number";
+         persisters[persister.getID() - 1] = persister;
       }
    }
 
@@ -73,10 +84,6 @@ public class MessagePersister implements Persister<Message> {
    protected MessagePersister() {
    }
 
-   protected byte getID() {
-      return (byte)0;
-   }
-
    @Override
    public int getEncodeSize(Message record) {
       return 0;
@@ -92,7 +99,7 @@ public class MessagePersister implements Persister<Message> {
    @Override
    public Message decode(ActiveMQBuffer buffer, Message record) {
       byte protocol = buffer.readByte();
-      Persister<Message> persister = protocols.get(protocol);
+      Persister<Message> persister = getPersister(protocol);
       if (persister == null) {
          throw new NullPointerException("couldn't find factory for type=" + protocol);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fda75a9/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
index 9574540..4ab34eb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
@@ -26,16 +26,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 
 public interface ProtocolManagerFactory<P extends BaseInterceptor> {
 
-   /** This is to be used to store the protocol-id on Messages.
-    *  Messages are stored on their bare format.
-    *  The protocol manager will be responsible to code or decode messages.
-    *  The caveat here is that the first short-sized bytes need to be this constant. */
-   default byte getStoreID() {
-      return (byte)0;
-   }
-
-   default Persister<Message> getPersister() {
-      return null;
+   default Persister<Message>[] getPersister() {
+      return new Persister[]{};
    }
 
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fda75a9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/AMQPMessageLoadBalancingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/AMQPMessageLoadBalancingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/AMQPMessageLoadBalancingTest.java
new file mode 100644
index 0000000..ba4cdaf
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/AMQPMessageLoadBalancingTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.distribution;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQPMessageLoadBalancingTest extends ClusterTestBase {
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      start();
+   }
+
+   private void start() throws Exception {
+      setupServers();
+
+      setRedistributionDelay(0);
+   }
+
+   protected boolean isNetty() {
+      return true;
+   }
+
+   @Test
+   public void testLoadBalanceAMQP() throws Exception {
+      setupCluster(MessageLoadBalancingType.STRICT);
+
+      startServers(0, 1);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+
+      createQueue(0, "queues.0", "queues.0", null, true, null, null, RoutingType.ANYCAST);
+      createQueue(1, "queues.0", "queues.0", null, true, null, null, RoutingType.ANYCAST);
+
+      final int NUMBER_OF_MESSAGES = 100;
+
+      // sending AMQP Messages.. they should be load balanced
+      {
+         JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
+         Connection connection = factory.createConnection();
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         MessageProducer producer = session.createProducer(session.createQueue("queues.0"));
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            producer.send(session.createTextMessage("hello " + i));
+         }
+         session.commit();
+
+         connection.close();
+      }
+
+      receiveOnBothNodes(NUMBER_OF_MESSAGES);
+
+      // If a user used a message type = 7, for messages that are not embedded,
+      // it should still be treated as a normal message
+      {
+         ClientSession sessionProducer = sfs[0].createSession();
+         ClientProducer producer = sessionProducer.createProducer("queues.0");
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            // The user is mistakenly using the same type we used for embedded messages. it should still work
+            ClientMessage message = sessionProducer.createMessage(Message.EMBEDDED_TYPE, true).putIntProperty("i", i);
+            message.getBodyBuffer().writeString("hello!");
+            producer.send(message);
+
+            // will send 2 messages.. one with stuff, another empty
+            message = sessionProducer.createMessage(Message.EMBEDDED_TYPE, true);
+            producer.send(message);
+         }
+         receiveOnBothNodes(NUMBER_OF_MESSAGES * 2);
+      }
+
+   }
+
+   private void receiveOnBothNodes(int NUMBER_OF_MESSAGES) throws ActiveMQException {
+      for (int x = 0; x <= 1; x++) {
+         ClientSession sessionX = sfs[x].createSession();
+         ClientConsumer consumerX = sessionX.createConsumer("queues.0");
+         sessionX.start();
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES / 2; i++) {
+            ClientMessage msg = consumerX.receive(5000);
+            Assert.assertNotNull(msg);
+            msg.acknowledge();
+         }
+         Assert.assertNull(consumerX.receiveImmediate());
+         sessionX.commit();
+         sessionX.close();
+      }
+   }
+
+   protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
+      setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1);
+
+      setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0);
+   }
+
+   protected void setRedistributionDelay(final long delay) {
+      AddressSettings as = new AddressSettings().setRedistributionDelay(delay);
+
+      getServer(0).getAddressSettingsRepository().addMatch("queues.*", as);
+      getServer(1).getAddressSettingsRepository().addMatch("queues.*", as);
+   }
+
+   protected void setupServers() throws Exception {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+
+      servers[0].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
+      servers[1].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
+   }
+
+   protected void stopServers() throws Exception {
+      closeAllConsumers();
+
+      closeAllSessionFactories();
+
+      closeAllServerLocatorsFactories();
+
+      stopServers(0, 1);
+
+      clearServer(0, 1);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fda75a9/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java
index 847e8b7..b10b811 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java
@@ -30,7 +30,6 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.impl.Page;
 import org.apache.activemq.artemis.core.paging.impl.PagedMessageImpl;
 import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
-import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersister;
 import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
 import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
@@ -52,8 +51,8 @@ public class PageTest extends ActiveMQTestBase {
 
    @Before
    public void registerProtocols() {
-      MessagePersister.registerPersister(CoreProtocolManagerFactory.ID, CoreMessagePersister.getInstance());
-      MessagePersister.registerPersister((byte)2, AMQPMessagePersister.getInstance());
+      MessagePersister.registerPersister(CoreMessagePersister.getInstance());
+      MessagePersister.registerPersister(AMQPMessagePersister.getInstance());
    }
 
    @Test

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fda75a9/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
index d261f64..d3a67e7 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
@@ -51,14 +51,12 @@ import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
 import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
-import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
 import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersister;
-import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
 import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
 import org.apache.activemq.artemis.tests.unit.util.FakePagingManager;
@@ -75,8 +73,8 @@ import org.junit.Test;
 public class PagingStoreImplTest extends ActiveMQTestBase {
 
    static {
-      MessagePersister.registerPersister(CoreProtocolManagerFactory.ID, CoreMessagePersister.getInstance());
-      MessagePersister.registerPersister(ProtonProtocolManagerFactory.ID, AMQPMessagePersister.getInstance());
+      MessagePersister.registerPersister(CoreMessagePersister.getInstance());
+      MessagePersister.registerPersister(AMQPMessagePersister.getInstance());
    }
 
    private static final SimpleString destinationTestName = new SimpleString("test");


Mime
View raw message