activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [45/50] [abbrv] activemq-artemis git commit: ARTEMIS-880 Add support for address prefixing
Date Fri, 09 Dec 2016 19:49:29 GMT
ARTEMIS-880 Add support for address prefixing


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8f532cc2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8f532cc2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8f532cc2

Branch: refs/heads/master
Commit: 8f532cc25d4179b4014f2136e2861bd73f6b5d61
Parents: a5031b5
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Fri Dec 9 18:04:54 2016 +0000
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Fri Dec 9 18:43:15 2016 +0000

----------------------------------------------------------------------
 .../activemq/artemis/api/core/SimpleString.java |   6 +-
 .../impl/wireformat/CreateQueueMessage_V2.java  |   2 +-
 .../activemq/artemis/utils/PrefixUtil.java      |  65 +++++++++
 .../amqp/broker/AMQPSessionCallback.java        |   2 +-
 .../amqp/broker/ProtonProtocolManager.java      |  25 ++++
 .../protocol/mqtt/MQTTConnectionManager.java    |   2 +-
 .../core/protocol/mqtt/MQTTProtocolHandler.java |  10 +-
 .../mqtt/MQTTProtocolManagerFactory.java        |   5 +-
 .../artemis/core/protocol/mqtt/MQTTSession.java |  12 +-
 .../protocol/openwire/OpenWireConnection.java   |   4 +-
 .../openwire/OpenWireProtocolManager.java       |  23 ++++
 .../core/protocol/openwire/amq/AMQSession.java  |  12 +-
 .../protocol/stomp/StompProtocolManager.java    |   4 +-
 .../core/postoffice/impl/LocalQueueBinding.java |  12 +-
 .../core/postoffice/impl/PostOfficeImpl.java    |   6 +-
 .../core/impl/ActiveMQPacketHandler.java        |   2 +-
 .../protocol/core/impl/CoreProtocolManager.java |  24 ++++
 .../artemis/core/server/ActiveMQServer.java     |   4 +-
 .../artemis/core/server/RoutingContext.java     |   8 ++
 .../artemis/core/server/ServerSession.java      |  11 +-
 .../core/server/impl/ActiveMQServerImpl.java    |  18 +--
 .../artemis/core/server/impl/DivertImpl.java    |   2 +-
 .../core/server/impl/RoutingContextImpl.java    |  25 ++++
 .../core/server/impl/ServerSessionImpl.java     | 134 ++++++++++++-------
 .../core/protocol/AbstractProtocolManager.java  |  25 ++++
 .../spi/core/protocol/ProtocolManager.java      |   9 ++
 .../integration/client/CoreClientTest.java      |  99 ++++++++++++++
 .../integration/client/HangConsumerTest.java    |   6 +-
 .../core/server/impl/fakes/FakePostOffice.java  |  18 +--
 29 files changed, 469 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
