activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [47/50] [abbrv] activemq-artemis git commit: ARTEMIS-877 Add Consumer support for AMQP for new addressing schema
Date Fri, 09 Dec 2016 19:49:31 GMT
ARTEMIS-877 Add Consumer support for AMQP for new addressing schema


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/224f62b2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/224f62b2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/224f62b2

Branch: refs/heads/master
Commit: 224f62b295db26eaa0bca0f7798178f025d737c0
Parents: a182a13
Author: Andy Taylor <andy.tayls67@gmail.com>
Authored: Sat Dec 3 09:03:43 2016 +0000
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Fri Dec 9 18:43:15 2016 +0000

----------------------------------------------------------------------
 .../activemq/cli/test/FileBrokerTest.java       |  24 --
 .../core/management/ActiveMQServerControl.java  |  59 ++++
 .../artemis/core/server/AddressQueryResult.java |  66 ++++
 .../amqp/broker/AMQPSessionCallback.java        |  64 +++-
 .../protocol/amqp/proton/AmqpSupport.java       |   1 +
 .../proton/ProtonServerReceiverContext.java     |  14 +-
 .../amqp/proton/ProtonServerSenderContext.java  | 217 +++++++++---
 .../amqp/proton/handler/ExtCapability.java      |   2 +-
 .../artemis/rest/test/FindDestinationTest.java  |   3 +
 .../impl/ActiveMQServerControlImpl.java         | 120 ++++---
 .../artemis/core/postoffice/AddressManager.java |   6 +
 .../artemis/core/postoffice/PostOffice.java     |   7 +-
 .../core/postoffice/impl/CompositeAddress.java  |  50 +++
 .../core/postoffice/impl/PostOfficeImpl.java    |  11 +
 .../postoffice/impl/SimpleAddressManager.java   |  31 ++
 .../artemis/core/server/ActiveMQServer.java     |   2 +
 .../artemis/core/server/ServerSession.java      |   8 +
 .../cluster/impl/ClusterConnectionImpl.java     |   2 +-
 .../core/server/impl/ActiveMQServerImpl.java    |  21 +-
 .../core/server/impl/ServerSessionImpl.java     |  21 ++
 .../transport/amqp/client/AmqpClient.java       |  38 +++
 .../transport/amqp/client/AmqpConnection.java   |   9 +-
 .../transport/amqp/client/AmqpSession.java      |  63 +++-
 .../integration/amqp/AmqpClientTestSupport.java |  32 ++
 .../amqp/AmqpDurableReceiverTest.java           |   7 +-
 .../amqp/AmqpTempDestinationTest.java           |   2 -
 .../integration/amqp/AmqpTransactionTest.java   |   7 -
 .../amqp/BrokerDefinedAnycastConsumerTest.java  | 240 ++++++++++++++
 .../BrokerDefinedMulticastConsumerTest.java     | 119 +++++++
 .../amqp/ClientDefinedAnycastConsumerTest.java  |  52 +++
 .../amqp/ClientDefinedMultiConsumerTest.java    | 327 +++++++++++++++++++
 .../integration/amqp/ProtonPubSubTest.java      |   4 +
 .../tests/integration/amqp/ProtonTest.java      |  58 +++-
 .../amqp/SendingAndReceivingTest.java           |   4 +
 .../management/ActiveMQServerControlTest.java   |  87 ++---
 .../ActiveMQServerControlUsingCoreTest.java     |  15 +
 .../management/ManagementControlHelper.java     |   8 +
 .../integration/openwire/BasicOpenWireTest.java |   1 -
 .../core/server/impl/fakes/FakePostOffice.java  |  12 +
 39 files changed, 1611 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java
index a50a49f..b04b540 100644
--- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java
+++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java
@@ -45,30 +45,6 @@ import static org.junit.Assert.fail;
 public class FileBrokerTest {
 
    @Test
-   public void startWithJMS() throws Exception {
-      ServerDTO serverDTO = new ServerDTO();
-      serverDTO.configuration = "broker.xml";
-      FileBroker broker = null;
-      try {
-         broker = new FileBroker(serverDTO, new ActiveMQJAASSecurityManager());
-         broker.start();
-         JMSServerManagerImpl jmsServerManager = (JMSServerManagerImpl) broker.getComponents().get("jms");
-         Assert.assertNotNull(jmsServerManager);
-         Assert.assertTrue(jmsServerManager.isStarted());
-         //this tells us the jms server is activated
-         Assert.assertTrue(jmsServerManager.getJMSStorageManager().isStarted());
-         ActiveMQServerImpl activeMQServer = (ActiveMQServerImpl) broker.getComponents().get("core");
-         Assert.assertNotNull(activeMQServer);
-         Assert.assertTrue(activeMQServer.isStarted());
-         Assert.assertTrue(broker.isStarted());
-      } finally {
-         if (broker != null) {
-            broker.stop();
-         }
-      }
-   }
-
-   @Test
    public void startWithoutJMS() throws Exception {
       ServerDTO serverDTO = new ServerDTO();
       serverDTO.configuration = "broker-nojms.xml";

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 1797c9a..abd8e9e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -451,10 +451,29 @@ public interface ActiveMQServerControl {
     * @param address address to bind the queue to
     * @param name    name of the queue
     */
+   @Deprecated
    @Operation(desc = "Create a queue with the specified address", impact = MBeanOperationInfo.ACTION)
    void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
                     @Parameter(name = "name", desc = "Name of the queue") String name) throws Exception;
 
+
+   /**
+    * Create a durable queue.
+    * <br>
+    * If {@code address} is {@code null} it will be defaulted to {@code name}.
+    * <br>
+    * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits.
+    *
+    * @param address address to bind the queue to
+    * @param name    name of the queue
+    * @param routingType The routing type used for this address, MULTICAST or ANYCAST
+    */
+   @Operation(desc = "Create a queue with the specified address", impact = MBeanOperationInfo.ACTION)
+   void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
+                    @Parameter(name = "name", desc = "Name of the queue") String name,
+                    @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception;
+
+
    /**
     * Create a queue.
     * <br>
@@ -466,6 +485,7 @@ public interface ActiveMQServerControl {
     * @param name    name of the queue
     * @param durable whether the queue is durable
     */
+   @Deprecated
    @Operation(desc = "Create a queue with the specified address, name and durability", impact = MBeanOperationInfo.ACTION)
    void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
                     @Parameter(name = "name", desc = "Name of the queue") String name,
@@ -480,6 +500,24 @@ public interface ActiveMQServerControl {
     *
     * @param address address to bind the queue to
     * @param name    name of the queue
+    * @param durable whether the queue is durable
+    * @param routingType The routing type used for this address, MULTICAST or ANYCAST
+    */
+   @Operation(desc = "Create a queue with the specified address, name and durability", impact = MBeanOperationInfo.ACTION)
+   void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
+                    @Parameter(name = "name", desc = "Name of the queue") String name,
+                    @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
+                    @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception;
+
+   /**
+    * Create a queue.
+    * <br>
+    * If {@code address} is {@code null} it will be defaulted to {@code name}.
+    * <br>
+    * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits.
+    *
+    * @param address address to bind the queue to
+    * @param name    name of the queue
     * @param filter  of the queue
     * @param durable whether the queue is durable
     */
@@ -496,6 +534,27 @@ public interface ActiveMQServerControl {
     * <br>
     * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits.
     *
+    * @param address address to bind the queue to
+    * @param name    name of the queue
+    * @param filter  of the queue
+    * @param durable whether the queue is durable
+    * @param routingType The routing type used for this address, MULTICAST or ANYCAST
+    */
+   @Operation(desc = "Create a queue", impact = MBeanOperationInfo.ACTION)
+   void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
+                    @Parameter(name = "name", desc = "Name of the queue") String name,
+                    @Parameter(name = "filter", desc = "Filter of the queue") String filter,
+                    @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
+                    @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception;
+
+
+   /**
+    * Create a queue.
+    * <br>
+    * If {@code address} is {@code null} it will be defaulted to {@code name}.
+    * <br>
+    * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits.
+    *
     * @param address             address to bind the queue to
     * @param routingType         the routing type used for this address, {@code MULTICAST} or {@code ANYCAST}
     * @param name                name of the queue

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/AddressQueryResult.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/AddressQueryResult.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/AddressQueryResult.java
new file mode 100644
index 0000000..07d7406
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/AddressQueryResult.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+
+import java.util.Set;
+
+
+public class AddressQueryResult {
+   private final SimpleString name;
+   private final Set<RoutingType> routingTypes;
+   private final long id;
+   private final boolean autoCreated;
+   private final boolean exists;
+   private final boolean autoCreateAddresses;
+
+   public AddressQueryResult(SimpleString name, Set<RoutingType> routingTypes, long id, boolean autoCreated, boolean exists, boolean autoCreateAddresses) {
+
+      this.name = name;
+      this.routingTypes = routingTypes;
+      this.id = id;
+
+      this.autoCreated = autoCreated;
+      this.exists = exists;
+      this.autoCreateAddresses = autoCreateAddresses;
+   }
+
+   public SimpleString getName() {
+      return name;
+   }
+
+   public Set<RoutingType> getRoutingTypes() {
+      return routingTypes;
+   }
+
+   public long getId() {
+      return id;
+   }
+
+   public boolean isAutoCreated() {
+      return autoCreated;
+   }
+
+   public boolean isExists() {
+      return exists;
+   }
+
+   public boolean isAutoCreateAddresses() {
+      return autoCreateAddresses;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 6d4abc4..9d69b00 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -28,12 +28,15 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
@@ -192,28 +195,40 @@ public class AMQPSessionCallback implements SessionCallback {
       serverConsumer.receiveCredits(-1);
    }
 
-   public void createTemporaryQueue(String queueName) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), null, true, false);
+   public void createTemporaryQueue(String queueName, RoutingType routingType) throws Exception {
+      serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), routingType, null, true, false);
    }
 
-   public void createTemporaryQueue(String address, String queueName, String filter) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), true, false);
+   public void createTemporaryQueue(String address, String queueName, RoutingType routingType, String filter) throws Exception {
+      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName),  routingType, SimpleString.toSimpleString(filter), true, false);
    }
 
-   public void createDurableQueue(String address, String queueName, String filter) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true);
+   public void createUnsharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
+      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, 1, false, false);
    }
 
-   public QueueQueryResult queueQuery(String queueName, boolean autoCreate) throws Exception {
+   public void createSharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
+      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, -1, false, false);
+   }
+
+   public void createSharedVolatileQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
+      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, false, -1, true, true);
+   }
+
+   public QueueQueryResult queueQuery(String queueName, RoutingType routingType, boolean autoCreate) throws Exception {
       QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
 
       if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateQueues() && autoCreate) {
          try {
-            serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), null, false, true);
+            serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), routingType, null, false, true);
          } catch (ActiveMQQueueExistsException e) {
             // The queue may have been created by another thread in the mean time.  Catch and do nothing.
          }
-         queueQueryResult = new QueueQueryResult(queueQueryResult.getName(), queueQueryResult.getAddress(), queueQueryResult.isDurable(), queueQueryResult.isTemporary(), queueQueryResult.getFilterString(), queueQueryResult.getConsumerCount(), queueQueryResult.getMessageCount(), queueQueryResult.isAutoCreateQueues(), true, queueQueryResult.isAutoCreated(), queueQueryResult.isDeleteOnNoConsumers(), queueQueryResult.getRoutingType(), queueQueryResult.getMaxConsumers());
+         queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
+      }
+
+      if (queueQueryResult.getRoutingType() != routingType) {
+         throw new IllegalStateException("Incorrect Routing Type for queue, expecting: " + routingType);
       }
       return queueQueryResult;
    }
@@ -231,6 +246,20 @@ public class AMQPSessionCallback implements SessionCallback {
       return bindingQueryResult.isExists();
    }
 
+   public AddressQueryResult addressQuery(String addressName, RoutingType routingType, boolean autoCreate) throws Exception {
+      AddressQueryResult addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName));
+
+      if (!addressQueryResult.isExists() && addressQueryResult.isAutoCreateAddresses() && autoCreate) {
+         try {
+            serverSession.createAddress(SimpleString.toSimpleString(addressName), routingType, true);
+         } catch (ActiveMQQueueExistsException e) {
+            // The queue may have been created by another thread in the mean time.  Catch and do nothing.
+         }
+         addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName));
+      }
+      return addressQueryResult;
+   }
+
    public void closeSender(final Object brokerConsumer) throws Exception {
 
       final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
@@ -522,4 +551,21 @@ public class AMQPSessionCallback implements SessionCallback {
       protonSPI.removeTransaction(txid);
 
    }
+
+   public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
+      return serverSession.getMatchingQueue(address, routingType);
+   }
+
+
+   public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception {
+      return serverSession.getMatchingQueue(address, queueName, routingType);
+   }
+
+   public AddressInfo getAddress(SimpleString address) {
+      return  serverSession.getAddress(address);
+   }
+
+   public void removeTemporaryQueue(String address) throws Exception {
+      serverSession.deleteQueue(SimpleString.toSimpleString(address));
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
index 7bdbd2e..ff398dc 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
@@ -55,6 +55,7 @@ public class AmqpSupport {
    public static final Symbol PLATFORM = Symbol.valueOf("platform");
    public static final Symbol RESOURCE_DELETED = Symbol.valueOf("amqp:resource-deleted");
    public static final Symbol CONNECTION_FORCED = Symbol.valueOf("amqp:connection:forced");
+   public static final Symbol SHARED_SUBS = Symbol.valueOf("SHARED-SUBS");
 
 
    // Symbols used in configuration of newly opened links.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 0cc293a..a265836 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
@@ -27,6 +28,7 @@ import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMess
 import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Delivery;
@@ -55,6 +57,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
 
    // Used by the broker to decide when to refresh clients credit.  This is not used when client requests credit.
    private static int minCreditRefresh = 30;
+   private TerminusExpiryPolicy expiryPolicy;
 
    public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI,
                                       AMQPConnectionContext connection,
@@ -83,10 +86,11 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
             address = sessionSPI.tempQueueName();
 
             try {
-               sessionSPI.createTemporaryQueue(address);
+               sessionSPI.createTemporaryQueue(address, RoutingType.ANYCAST);
             } catch (Exception e) {
                throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
             }
+            expiryPolicy = target.getExpiryPolicy() != null ? target.getExpiryPolicy() : TerminusExpiryPolicy.LINK_DETACH;
             target.setAddress(address);
          } else {
             //if not dynamic then we use the targets address as the address to forward the messages to, however there has to
@@ -165,6 +169,14 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
    @Override
    public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
       protonSession.removeReceiver(receiver);
+      org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
+      if (target != null && target.getDynamic() && (target.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || target.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
+         try {
+            sessionSPI.removeTemporaryQueue(target.getAddress());
+         } catch (Exception e) {
+            //ignore on close, its temp anyway and will be removed later
+         }
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 960942d..43de7c4 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -19,10 +19,16 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
+import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
+import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
@@ -30,6 +36,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPExceptio
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
 import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
 import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
@@ -66,6 +73,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
 
    private static final Symbol COPY = Symbol.valueOf("copy");
    private static final Symbol TOPIC = Symbol.valueOf("topic");
+   private static final Symbol QUEUE = Symbol.valueOf("queue");
+   private static final Symbol SHARED = Symbol.valueOf("shared");
+   private static final Symbol GLOBAL = Symbol.valueOf("global");
 
    private Object brokerConsumer;
 
@@ -74,7 +84,14 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    protected final AMQPConnectionContext connection;
    protected boolean closed = false;
    protected final AMQPSessionCallback sessionSPI;
+   private boolean multicast;
+   //todo get this from somewhere
+   private RoutingType defaultRoutingType = RoutingType.ANYCAST;
    protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0);
+   private RoutingType routingTypeToUse = defaultRoutingType;
+   private boolean shared = false;
+   private boolean global = false;
+   private boolean isVolatile = false;
 
    public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) {
       super();
@@ -127,7 +144,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       super.initialise();
 
       Source source = (Source) sender.getRemoteSource();
-      String queue;
+      String queue = null;
       String selector = null;
       final Map<Symbol, Object> supportedFilters = new HashMap<>();
 
@@ -148,32 +165,15 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          }
       }
 
-      // if we have a capability for a Topic (AMQP -> JMS Mapping) or we are configured on this
-      // address to act like a topic then act like a subscription.
-      boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source);
-
-      if (isPubSub) {
-         Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS);
-         if (filter != null) {
-            String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
-            String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
-            if (selector != null) {
-               selector += " AND " + noLocalFilter;
-            } else {
-               selector = noLocalFilter;
-            }
-
-            supportedFilters.put(filter.getKey(), filter.getValue());
-         }
-      }
-
       if (source == null) {
          // Attempt to recover a previous subscription happens when a link reattach happens on a
          // subscription queue
          String clientId = getClientId();
          String pubId = sender.getName();
-         queue = createQueueName(clientId, pubId);
-         QueueQueryResult result = sessionSPI.queueQuery(queue, false);
+         queue = createQueueName(clientId, pubId, true, global, false);
+         QueueQueryResult result = sessionSPI.queueQuery(queue, RoutingType.MULTICAST, false);
+         multicast = true;
+         routingTypeToUse = RoutingType.MULTICAST;
 
          // Once confirmed that the address exists we need to return a Source that reflects
          // the lifetime policy and capabilities of the new subscription.
@@ -222,23 +222,86 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          // node is temporary and  will be deleted on closing of the session
          queue = java.util.UUID.randomUUID().toString();
          try {
-            sessionSPI.createTemporaryQueue(queue);
+            sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST);
             // protonSession.getServerSession().createQueue(queue, queue, null, true, false);
          } catch (Exception e) {
             throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
          }
          source.setAddress(queue);
       } else {
+         SimpleString addressToUse;
+         SimpleString queueNameToUse = null;
+         shared = hasCapabilities(SHARED, source);
+         global = hasCapabilities(GLOBAL, source);
+
+         //find out if we have an address made up of the address and queue name, if yes then set queue name
+         if (CompositeAddress.isFullyQualified(source.getAddress())) {
+            CompositeAddress compositeAddress = CompositeAddress.getQueueName(source.getAddress());
+            addressToUse = new SimpleString(compositeAddress.getAddress());
+            queueNameToUse = new SimpleString(compositeAddress.getQueueName());
+         } else {
+            addressToUse = new SimpleString(source.getAddress());
+         }
+         //check to see if the client has defined how we act
+         boolean clientDefined = hasCapabilities(TOPIC, source) || hasCapabilities(QUEUE, source);
+         if (clientDefined)  {
+            multicast = hasCapabilities(TOPIC, source);
+            AddressInfo addressInfo = sessionSPI.getAddress(addressToUse);
+            Set<RoutingType> routingTypes = addressInfo.getRoutingTypes();
+            //if the client defines 1 routing type and the broker another then throw an exception
+            if (multicast && !routingTypes.contains(RoutingType.MULTICAST)) {
+               throw new ActiveMQAMQPIllegalStateException("Address is not configured for topic support");
+            } else if (!multicast && !routingTypes.contains(RoutingType.ANYCAST)) {
+               throw new ActiveMQAMQPIllegalStateException("Address is not configured for queue support");
+            }
+         } else {
+            //if not we look up the address
+            AddressQueryResult addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), defaultRoutingType, true);
+            if (!addressQueryResult.isExists()) {
+               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
+            }
+
+            Set<RoutingType> routingTypes = addressQueryResult.getRoutingTypes();
+            if (routingTypes.contains(RoutingType.MULTICAST) && routingTypes.size() == 1) {
+               multicast = true;
+            } else {
+               //todo add some checks if both routing types are supported
+               multicast = false;
+            }
+         }
+         routingTypeToUse = multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST;
          // if not dynamic then we use the target's address as the address to forward the
          // messages to, however there has to be a queue bound to it so we need to check this.
-         if (isPubSub) {
-            // if we are a subscription and durable create a durable queue using the container
-            // id and link name
-            if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
+         if (multicast) {
+            Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS);
+            if (filter != null) {
+               String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
+               String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
+               if (selector != null) {
+                  selector += " AND " + noLocalFilter;
+               } else {
+                  selector = noLocalFilter;
+               }
+
+               supportedFilters.put(filter.getKey(), filter.getValue());
+            }
+
+
+            if (queueNameToUse != null) {
+               SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.MULTICAST  );
+               queue = matchingAnycastQueue.toString();
+            }
+            //if the address specifies a broker configured queue then we always use this, treat it as a queue
+            if (queue != null) {
+               multicast = false;
+            } else if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
+
+               // if we are a subscription and durable create a durable queue using the container
+               // id and link name
                String clientId = getClientId();
                String pubId = sender.getName();
-               queue = createQueueName(clientId, pubId);
-               QueueQueryResult result = sessionSPI.queueQuery(queue, false);
+               queue = createQueueName(clientId, pubId, shared, global, false);
+               QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false);
 
                if (result.isExists()) {
                   // If a client reattaches to a durable subscription with a different no-local
@@ -248,25 +311,54 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
 
                      if (result.getConsumerCount() == 0) {
                         sessionSPI.deleteQueue(queue);
-                        sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
+                        sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
                      } else {
                         throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
                      }
                   }
                } else {
-                  sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
+                  if (shared) {
+                     sessionSPI.createSharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
+                  } else {
+                     sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
+                  }
                }
             } else {
                // otherwise we are a volatile subscription
-               queue = java.util.UUID.randomUUID().toString();
-               try {
-                  sessionSPI.createTemporaryQueue(source.getAddress(), queue, selector);
-               } catch (Exception e) {
-                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
+               isVolatile = true;
+               if (shared && sender.getName() != null) {
+                  queue = createQueueName(getClientId(), sender.getName(), shared, global, isVolatile);
+                  try {
+                     sessionSPI.createSharedVolatileQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
+                  } catch (ActiveMQQueueExistsException e) {
+                     //this is ok, just means its shared
+                  }
+               } else {
+                  queue = java.util.UUID.randomUUID().toString();
+                  try {
+                     sessionSPI.createTemporaryQueue(source.getAddress(), queue, RoutingType.MULTICAST, selector);
+                  } catch (Exception e) {
+                     throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
+                  }
                }
             }
          } else {
-            queue = source.getAddress();
+            if (queueNameToUse != null) {
+               SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.ANYCAST);
+               if (matchingAnycastQueue != null) {
+                  queue = matchingAnycastQueue.toString();
+               } else {
+                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
+               }
+            } else {
+               SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, RoutingType.ANYCAST);
+               if (matchingAnycastQueue != null) {
+                  queue = matchingAnycastQueue.toString();
+               } else {
+                  queue = addressToUse.toString();
+               }
+            }
+
          }
 
          if (queue == null) {
@@ -274,7 +366,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          }
 
          try {
-            if (!sessionSPI.queueQuery(queue, !isPubSub).isExists()) {
+            if (!sessionSPI.queueQuery(queue, routingTypeToUse, !multicast).isExists()) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
             }
          } catch (ActiveMQAMQPNotFoundException e) {
@@ -290,9 +382,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       // have not honored what it asked for.
       source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);
 
-      boolean browseOnly = !isPubSub && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
+      boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
       try {
-         brokerConsumer = sessionSPI.createSender(this, queue, isPubSub ? null : selector, browseOnly);
+         brokerConsumer = sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
+      } catch (ActiveMQAMQPResourceLimitExceededException e1) {
+         throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage());
       } catch (Exception e) {
          throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
       }
@@ -302,10 +396,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       return connection.getRemoteContainer();
    }
 
-   private boolean isPubSub(Source source) {
-      String pubSubPrefix = sessionSPI.getPubSubPrefix();
-      return source != null && pubSubPrefix != null && source.getAddress() != null && source.getAddress().startsWith(pubSubPrefix);
-   }
 
    /*
     * close the session
@@ -341,23 +431,30 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          // any durable resources for say pub subs
          if (remoteLinkClose) {
             Source source = (Source) sender.getSource();
-            if (source != null && source.getAddress() != null && (hasCapabilities(TOPIC, source) || isPubSub(source))) {
+            if (source != null && source.getAddress() != null && multicast) {
                String queueName = source.getAddress();
-               QueueQueryResult result = sessionSPI.queueQuery(queueName, false);
+               QueueQueryResult result = sessionSPI.queueQuery(queueName, routingTypeToUse,  false);
                if (result.isExists() && source.getDynamic()) {
                   sessionSPI.deleteQueue(queueName);
                } else {
                   String clientId = getClientId();
                   String pubId = sender.getName();
-                  String queue = createQueueName(clientId, pubId);
-                  result = sessionSPI.queueQuery(queue, false);
-                  if (result.isExists()) {
-                     if (result.getConsumerCount() > 0) {
-                        System.out.println("error");
-                     }
+                  if (pubId.contains("|")) {
+                     pubId = pubId.split("\\|")[0];
+                  }
+                  String queue = createQueueName(clientId, pubId, shared, global, isVolatile);
+                  result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false);
+                  //only delete if it isn't volatile and has no consumers
+                  if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) {
                      sessionSPI.deleteQueue(queue);
                   }
                }
+            } else if (source != null && source.getDynamic() && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
+               try {
+                  sessionSPI.removeTemporaryQueue(source.getAddress());
+               } catch (Exception e) {
+                  //ignore on close, its temp anyway and will be removed later
+               }
             }
          }
       } catch (Exception e) {
@@ -521,7 +618,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       return false;
    }
 
-   private static String createQueueName(String clientId, String pubId) {
-      return clientId + "." + pubId;
+   private static String createQueueName(String clientId, String pubId, boolean shared, boolean global, boolean isVolatile) {
+      String queue = clientId == null || clientId.isEmpty() ? pubId : clientId + "." + pubId;
+      if (shared) {
+         if (queue.contains("|")) {
+            queue = queue.split("\\|")[0];
+         }
+         if (isVolatile) {
+            queue += ":shared-volatile";
+         }
+         if (global) {
+            queue += ":global";
+         }
+      }
+      return queue;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java
index 6325ff6..931efa7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java
@@ -22,7 +22,7 @@ import org.apache.qpid.proton.engine.Connection;
 
 public class ExtCapability {
 
-   public static final Symbol[] capabilities = new Symbol[]{AmqpSupport.SOLE_CONNECTION_CAPABILITY, AmqpSupport.DELAYED_DELIVERY};
+   public static final Symbol[] capabilities = new Symbol[]{AmqpSupport.SOLE_CONNECTION_CAPABILITY, AmqpSupport.DELAYED_DELIVERY, AmqpSupport.SHARED_SUBS};
 
    public static Symbol[] getCapabilities() {
       return capabilities;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java
----------------------------------------------------------------------
diff --git a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java
index db23f56..be14056 100644
--- a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java
+++ b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.rest.test;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.jboss.resteasy.client.ClientRequest;
 import org.jboss.resteasy.client.ClientResponse;
 import org.jboss.resteasy.spi.Link;
@@ -30,6 +31,7 @@ public class FindDestinationTest extends MessageTestBase {
    @Test
    public void testFindQueue() throws Exception {
       String testName = "testFindQueue";
+      server.getActiveMQServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString(testName), RoutingType.MULTICAST));
       server.getActiveMQServer().createQueue(new SimpleString(testName), RoutingType.MULTICAST, new SimpleString(testName), null, false, false);
 
       ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/queues/" + testName));
@@ -60,6 +62,7 @@ public class FindDestinationTest extends MessageTestBase {
 
    @Test
    public void testFindTopic() throws Exception {
+      server.getActiveMQServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString("testTopic"), RoutingType.MULTICAST));
       server.getActiveMQServer().createQueue(new SimpleString("testTopic"), RoutingType.MULTICAST, new SimpleString("testTopic"), null, false, false);
       ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/topics/testTopic"));
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 4464062..841aa84 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -619,7 +619,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
       }
    }
 
-   @Deprecated
    @Override
    public void createQueue(final String address, final String name) throws Exception {
       checkStarted();
@@ -633,6 +632,18 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
    }
 
    @Override
+   public void createQueue(final String address, final String name, final String routingType) throws Exception {
+      checkStarted();
+
+      clearIO();
+      try {
+         server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), null, true, false);
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
    public void createQueue(final String address, final String name, final boolean durable) throws Exception {
       checkStarted();
 
@@ -645,35 +656,44 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
    }
 
    @Override
-   public void createQueue(String address,
-                           String routingType,
-                           String name,
-                           String filterStr,
-                           boolean durable,
-                           int maxConsumers,
-                           boolean deleteOnNoConsumers,
-                           boolean autoCreateAddress) throws Exception {
+   public void createQueue(final String address, final String name, final boolean durable, final String routingType) throws Exception {
       checkStarted();
 
       clearIO();
+      try {
+         server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), null, durable, false);
+      } finally {
+         blockOnIO();
+      }
+   }
 
-      SimpleString filter = filterStr == null ? null : new SimpleString(filterStr);
+   @Override
+   public void createQueue(final String address,
+                           final String name,
+                           final String filterStr,
+                           final boolean durable) throws Exception {
+      checkStarted();
+
+      clearIO();
       try {
+         SimpleString filter = null;
          if (filterStr != null && !filterStr.trim().equals("")) {
             filter = new SimpleString(filterStr);
          }
 
-         server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
+         server.createQueue(SimpleString.toSimpleString(address), ActiveMQDefaultConfiguration.getDefaultRoutingType(), new SimpleString(name), filter, durable, false);
       } finally {
          blockOnIO();
       }
    }
 
+
    @Override
    public void createQueue(final String address,
                            final String name,
                            final String filterStr,
-                           final boolean durable) throws Exception {
+                           final boolean durable,
+                           final String routingType) throws Exception {
       checkStarted();
 
       clearIO();
@@ -683,12 +703,38 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
             filter = new SimpleString(filterStr);
          }
 
-         server.createQueue(SimpleString.toSimpleString(address), ActiveMQDefaultConfiguration.getDefaultRoutingType(), new SimpleString(name), filter, durable, false);
+         server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false);
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public void createQueue(String address,
+                           String routingType,
+                           String name,
+                           String filterStr,
+                           boolean durable,
+                           int maxConsumers,
+                           boolean deleteOnNoConsumers,
+                           boolean autoCreateAddress) throws Exception {
+      checkStarted();
+
+      clearIO();
+
+      SimpleString filter = filterStr == null ? null : new SimpleString(filterStr);
+      try {
+         if (filterStr != null && !filterStr.trim().equals("")) {
+            filter = new SimpleString(filterStr);
+         }
+
+         server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
       } finally {
          blockOnIO();
       }
    }
 
+
    @Override
    public String[] getQueueNames() {
       checkStarted();
@@ -1704,30 +1750,30 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
          settings.add("expiryAddress", addressSettings.getExpiryAddress().toString());
       }
       return settings.add("expiryDelay", addressSettings.getExpiryDelay())
-                     .add("maxDeliveryAttempts", addressSettings.getMaxDeliveryAttempts())
-                     .add("pageCacheMaxSize", addressSettings.getPageCacheMaxSize())
-                     .add("maxSizeBytes", addressSettings.getMaxSizeBytes())
-                     .add("pageSizeBytes", addressSettings.getPageSizeBytes())
-                     .add("redeliveryDelay", addressSettings.getRedeliveryDelay())
-                     .add("redeliveryMultiplier", addressSettings.getRedeliveryMultiplier())
-                     .add("maxRedeliveryDelay", addressSettings.getMaxRedeliveryDelay())
-                     .add("redistributionDelay", addressSettings.getRedistributionDelay())
-                     .add("lastValueQueue", addressSettings.isLastValueQueue())
-                     .add("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute())
-                     .add("addressFullMessagePolicy", policy)
-                     .add("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold())
-                     .add("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod())
-                     .add("slowConsumerPolicy", consumerPolicy)
-                     .add("autoCreateJmsQueues", addressSettings.isAutoCreateJmsQueues())
-                     .add("autoCreateJmsTopics", addressSettings.isAutoCreateJmsTopics())
-                     .add("autoDeleteJmsQueues", addressSettings.isAutoDeleteJmsQueues())
-                     .add("autoDeleteJmsTopics", addressSettings.isAutoDeleteJmsQueues())
-                     .add("autoCreateQueues", addressSettings.isAutoCreateQueues())
-                     .add("autoDeleteQueues", addressSettings.isAutoDeleteQueues())
-                     .add("autoCreateAddress", addressSettings.isAutoCreateAddresses())
-                     .add("autoDeleteAddress", addressSettings.isAutoDeleteAddresses())
-                     .build()
-                     .toString();
+            .add("maxDeliveryAttempts", addressSettings.getMaxDeliveryAttempts())
+            .add("pageCacheMaxSize", addressSettings.getPageCacheMaxSize())
+            .add("maxSizeBytes", addressSettings.getMaxSizeBytes())
+            .add("pageSizeBytes", addressSettings.getPageSizeBytes())
+            .add("redeliveryDelay", addressSettings.getRedeliveryDelay())
+            .add("redeliveryMultiplier", addressSettings.getRedeliveryMultiplier())
+            .add("maxRedeliveryDelay", addressSettings.getMaxRedeliveryDelay())
+            .add("redistributionDelay", addressSettings.getRedistributionDelay())
+            .add("lastValueQueue", addressSettings.isLastValueQueue())
+            .add("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute())
+            .add("addressFullMessagePolicy", policy)
+            .add("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold())
+            .add("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod())
+            .add("slowConsumerPolicy", consumerPolicy)
+            .add("autoCreateJmsQueues", addressSettings.isAutoCreateJmsQueues())
+            .add("autoCreateJmsTopics", addressSettings.isAutoCreateJmsTopics())
+            .add("autoDeleteJmsQueues", addressSettings.isAutoDeleteJmsQueues())
+            .add("autoDeleteJmsTopics", addressSettings.isAutoDeleteJmsQueues())
+            .add("autoCreateQueues", addressSettings.isAutoCreateQueues())
+            .add("autoDeleteQueues", addressSettings.isAutoDeleteQueues())
+            .add("autoCreateAddress", addressSettings.isAutoCreateAddresses())
+            .add("autoDeleteAddress", addressSettings.isAutoDeleteAddresses())
+            .build()
+            .toString();
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
index 1cf1a07..ada1d77 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
@@ -20,6 +20,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 
@@ -44,6 +45,10 @@ public interface AddressManager {
 
    Bindings getMatchingBindings(SimpleString address) throws Exception;
 
+   SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
+
+   SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception;
+
    void clear();
 
    Binding getBinding(SimpleString queueName);
@@ -59,4 +64,5 @@ public interface AddressManager {
    AddressInfo removeAddressInfo(SimpleString address);
 
    AddressInfo getAddressInfo(SimpleString address);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index 48ec7db..dc5f4b4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -79,6 +80,10 @@ public interface PostOffice extends ActiveMQComponent {
 
    Map<SimpleString, Binding> getAllBindings();
 
+   SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
+
+   SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception;
+
    RoutingStatus route(ServerMessage message, boolean direct) throws Exception;
 
    RoutingStatus route(ServerMessage message,
@@ -119,6 +124,4 @@ public interface PostOffice extends ActiveMQComponent {
    boolean isAddressBound(final SimpleString address) throws Exception;
 
    Set<SimpleString> getAddresses();
-
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CompositeAddress.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CompositeAddress.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CompositeAddress.java
new file mode 100644
index 0000000..32083a5
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CompositeAddress.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.postoffice.impl;
+
+public class CompositeAddress {
+
+   public static String SEPARATOR = "::";
+   private final String address;
+   private final String queueName;
+
+   public String getAddress() {
+      return address;
+   }
+
+   public String getQueueName() {
+      return queueName;
+   }
+
+   public CompositeAddress(String address, String queueName) {
+
+      this.address = address;
+      this.queueName = queueName;
+   }
+
+   public static boolean isFullyQualified(String address) {
+      return address.toString().contains(SEPARATOR);
+   }
+
+   public static CompositeAddress getQueueName(String address) {
+      String[] split = address.split(SEPARATOR);
+      if (split.length <= 0) {
+         throw new IllegalStateException("Nott A Fully Qualified Name");
+      }
+      return new CompositeAddress(split[0], split[1]);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 9bd69d1..34e966c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -67,6 +67,7 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.core.server.RouteContextList;
 import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.group.GroupingHandler;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@@ -866,6 +867,16 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
    }
 
    @Override
+   public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
+      return addressManager.getMatchingQueue(address, routingType);
+   }
+
+   @Override
+   public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception {
+      return addressManager.getMatchingQueue(address, queueName, routingType);
+   }
+
+   @Override
    public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception {
       // We send direct to the queue so we can send it to the same queue that is bound to the notifications address -
       // this is crucial for ensuring

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
index 8db4f6f..c0e5b2d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
@@ -119,6 +119,37 @@ public class SimpleAddressManager implements AddressManager {
    }
 
    @Override
+   public SimpleString getMatchingQueue(final SimpleString address, RoutingType routingType) throws Exception {
+
+      Binding binding = nameMap.get(address);
+
+      if (binding == null || !(binding instanceof  LocalQueueBinding)
+            || !binding.getAddress().equals(address)) {
+         Bindings bindings = mappings.get(address);
+         if (bindings != null) {
+            for (Binding theBinding : bindings.getBindings()) {
+               if (theBinding instanceof LocalQueueBinding) {
+                  binding = theBinding;
+                  break;
+               }
+            }
+         }
+      }
+
+      return binding != null ? binding.getUniqueName() : null;
+   }
+
+   @Override
+   public SimpleString getMatchingQueue(final SimpleString address, final SimpleString queueName, RoutingType routingType) throws Exception {
+      Binding binding = nameMap.get(queueName);
+
+      if (binding != null && !binding.getAddress().equals(address)) {
+         throw new IllegalStateException("queue belongs to address" + binding.getAddress());
+      }
+      return binding != null ? binding.getUniqueName() : null;
+   }
+
+   @Override
    public void clear() {
       nameMap.clear();
       mappings.clear();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 0aeaf6b..5fe71bb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -344,6 +344,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
 
    QueueQueryResult queueQuery(SimpleString name) throws Exception;
 
+   AddressQueryResult addressQuery(SimpleString name) throws Exception;
+
    Queue deployQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filterString,
                      boolean durable,
                      boolean temporary,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index badadf4..e6b5ad4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -163,6 +163,8 @@ public interface ServerSession extends SecurityAuth {
 
    QueueQueryResult executeQueueQuery(SimpleString name) throws Exception;
 
+   AddressQueryResult executeAddressQuery(SimpleString name) throws Exception;
+
    BindingQueryResult executeBindingQuery(SimpleString address) throws Exception;
 
    void closeConsumer(long consumerID) throws Exception;
@@ -237,4 +239,10 @@ public interface ServerSession extends SecurityAuth {
    List<MessageReference> getInTXMessagesForConsumer(long consumerId);
 
    String getValidatedUser();
+
+   SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
+
+   SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception;
+
+   AddressInfo getAddress(SimpleString address);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 2ae2329..c997dab 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -720,7 +720,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
                } else {
                   // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
                   // actually routed to at that address though
-                  queue = server.createQueue(queueName, RoutingType.MULTICAST, queueName, null, true, false);
+                  queue = server.createQueue(queueName, RoutingType.MULTICAST, queueName, null, true, false, -1, false, true);
                }
 
                // There are a few things that will behave differently when it's an internal queue

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 425eee5..742de33 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -108,6 +108,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.Bindable;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.Divert;
@@ -748,6 +749,24 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    }
 
    @Override
+   public AddressQueryResult addressQuery(SimpleString name) throws Exception {
+      if (name == null) {
+         throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
+      }
+
+      boolean autoCreateAddresses = getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateAddresses();
+
+      AddressInfo addressInfo = postOffice.getAddressInfo(name);
+      AddressQueryResult response;
+      if (addressInfo != null) {
+         response = new AddressQueryResult(addressInfo.getName(), addressInfo.getRoutingTypes(), addressInfo.getId(), addressInfo.isAutoCreated(), true, autoCreateAddresses);
+      } else {
+         response = new AddressQueryResult(name, null, -1, false, false, autoCreateAddresses);
+      }
+      return response;
+   }
+
+   @Override
    public void threadDump() {
       StringWriter str = new StringWriter();
       PrintWriter out = new PrintWriter(str);
@@ -1468,7 +1487,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                             final SimpleString filterString,
                             final boolean durable,
                             final boolean temporary) throws Exception {
-      return createQueue(address, routingType, queueName, filterString, null, durable, temporary, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), true);
+      return createQueue(address, routingType, queueName, filterString, null, durable, temporary, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), false);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 79600b9..347874b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -65,6 +65,7 @@ import org.apache.activemq.artemis.core.security.SecurityStore;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
@@ -708,6 +709,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    }
 
    @Override
+   public AddressQueryResult executeAddressQuery(SimpleString name) throws Exception {
+      return server.addressQuery(name);
+   }
+
+   @Override
    public BindingQueryResult executeBindingQuery(final SimpleString address) throws Exception {
       return server.bindingQuery(address);
    }
@@ -1484,6 +1490,21 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    }
 
    @Override
+   public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
+      return server.getPostOffice().getMatchingQueue(address, routingType);
+   }
+
+   @Override
+   public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception {
+      return server.getPostOffice().getMatchingQueue(address, queueName, routingType);
+   }
+
+   @Override
+   public AddressInfo getAddress(SimpleString address) {
+      return server.getPostOffice().getAddressInfo(address);
+   }
+
+   @Override
    public String toString() {
       StringBuffer buffer = new StringBuffer();
       if (this.metaData != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
index 56353e4..fddaf9d 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
@@ -76,6 +76,44 @@ public class AmqpClient {
    }
 
    /**
+    * Creates a connection with the broker at the given location, this method initiates a
+    * connect attempt immediately and will fail if the remote peer cannot be reached.
+    *
+    * @throws Exception if an error occurs attempting to connect to the Broker.
+    * @return a new connection object used to interact with the connected peer.
+    */
+   public AmqpConnection connect(boolean noContainerId) throws Exception {
+
+      AmqpConnection connection = createConnection();
+      connection.setNoContainerID();
+
+      LOG.debug("Attempting to create new connection to peer: {}", remoteURI);
+      connection.connect();
+
+      return connection;
+   }
+
+
+   /**
+    * Creates a connection with the broker at the given location, this method initiates a
+    * connect attempt immediately and will fail if the remote peer cannot be reached.
+    *
+    * @throws Exception if an error occurs attempting to connect to the Broker.
+    * @return a new connection object used to interact with the connected peer.
+    */
+   public AmqpConnection connect(String containerId) throws Exception {
+
+      AmqpConnection connection = createConnection();
+      connection.setContainerId(containerId);
+
+      LOG.debug("Attempting to create new connection to peer: {}", remoteURI);
+      connection.connect();
+
+      return connection;
+   }
+
+
+   /**
     * Creates a connection object using the configured values for user, password, remote URI
     * etc.  This method does not immediately initiate a connection to the remote leaving that
     * to the caller which provides a connection object that can have additional configuration

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index 01c60bc..723daef 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -104,6 +104,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
    private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
    private long drainTimeout = DEFAULT_DRAIN_TIMEOUT;
    private boolean trace;
+   private boolean noContainerID = false;
 
    public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport,
                          String username,
@@ -139,7 +140,9 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
          serializer.execute(new Runnable() {
             @Override
             public void run() {
-               getEndpoint().setContainer(safeGetContainerId());
+               if (!noContainerID) {
+                  getEndpoint().setContainer(safeGetContainerId());
+               }
                getEndpoint().setHostname(remoteURI.getHost());
                if (!getOfferedCapabilities().isEmpty()) {
                   getEndpoint().setOfferedCapabilities(getOfferedCapabilities().toArray(new Symbol[0]));
@@ -735,4 +738,8 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
    public String toString() {
       return "AmqpConnection { " + connectionId + " }";
    }
+
+   public void setNoContainerID() {
+      noContainerID = true;
+   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index fc3fdf7..d4b16c1 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -271,7 +271,68 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
       checkClosed();
 
       final ClientFuture request = new ClientFuture();
-      final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, getNextReceiverId());
+      final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId);
+
+      connection.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            checkClosed();
+            receiver.setStateInspector(getStateInspector());
+            receiver.open(request);
+            pumpToProtonTransport(request);
+         }
+      });
+
+      request.sync();
+
+      return receiver;
+   }
+
+
+   /**
+    * Create a receiver instance using the given Source
+    *
+    * @param source the caller created and configured Source used to create the receiver link.
+    * @return a newly created receiver that is ready for use.
+    * @throws Exception if an error occurs while creating the receiver.
+    */
+   public AmqpReceiver createMulticastReceiver(Source source, String receiverId, String receiveName) throws Exception {
+      checkClosed();
+
+      final ClientFuture request = new ClientFuture();
+      final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId);
+      receiver.setSubscriptionName(receiveName);
+
+      connection.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            checkClosed();
+            receiver.setStateInspector(getStateInspector());
+            receiver.open(request);
+            pumpToProtonTransport(request);
+         }
+      });
+
+      request.sync();
+
+      return receiver;
+   }
+
+   /**
+    * Create a receiver instance using the given Source
+    *
+    * @param source the caller created and configured Source used to create the receiver link.
+    * @return a newly created receiver that is ready for use.
+    * @throws Exception if an error occurs while creating the receiver.
+    */
+   public AmqpReceiver createMulticastReceiver(String receiverId, String address, String receiveName) throws Exception {
+      checkClosed();
+
+      final ClientFuture request = new ClientFuture();
+      final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, receiverId);
+      receiver.setSubscriptionName(receiveName);
 
       connection.getScheduler().execute(new Runnable() {
 


Mime
View raw message