index b4a02ea..decd189 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
@@ -118,6 +118,11 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
 
    @Override
    public CharSequence subSequence(final int start, final int end) {
+      return subSeq(start, end);
+   }
+
+
+   public SimpleString subSeq(final int start, final int end) {
       int len = data.length >> 1;
 
       if (end < start || start < 0 || end > len) {
@@ -399,5 +404,4 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
          dst[d++] = (char) (low | high);
       }
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
index e2867ab..d9def3c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
@@ -107,7 +107,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
    public void encodeRest(final ActiveMQBuffer buffer) {
       super.encodeRest(buffer);
       buffer.writeBoolean(autoCreated);
-      buffer.writeByte(routingType.getType());
+      buffer.writeByte(routingType == null ? -1 : routingType.getType());
       buffer.writeInt(maxConsumers);
       buffer.writeBoolean(deleteOnNoConsumers);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
new file mode 100644
index 0000000..cc3bf1c
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.RoutingType;
+
+public class PrefixUtil {
+
+   public static Pair<SimpleString, RoutingType> getAddressAndRoutingType(SimpleString address,
+                                                                   RoutingType defaultRoutingType,
+                                                                   Map<SimpleString, RoutingType> prefixes) {
+      for (Map.Entry<SimpleString, RoutingType> entry : prefixes.entrySet()) {
+         if (address.startsWith(entry.getKey())) {
+            return new Pair<>(removePrefix(address, entry.getKey()), entry.getValue());
+         }
+      }
+      return new Pair<>(address, defaultRoutingType);
+   }
+
+   public static Pair<SimpleString, Set<RoutingType>> getAddressAndRoutingTypes(SimpleString address,
+                                                                          Set<RoutingType> defaultRoutingTypes,
+                                                                          Map<SimpleString, RoutingType> prefixes) {
+      for (Map.Entry<SimpleString, RoutingType> entry : prefixes.entrySet()) {
+         if (address.startsWith(entry.getKey())) {
+            Set routingTypes = new HashSet<>();
+            routingTypes.add(entry.getValue());
+            return new Pair<>(removePrefix(address, entry.getKey()), routingTypes);
+         }
+      }
+      return new Pair<>(address, defaultRoutingTypes);
+   }
+
+   public static SimpleString getAddress(SimpleString address, Map<SimpleString, RoutingType> prefixes) {
+      for (Map.Entry<SimpleString, RoutingType> entry : prefixes.entrySet()) {
+         if (address.startsWith(entry.getKey())) {
+            return removePrefix(address, entry.getKey());
+         }
+      }
+      return address;
+   }
+
+   private static SimpleString removePrefix(SimpleString string, SimpleString prefix) {
+      return string.subSeq(prefix.length(), string.length());
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 9d69b00..c0dc34e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -159,7 +159,7 @@ public class AMQPSessionCallback implements SessionCallback {
                                                         false, // boolean autoCommitAcks,
                                                         false, // boolean preAcknowledge,
                                                         true, //boolean xa,
-                                                        (String) null, this, true, operationContext);
+                                                        (String) null, this, true, operationContext, manager.getPrefixes());
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index 9b84dc1..0e8ab37 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -17,16 +17,20 @@
 package org.apache.activemq.artemis.protocol.amqp.broker;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Executor;
 
 import io.netty.channel.ChannelPipeline;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.server.management.NotificationListener;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
@@ -54,6 +58,8 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
 
    private final ProtonProtocolManagerFactory factory;
 
+   private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
+
    /*
    * used when you want to treat senders as a subscription on an address rather than consuming from the actual queue for
    * the address. This can be changed on the acceptor.
@@ -168,4 +174,23 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
    public void setMaxFrameSize(int maxFrameSize) {
       this.maxFrameSize = maxFrameSize;
    }
+
+   @Override
+   public void setAnycastPrefix(String anycastPrefix) {
+      for (String prefix : anycastPrefix.split(",")) {
+         prefixes.put(SimpleString.toSimpleString(prefix), RoutingType.ANYCAST);
+      }
+   }
+
+   @Override
+   public void setMulticastPrefix(String multicastPrefix) {
+      for (String prefix : multicastPrefix.split(",")) {
+         prefixes.put(SimpleString.toSimpleString(prefix), RoutingType.MULTICAST);
+      }
+   }
+
+   @Override
+   public Map<SimpleString, RoutingType> getPrefixes() {
+      return prefixes;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index ce65648..a4690e7 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -97,7 +97,7 @@ public class MQTTConnectionManager {
       ActiveMQServer server = session.getServer();
 
       ServerSession serverSession = server.createSession(id, username, password, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS, MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE, MQTTUtil.SESSION_XA, null,
-                                                         session.getSessionCallback(), MQTTUtil.SESSION_AUTO_CREATE_QUEUE, server.newOperationContext());
+                                                         session.getSessionCallback(), MQTTUtil.SESSION_AUTO_CREATE_QUEUE, server.newOperationContext(), session.getProtocolManager().getPrefixes());
       return (ServerSessionImpl) serverSession;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
index 80923e9..0149f46 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -17,6 +17,8 @@
 
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
+import java.util.Map;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -37,7 +39,9 @@ import io.netty.handler.codec.mqtt.MqttSubAckPayload;
 import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
 import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
 import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 
 /**
@@ -53,6 +57,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
    private MQTTSession session;
 
    private ActiveMQServer server;
+
    private MQTTProtocolManager protocolManager;
 
    // This Channel Handler is not sharable, therefore it can only ever be associated with a single ctx.
@@ -62,15 +67,18 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
 
    private boolean stopped = false;
 
+   private Map<SimpleString, RoutingType> prefixes;
+
    public MQTTProtocolHandler(ActiveMQServer server, MQTTProtocolManager protocolManager) {
       this.server = server;
       this.protocolManager = protocolManager;
+      this.prefixes = protocolManager.getPrefixes();
    }
 
    void setConnection(MQTTConnection connection, ConnectionEntry entry) throws Exception {
       this.connectionEntry = entry;
       this.connection = connection;
-      this.session = new MQTTSession(this, connection);
+      this.session = new MQTTSession(this, connection, protocolManager);
    }
 
    void stop(boolean error) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
index 553f9ad..453b267 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
@@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
+import org.apache.activemq.artemis.utils.uri.BeanSupport;
 import org.osgi.service.component.annotations.Component;
 
 @Component(service = ProtocolManagerFactory.class)
@@ -40,8 +41,8 @@ public class MQTTProtocolManagerFactory extends AbstractProtocolManagerFactory<M
    public ProtocolManager createProtocolManager(ActiveMQServer server,
                                                 final Map<String, Object> parameters,
                                                 List<BaseInterceptor> incomingInterceptors,
-                                                List<BaseInterceptor> outgoingInterceptors) {
-      return new MQTTProtocolManager(server, incomingInterceptors, outgoingInterceptors);
+                                                List<BaseInterceptor> outgoingInterceptors) throws Exception {
+      return BeanSupport.setData(new MQTTProtocolManager(server, incomingInterceptors, outgoingInterceptors), parameters);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index 059948f..d4fd7bf 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -53,8 +53,14 @@ public class MQTTSession {
 
    private MQTTLogger log = MQTTLogger.LOGGER;
 
-   public MQTTSession(MQTTProtocolHandler protocolHandler, MQTTConnection connection) throws Exception {
+   private MQTTProtocolManager protocolManager;
+
+   public MQTTSession(MQTTProtocolHandler protocolHandler,
+                      MQTTConnection connection,
+                      MQTTProtocolManager protocolManager) throws Exception {
       this.protocolHandler = protocolHandler;
+      this.protocolManager = protocolManager;
+
       this.connection = connection;
 
       mqttConnectionManager = new MQTTConnectionManager(this);
@@ -149,4 +155,8 @@ public class MQTTSession {
    MQTTConnection getConnection() {
       return connection;
    }
+
+   MQTTProtocolManager getProtocolManager() {
+      return protocolManager;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index d6add20..9e65f17 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -660,7 +660,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
    }
 
    private void createInternalSession(ConnectionInfo info) throws Exception {
-      internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext);
+      internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext, protocolManager.getPrefixes());
    }
 
    //raise the refCount of context
@@ -847,7 +847,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
    }
 
    public AMQSession addSession(SessionInfo ss, boolean internal) {
-      AMQSession amqSession = new AMQSession(getState().getInfo(), ss, server, this, protocolManager.getScheduledPool());
+      AMQSession amqSession = new AMQSession(getState().getInfo(), ss, server, this, protocolManager);
       amqSession.initialize();
 
       if (internal) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 0ee1711..cf673f8 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -34,6 +34,7 @@ import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
 import org.apache.activemq.artemis.api.core.client.TopologyMember;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
@@ -42,6 +43,7 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
@@ -118,6 +120,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
 
    private final OpenWireMessageConverter messageConverter;
 
+   private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
+
    public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
       this.factory = factory;
       this.server = server;
@@ -558,4 +562,23 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
       this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
    }
+
+   @Override
+   public void setAnycastPrefix(String anycastPrefix) {
+      for (String prefix : anycastPrefix.split(",")) {
+         prefixes.put(SimpleString.toSimpleString(prefix), RoutingType.ANYCAST);
+      }
+   }
+
+   @Override
+   public void setMulticastPrefix(String multicastPrefix) {
+      for (String prefix : multicastPrefix.split(",")) {
+         prefixes.put(SimpleString.toSimpleString(prefix), RoutingType.MULTICAST);
+      }
+   }
+
+   @Override
+   public Map<SimpleString, RoutingType> getPrefixes() {
+      return prefixes;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index c64374a..a0826a7 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
@@ -75,17 +76,20 @@ public class AMQSession implements SessionCallback {
    // so we make a new one per AMQSession
    private final OpenWireMessageConverter converter;
 
+   private final OpenWireProtocolManager protocolManager;
+
    public AMQSession(ConnectionInfo connInfo,
                      SessionInfo sessInfo,
                      ActiveMQServer server,
                      OpenWireConnection connection,
-                     ScheduledExecutorService scheduledPool) {
+                     OpenWireProtocolManager protocolManager) {
       this.connInfo = connInfo;
       this.sessInfo = sessInfo;
 
       this.server = server;
       this.connection = connection;
-      this.scheduledPool = scheduledPool;
+      this.protocolManager = protocolManager;
+      this.scheduledPool = protocolManager.getScheduledPool();
       OpenWireFormat marshaller = (OpenWireFormat) connection.getMarshaller();
 
       this.converter = new OpenWireMessageConverter(marshaller.copy());
@@ -109,7 +113,7 @@ public class AMQSession implements SessionCallback {
       // now
 
       try {
-         coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true, connection.getOperationContext());
+         coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true, connection.getOperationContext(), protocolManager.getPrefixes());
 
          long sessionId = sessInfo.getSessionId().getValue();
          if (sessionId == -1) {
@@ -169,7 +173,7 @@ public class AMQSession implements SessionCallback {
          BindingQueryResult bindingQuery = server.bindingQuery(queueName);
          QueueQueryResult queueBinding = server.queueQuery(queueName);
 
-         boolean isAutoCreate = bindingQuery.isExists() ?  true : bindingQuery.isAutoCreateJmsQueues();
+         boolean isAutoCreate = bindingQuery.isExists() ? true : bindingQuery.isAutoCreateJmsQueues();
 
          if (!queueBinding.isExists()) {
             if (isAutoCreate) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 0c1f7dd..aba3634 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -230,7 +230,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
       if (stompSession == null) {
          stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
          String name = UUIDGenerator.getInstance().generateStringUUID();
-         ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, true, false, false, false, null, stompSession, true, server.newOperationContext());
+         ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, true, false, false, false, null, stompSession, true, server.newOperationContext(), getPrefixes());
          stompSession.setServerSession(session);
          sessions.put(connection.getID(), stompSession);
       }
@@ -243,7 +243,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
       if (stompSession == null) {
          stompSession = new StompSession(connection, this, server.getStorageManager().newContext(executor));
          String name = UUIDGenerator.getInstance().generateStringUUID();
-         ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, false, false, false, false, null, stompSession, true, server.newOperationContext());
+         ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, false, false, false, false, null, stompSession, true, server.newOperationContext(), getPrefixes());
          stompSession.setServerSession(session);
          transactedSessions.put(txID, stompSession);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
index e09d108..18c87cf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.artemis.core.postoffice.impl;
 
-import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.postoffice.BindingType;
@@ -118,23 +117,20 @@ public class LocalQueueBinding implements QueueBinding {
 
    @Override
    public void route(final ServerMessage message, final RoutingContext context) throws Exception {
-      if (isMatchRoutingType(message)) {
+      if (isMatchRoutingType(context)) {
          queue.route(message, context);
       }
    }
 
    @Override
    public void routeWithAck(ServerMessage message, RoutingContext context) throws Exception {
-      if (isMatchRoutingType(message)) {
+      if (isMatchRoutingType(context)) {
          queue.routeWithAck(message, context);
       }
    }
 
-   private boolean isMatchRoutingType(ServerMessage message) {
-      if (message.containsProperty(Message.HDR_ROUTING_TYPE)) {
-         return message.getByteProperty(Message.HDR_ROUTING_TYPE).equals(queue.getRoutingType().getType());
-      }
-      return true;
+   private boolean isMatchRoutingType(RoutingContext context) {
+      return (context.getRoutingType() == null || context.getRoutingType() == queue.getRoutingType());
    }
 
    public boolean isQueueBinding() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 34e966c..16c3021 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -664,12 +664,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          throw new IllegalStateException("Message cannot be routed more than once");
       }
 
-      SimpleString address = message.getAddress();
-
       setPagingStore(message);
 
       AtomicBoolean startedTX = new AtomicBoolean(false);
 
+      final SimpleString address = message.getAddress();
+
       applyExpiryDelay(message, address);
 
       if (!checkDuplicateID(message, context, rejectDuplicates, startedTX)) {
@@ -682,7 +682,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          cleanupInternalPropertiesBeforeRouting(message);
       }
 
-      Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
+      Bindings bindings = addressManager.getBindingsForRoutingAddress(context.getAddress() == null ? message.getAddress() : context.getAddress());
 
       // TODO auto-create queues here?
       // first check for the auto-queue creation thing

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
index 39a6ac7..1034919 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -153,7 +153,7 @@ public class ActiveMQPacketHandler implements ChannelHandler {
 
          OperationContext sessionOperationContext = server.newOperationContext();
 
-         ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext);
+         ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext, protocolManager.getPrefixes());
 
          ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel);
          channel.setHandler(handler);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
index 3fb642a..62b10c4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.protocol.core.impl;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -29,6 +30,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
@@ -51,6 +53,7 @@ import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQFrameDecoder2;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@@ -74,6 +77,8 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
 
    private final CoreProtocolManagerFactory protocolManagerFactory;
 
+   private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
+
    public CoreProtocolManager(final CoreProtocolManagerFactory factory,
                               final ActiveMQServer server,
                               final List<Interceptor> incomingInterceptors,
@@ -189,6 +194,25 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
       return websocketRegistryNames;
    }
 
+   @Override
+   public void setAnycastPrefix(String anycastPrefix) {
+      for (String prefix : anycastPrefix.split(",")) {
+         prefixes.put(SimpleString.toSimpleString(prefix), RoutingType.ANYCAST);
+      }
+   }
+
+   @Override
+   public void setMulticastPrefix(String multicastPrefix) {
+      for (String prefix : multicastPrefix.split(",")) {
+         prefixes.put(SimpleString.toSimpleString(prefix), RoutingType.MULTICAST);
+      }
+   }
+
+   @Override
+   public Map<SimpleString, RoutingType> getPrefixes() {
+      return prefixes;
+   }
+
    private boolean isArtemis(ActiveMQBuffer buffer) {
       return buffer.getByte(0) == 'A' &&
          buffer.getByte(1) == 'R' &&

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 5fe71bb..cc5e51e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server;
 
 import javax.management.MBeanServer;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -197,7 +198,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
                                String defaultAddress,
                                SessionCallback callback,
                                boolean autoCreateQueues,
-                               OperationContext context) throws Exception;
+                               OperationContext context,
+                               Map<SimpleString, RoutingType> prefixes) throws Exception;
 
    SecurityStore getSecurityStore();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
index 32baee1..9f2cf4f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
@@ -45,4 +45,12 @@ public interface RoutingContext {
    void addQueueWithAck(SimpleString address, Queue queue);
 
    boolean isAlreadyAcked(SimpleString address, Queue queue);
+
+   void setAddress(SimpleString address);
+
+   void setRoutingType(RoutingType routingType);
+
+   SimpleString getAddress();
+
+   RoutingType getRoutingType();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index e6b5ad4..9559d74 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -109,7 +109,8 @@ public interface ServerSession extends SecurityAuth {
                      boolean temporary,
                      boolean durable) throws Exception;
 
-   /** Create queue with default delivery mode
+   /**
+    * Create queue with default delivery mode
     *
     * @param address
     * @param name
@@ -143,9 +144,13 @@ public interface ServerSession extends SecurityAuth {
                      boolean durable,
                      boolean autoCreated) throws Exception;
 
-   AddressInfo createAddress(final SimpleString address, Set<RoutingType> routingTypes, final boolean autoCreated) throws Exception;
+   AddressInfo createAddress(final SimpleString address,
+                             Set<RoutingType> routingTypes,
+                             final boolean autoCreated) throws Exception;
 
-   AddressInfo createAddress(final SimpleString address, RoutingType routingType, final boolean autoCreated) throws Exception;
+   AddressInfo createAddress(final SimpleString address,
+                             RoutingType routingType,
+                             final boolean autoCreated) throws Exception;
 
    void deleteQueue(SimpleString name) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 742de33..dcb4d45 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1256,7 +1256,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                                       final String defaultAddress,
                                       final SessionCallback callback,
                                       final boolean autoCreateQueues,
-                                      final OperationContext context) throws Exception {
+                                      final OperationContext context,
+                                      final Map<SimpleString, RoutingType> prefixes) throws Exception {
       String validatedUser = "";
 
       if (securityStore != null) {
@@ -1269,7 +1270,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       checkSessionLimit(validatedUser);
 
-      final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues);
+      final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues, prefixes);
 
       sessions.put(name, session);
 
@@ -1341,8 +1342,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                                                      String defaultAddress,
                                                      SessionCallback callback,
                                                      OperationContext context,
-                                                     boolean autoCreateJMSQueues) throws Exception {
-      return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, pagingManager);
+                                                     boolean autoCreateJMSQueues,
+                                                     Map<SimpleString, RoutingType> prefixes) throws Exception {
+      return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, pagingManager, prefixes);
    }
 
    @Override
@@ -2495,12 +2497,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       boolean addressAlreadyExists = true;
 
-      if (info == null) {
-         if (autoCreateAddress) {
-            createAddressInfo(defaultAddressInfo.setAutoCreated(true));
+      if (autoCreateAddress) {
+         if (info == null || !info.getRoutingTypes().contains(routingType)) {
+            createOrUpdateAddressInfo(defaultAddressInfo.setAutoCreated(true));
             addressAlreadyExists = false;
-         } else {
-            throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName);
          }
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index fd55521..6fe885b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -93,7 +93,7 @@ public class DivertImpl implements Divert {
       ServerMessage copy = null;
 
       // Shouldn't copy if it's not routed anywhere else
-      if (!forwardAddress.equals(message.getAddress())) {
+      if (!forwardAddress.equals(context.getAddress())) {
          long id = storageManager.generateID();
          copy = message.copy(id);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
index 6e0aa95..0cbb0e2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
@@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RouteContextList;
 import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 
 public final class RoutingContextImpl implements RoutingContext {
@@ -36,6 +37,10 @@ public final class RoutingContextImpl implements RoutingContext {
 
    private int queueCount;
 
+   private SimpleString address;
+
+   private RoutingType routingType;
+
    public RoutingContextImpl(final Transaction transaction) {
       this.transaction = transaction;
    }
@@ -77,6 +82,26 @@ public final class RoutingContextImpl implements RoutingContext {
    }
 
    @Override
+   public void setAddress(SimpleString address) {
+      this.address = address;
+   }
+
+   @Override
+   public void setRoutingType(RoutingType routingType) {
+      this.routingType = routingType;
+   }
+
+   @Override
+   public SimpleString getAddress() {
+      return address;
+   }
+
+   @Override
+   public RoutingType getRoutingType() {
+      return routingType;
+   }
+
+   @Override
    public RouteContextList getContextListing(SimpleString address) {
       RouteContextList listing = map.get(address);
       if (listing == null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 347874b..dec6f65 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -67,12 +67,12 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
-import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
@@ -88,6 +88,7 @@ import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.utils.JsonLoader;
+import org.apache.activemq.artemis.utils.PrefixUtil;
 import org.apache.activemq.artemis.utils.TypedProperties;
 import org.apache.activemq.artemis.utils.UUID;
 import org.jboss.logging.Logger;
@@ -182,6 +183,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    // concurrently.
    private volatile boolean closed = false;
 
+   private boolean prefixEnabled = false;
+
+   private Map<SimpleString, RoutingType> prefixes;
+
    public ServerSessionImpl(final String name,
                             final String username,
                             final String password,
@@ -203,7 +208,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                             final SimpleString defaultAddress,
                             final SessionCallback callback,
                             final OperationContext context,
-                            final PagingManager pagingManager) throws Exception {
+                            final PagingManager pagingManager,
+                            final Map<SimpleString, RoutingType> prefixes) throws Exception {
       this.username = username;
 
       this.password = password;
@@ -241,6 +247,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
       this.server = server;
 
+      this.prefixes = prefixes;
+      if (this.prefixes != null && !this.prefixes.isEmpty()) {
+         prefixEnabled = true;
+      }
+
       this.managementAddress = managementAddress;
 
       this.callback = callback;
@@ -255,8 +266,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       }
    }
 
-   // ServerSession implementation ----------------------------------------------------------------------------
-
+   // ServerSession implementation ---------------------------------------------------------------------------
    @Override
    public void enableSecurity() {
       this.securityEnabled = true;
@@ -387,7 +397,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       }
    }
 
-   protected void securityCheck(SimpleString address, CheckType checkType, SecurityAuth auth) throws Exception {
+   private void securityCheck(SimpleString address, CheckType checkType, SecurityAuth auth) throws Exception {
       if (securityEnabled) {
          securityStore.check(address, checkType, auth);
       }
@@ -414,17 +424,18 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName);
       }
 
+      SimpleString address = removePrefix(binding.getAddress());
       if (browseOnly) {
          try {
-            securityCheck(binding.getAddress(), CheckType.BROWSE, this);
+            securityCheck(address, CheckType.BROWSE, this);
          } catch (Exception e) {
-            securityCheck(binding.getAddress().concat(".").concat(queueName), CheckType.BROWSE, this);
+            securityCheck(address.concat(".").concat(queueName), CheckType.BROWSE, this);
          }
       } else {
          try {
-            securityCheck(binding.getAddress(), CheckType.CONSUME, this);
+            securityCheck(address, CheckType.CONSUME, this);
          } catch (Exception e) {
-            securityCheck(binding.getAddress().concat(".").concat(queueName), CheckType.CONSUME, this);
+            securityCheck(address.concat(".").concat(queueName), CheckType.CONSUME, this);
          }
       }
 
@@ -436,7 +447,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       if (!browseOnly) {
          TypedProperties props = new TypedProperties();
 
-         props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
+         props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, address);
 
          props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
 
@@ -492,15 +503,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                             final SimpleString filterString,
                             final boolean temporary,
                             final boolean durable) throws Exception {
-      return createQueue(address,
-                         name,
-                         ActiveMQDefaultConfiguration.getDefaultRoutingType(),
-                         filterString,
-                         temporary,
-                         durable,
-                         ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
-                         ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(),
-                         false);
+      return createQueue(address, name, ActiveMQDefaultConfiguration.getDefaultRoutingType(), filterString, temporary, durable, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), false);
    }
 
    @Override
@@ -510,14 +513,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                             final SimpleString filterString,
                             final boolean temporary,
                             final boolean durable) throws Exception {
-      return createQueue(address,
-                         name, routingType,
-                         filterString,
-                         temporary,
-                         durable,
-                         ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
-                         ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(),
-                         false);
+      return createQueue(address, name, routingType, filterString, temporary, durable, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), false);
    }
 
    @Override
@@ -530,6 +526,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                             final int maxConsumers,
                             final boolean deleteOnNoConsumers,
                             final boolean autoCreated) throws Exception {
+
+      Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(address, routingType);
+
       if (durable) {
          // make sure the user has privileges to create this queue
          securityCheck(address, CheckType.CREATE_DURABLE_QUEUE, this);
@@ -539,7 +538,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
       server.checkQueueCreationLimit(getUsername());
 
-      Queue queue = server.createQueue(address, routingType, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, true);
+      Queue queue = server.createQueue(art.getA(), art.getB(), name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, true);
 
       if (temporary) {
          // Temporary queue in core simply means the queue will be deleted if
@@ -577,34 +576,36 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                             boolean temporary,
                             boolean durable,
                             boolean autoCreated) throws Exception {
-      return createQueue(address,
-                         name, routingType,
-                         filterString,
-                         temporary,
-                         durable,
-                         ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
-                         ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(),
-                         autoCreated);
+      return createQueue(address, name, routingType, filterString, temporary, durable, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), autoCreated);
    }
 
    @Override
-   public AddressInfo createAddress(final SimpleString address, Set<RoutingType> routingTypes, final boolean autoCreated) throws Exception {
-      securityCheck(address, CheckType.CREATE_ADDRESS, this);
-      return server.createOrUpdateAddressInfo(new AddressInfo(address, routingTypes).setAutoCreated(autoCreated));
+   public AddressInfo createAddress(final SimpleString address,
+                                    Set<RoutingType> routingTypes,
+                                    final boolean autoCreated) throws Exception {
+      Pair<SimpleString, Set<RoutingType>> art = getAddressAndRoutingTypes(address, routingTypes);
+      securityCheck(art.getA(), CheckType.CREATE_ADDRESS, this);
+      return server.createOrUpdateAddressInfo(new AddressInfo(art.getA(), art.getB()).setAutoCreated(autoCreated));
    }
 
    @Override
-   public AddressInfo createAddress(final SimpleString address, RoutingType routingType, final boolean autoCreated) throws Exception {
-      securityCheck(address, CheckType.CREATE_ADDRESS, this);
-      return server.createOrUpdateAddressInfo(new AddressInfo(address, routingType).setAutoCreated(autoCreated));
+   public AddressInfo createAddress(final SimpleString address,
+                                    RoutingType routingType,
+                                    final boolean autoCreated) throws Exception {
+      Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(address, routingType);
+      securityCheck(art.getA(), CheckType.CREATE_ADDRESS, this);
+      return server.createOrUpdateAddressInfo(new AddressInfo(art.getA(), art.getB()).setAutoCreated(autoCreated));
    }
 
    @Override
-   public void createSharedQueue(final SimpleString address,
+   public void createSharedQueue(SimpleString address,
                                  final SimpleString name,
                                  final RoutingType routingType,
                                  boolean durable,
                                  final SimpleString filterString) throws Exception {
+
+      address = removePrefix(address);
+
       securityCheck(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
 
       server.checkQueueCreationLimit(getUsername());
@@ -715,7 +716,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    @Override
    public BindingQueryResult executeBindingQuery(final SimpleString address) throws Exception {
-      return server.bindingQuery(address);
+      return server.bindingQuery(removePrefix(address));
    }
 
    @Override
@@ -1317,7 +1318,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          message.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUser));
       }
 
-      SimpleString address = message.getAddress();
+      SimpleString address = removePrefix(message.getAddress());
 
       if (defaultAddress == null && address != null) {
          defaultAddress = address;
@@ -1338,12 +1339,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          logger.trace("send(message=" + message + ", direct=" + direct + ") being called");
       }
 
-      if (message.getAddress() == null) {
+      if (address == null) {
          // This could happen with some tests that are ignoring messages
          throw ActiveMQMessageBundle.BUNDLE.noAddress();
       }
 
-      if (message.getAddress().equals(managementAddress)) {
+      if (address.equals(managementAddress)) {
          // It's a management message
 
          handleManagementMessage(tx, message, direct);
@@ -1381,8 +1382,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    }
 
    @Override
-   public void requestProducerCredits(final SimpleString address, final int credits) throws Exception {
-      PagingStore store = server.getPagingManager().getPageStore(address);
+   public void requestProducerCredits(SimpleString address, final int credits) throws Exception {
+      final SimpleString addr = removePrefix(address);
+      PagingStore store = server.getPagingManager().getPageStore(addr);
 
       if (!store.checkMemory(new Runnable() {
          @Override
@@ -1572,7 +1574,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                                                  final ServerMessage message,
                                                  final boolean direct) throws Exception {
       try {
-         securityCheck(message.getAddress(), CheckType.MANAGE, this);
+         securityCheck(removePrefix(message.getAddress()), CheckType.MANAGE, this);
       } catch (ActiveMQException e) {
          if (!autoCommitSends) {
             tx.markAsRollbackOnly(e);
@@ -1655,9 +1657,13 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                                final boolean direct,
                                final boolean noAutoCreateQueue) throws Exception {
       RoutingStatus result = RoutingStatus.OK;
+
+      Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(msg.getAddress(), null);
+
+      // Consumer
       // check the user has write access to this address.
       try {
-         securityCheck(msg.getAddress(), CheckType.SEND, this);
+         securityCheck(art.getA(), CheckType.SEND, this);
       } catch (ActiveMQException e) {
          if (!autoCommitSends && tx != null) {
             tx.markAsRollbackOnly(e);
@@ -1671,6 +1677,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       }
 
       try {
+         routingContext.setAddress(art.getA());
+         routingContext.setRoutingType(art.getB());
+
          result = postOffice.route(msg, routingContext, direct);
 
          Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddress());
@@ -1701,4 +1710,27 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          return Collections.emptyList();
       }
    }
+
+   private SimpleString removePrefix(SimpleString address) {
+      if (prefixEnabled) {
+         return PrefixUtil.getAddress(address, prefixes);
+      }
+      return address;
+   }
+
+   private Pair<SimpleString, RoutingType> getAddressAndRoutingType(SimpleString address,
+                                                                    RoutingType defaultRoutingType) {
+      if (prefixEnabled) {
+         return PrefixUtil.getAddressAndRoutingType(address, defaultRoutingType, prefixes);
+      }
+      return new Pair<>(address, defaultRoutingType);
+   }
+
+   private Pair<SimpleString, Set<RoutingType>> getAddressAndRoutingTypes(SimpleString address,
+                                                                          Set<RoutingType> defaultRoutingTypes) {
+      if (prefixEnabled) {
+         return PrefixUtil.getAddressAndRoutingTypes(address, defaultRoutingTypes, prefixes);
+      }
+      return new Pair<>(address, defaultRoutingTypes);
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java
index 6ce2518..baf021e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java
@@ -19,13 +19,19 @@
 
 package org.apache.activemq.artemis.spi.core.protocol;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.RoutingType;
 
 public abstract class AbstractProtocolManager<P, I extends BaseInterceptor<P>, C extends RemotingConnection> implements ProtocolManager<I> {
 
+   private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
+
    protected void invokeInterceptors(final List<I> interceptors, final P message, final C connection) {
       if (interceptors != null && !interceptors.isEmpty()) {
          for (I interceptor : interceptors) {
@@ -39,4 +45,23 @@ public abstract class AbstractProtocolManager<P, I extends BaseInterceptor<P>, C
          }
       }
    }
+
+   @Override
+   public void setAnycastPrefix(String anycastPrefix) {
+      for (String prefix : anycastPrefix.split(",")) {
+         prefixes.put(SimpleString.toSimpleString(prefix), RoutingType.ANYCAST);
+      }
+   }
+
+   @Override
+   public void setMulticastPrefix(String multicastPrefix) {
+      for (String prefix : multicastPrefix.split(",")) {
+         prefixes.put(SimpleString.toSimpleString(prefix), RoutingType.MULTICAST);
+      }
+   }
+
+   @Override
+   public Map<SimpleString, RoutingType> getPrefixes() {
+      return prefixes;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
index a7e50d5..b2a0265 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
@@ -17,11 +17,14 @@
 package org.apache.activemq.artemis.spi.core.protocol;
 
 import java.util.List;
+import java.util.Map;
 
 import io.netty.channel.ChannelPipeline;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 
@@ -71,4 +74,10 @@ public interface ProtocolManager<P extends BaseInterceptor> {
     * @return A list of subprotocol ids
     */
    List<String> websocketSubprotocolIdentifiers();
+
+   void setAnycastPrefix(String anycastPrefix);
+
+   void setMulticastPrefix(String multicastPrefix);
+
+   Map<SimpleString, RoutingType> getPrefixes();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java
index 3a6c404..1f5ef15 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java
@@ -16,11 +16,16 @@
  */
 package org.apache.activemq.artemis.tests.integration.client;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import com.hazelcast.util.UuidUtil;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@@ -30,8 +35,11 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
+import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@@ -143,4 +151,95 @@ public class CoreClientTest extends ActiveMQTestBase {
 
       sf.close();
    }
+
+   @Test
+   public void testCoreClientPrefixes() throws Exception {
+
+      Configuration configuration = createBasicConfig();
+      configuration.clearAcceptorConfigurations();
+      configuration.addAddressesSetting("#", new AddressSettings().setMaxSizeBytes(10 * 1024 * 1024).setPageSizeBytes(1024 * 1024));
+
+      String baseAddress = "foo";
+
+      List<String> anycastPrefixes = new ArrayList<>();
+      anycastPrefixes.add("anycast://");
+      anycastPrefixes.add("queue://");
+      anycastPrefixes.add("jms.queue.");
+
+      List<String> multicastPrefixes = new ArrayList<>();
+      multicastPrefixes.add("multicast://");
+      multicastPrefixes.add("topic://");
+      multicastPrefixes.add("jms.topic.");
+
+      String locatorString = "tcp://localhost:5445";
+      StringBuilder acceptor = new StringBuilder(locatorString + "?PROTOCOLS=CORE;anycastPrefix=");
+      for (String prefix : anycastPrefixes) {
+         acceptor.append(prefix + ",");
+      }
+      acceptor.append(";multicastPrefix=");
+      for (String prefix : multicastPrefixes) {
+         acceptor.append(prefix + ",");
+      }
+
+      configuration.addAcceptorConfiguration("prefix", acceptor.toString());
+
+      ActiveMQServer server = createServer(configuration);
+      server.start();
+
+      ServerLocator locator = ServerLocatorImpl.newLocator(locatorString);
+
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      Map<String, ClientConsumer> consumerMap = new HashMap<>();
+
+      for (String prefix : anycastPrefixes) {
+         String queueName = UuidUtil.buildRandomUuidString();
+         String address = prefix + baseAddress;
+
+         session.createQueue(prefix + baseAddress, null, queueName, null, false);
+         consumerMap.put(address, session.createConsumer(queueName));
+      }
+
+      for (String prefix : multicastPrefixes) {
+         String queueName = UuidUtil.buildRandomUuidString();
+         String address = prefix + baseAddress;
+
+         session.createQueue(prefix + baseAddress, null, queueName, null, false);
+         consumerMap.put(address, session.createConsumer(queueName));
+      }
+
+      session.start();
+
+      final int numMessages = 3;
+
+      for (String prefix : anycastPrefixes) {
+         ClientProducer producer = session.createProducer(prefix + baseAddress);
+         for (int i = 0; i < numMessages; i++) {
+            ClientMessage message = session.createMessage(ActiveMQTextMessage.TYPE, false, 0, System.currentTimeMillis(), (byte) 1);
+            message.getBodyBuffer().writeString("testINVMCoreClient");
+            producer.send(message);
+         }
+
+         // Ensure that messages are load balanced across all queues
+
+         for (String queuePrefix : anycastPrefixes) {
+            ClientConsumer consumer = consumerMap.get(queuePrefix + baseAddress);
+            for (int i = 0; i < numMessages / anycastPrefixes.size(); i++) {
+               ClientMessage message = consumer.receive(1000);
+               assertNotNull(message);
+               message.acknowledge();
+            }
+            assertNull(consumer.receive(1000));
+         }
+
+         for (String multicastPrefix : multicastPrefixes) {
+            ClientConsumer consumer = consumerMap.get(multicastPrefix + baseAddress);
+            assertNull(consumer.receive(100));
+         }
+      }
+
+      sf.close();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 7932dc8..00f296e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.client;
 import javax.management.MBeanServer;
 import java.lang.management.ManagementFactory;
 import java.util.LinkedList;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
@@ -593,8 +594,9 @@ public class HangConsumerTest extends ActiveMQTestBase {
                                                         String defaultAddress,
                                                         SessionCallback callback,
                                                         OperationContext context,
-                                                        boolean autoCreateQueue) throws Exception {
-         return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, getPagingManager());
+                                                        boolean autoCreateQueue,
+                                                        Map<SimpleString, RoutingType> prefixes) throws Exception {
+         return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, getPagingManager(), prefixes);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index d272c02..918ff41 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -179,14 +179,6 @@ public class FakePostOffice implements PostOffice {
 
    @Override
    public RoutingStatus route(ServerMessage message,
-                              RoutingContext context,
-                              boolean direct) throws Exception {
-      return RoutingStatus.OK;
-
-   }
-
-   @Override
-   public RoutingStatus route(ServerMessage message,
                               Transaction tx,
                               boolean direct) throws Exception {
       return RoutingStatus.OK;
@@ -194,19 +186,23 @@ public class FakePostOffice implements PostOffice {
 
    @Override
    public RoutingStatus route(ServerMessage message,
-                              RoutingContext context,
+                              Transaction tx,
                               boolean direct,
                               boolean rejectDuplicates) throws Exception {
       return RoutingStatus.OK;
+   }
 
+   @Override
+   public RoutingStatus route(ServerMessage message, RoutingContext context, boolean direct) throws Exception {
+      return null;
    }
 
    @Override
    public RoutingStatus route(ServerMessage message,
-                              Transaction tx,
+                              RoutingContext context,
                               boolean direct,
                               boolean rejectDuplicates) throws Exception {
-      return RoutingStatus.OK;
+      return null;
    }
 
    @Override


Mime
View raw message