activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [54/55] [abbrv] activemq-artemis git commit: add consuming support to AMQP for new addressing schema
Date Wed, 07 Dec 2016 18:18:57 GMT
add consuming support to AMQP for new addressing schema

build fix for code style issues

added address query and more amqp consumer functionality

codestyle fixes

more amqp work plus test fixes and API enhancements


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/865716de
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/865716de
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/865716de

Branch: refs/heads/ARTEMIS-780
Commit: 865716de4e11829525d3f065757f5af4e5224e52
Parents: 63324a1
Author: Andy Taylor <andy.tayls67@gmail.com>
Authored: Sat Dec 3 09:03:43 2016 +0000
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Dec 7 13:16:37 2016 -0500

----------------------------------------------------------------------
 .../activemq/cli/test/FileBrokerTest.java       |  24 --
 .../core/management/ActiveMQServerControl.java  |  59 ++++
 .../artemis/core/server/AddressQueryResult.java |  66 ++++
 .../amqp/broker/AMQPSessionCallback.java        |  64 +++-
 .../protocol/amqp/proton/AmqpSupport.java       |   1 +
 .../proton/ProtonServerReceiverContext.java     |  14 +-
 .../amqp/proton/ProtonServerSenderContext.java  | 217 +++++++++---
 .../amqp/proton/handler/ExtCapability.java      |   2 +-
 .../artemis/rest/test/FindDestinationTest.java  |   3 +
 .../impl/ActiveMQServerControlImpl.java         | 120 ++++---
 .../artemis/core/postoffice/AddressManager.java |   6 +
 .../artemis/core/postoffice/PostOffice.java     |   7 +-
 .../core/postoffice/impl/CompositeAddress.java  |  50 +++
 .../core/postoffice/impl/PostOfficeImpl.java    |  11 +
 .../postoffice/impl/SimpleAddressManager.java   |  31 ++
 .../artemis/core/server/ActiveMQServer.java     |   2 +
 .../artemis/core/server/ServerSession.java      |   8 +
 .../cluster/impl/ClusterConnectionImpl.java     |   2 +-
 .../core/server/impl/ActiveMQServerImpl.java    |  21 +-
 .../core/server/impl/ServerSessionImpl.java     |  21 ++
 .../transport/amqp/client/AmqpClient.java       |  38 +++
 .../transport/amqp/client/AmqpConnection.java   |   9 +-
 .../transport/amqp/client/AmqpSession.java      |  63 +++-
 .../integration/amqp/AmqpClientTestSupport.java |  32 ++
 .../amqp/AmqpDurableReceiverTest.java           |   7 +-
 .../amqp/AmqpTempDestinationTest.java           |   2 -
 .../integration/amqp/AmqpTransactionTest.java   |   7 -
 .../amqp/BrokerDefinedAnycastConsumerTest.java  | 240 ++++++++++++++
 .../BrokerDefinedMulticastConsumerTest.java     | 119 +++++++
 .../amqp/ClientDefinedAnycastConsumerTest.java  |  52 +++
 .../amqp/ClientDefinedMultiConsumerTest.java    | 327 +++++++++++++++++++
 .../integration/amqp/ProtonPubSubTest.java      |   4 +
 .../tests/integration/amqp/ProtonTest.java      |  58 +++-
 .../amqp/SendingAndReceivingTest.java           |   4 +
 .../management/ActiveMQServerControlTest.java   |  87 ++---
 .../ActiveMQServerControlUsingCoreTest.java     |  15 +
 .../management/ManagementControlHelper.java     |   8 +
 .../integration/openwire/BasicOpenWireTest.java |   1 -
 .../core/server/impl/fakes/FakePostOffice.java  |  12 +
 39 files changed, 1611 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/865716de/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java
index a50a49f..b04b540 100644
--- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java
+++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java
@@ -45,30 +45,6 @@ import static org.junit.Assert.fail;
 public class FileBrokerTest {
 
    @Test
-   public void startWithJMS() throws Exception {
-      ServerDTO serverDTO = new ServerDTO();
-      serverDTO.configuration = "broker.xml";
-      FileBroker broker = null;
-      try {
-         broker = new FileBroker(serverDTO, new ActiveMQJAASSecurityManager());
-         broker.start();
-         JMSServerManagerImpl jmsServerManager = (JMSServerManagerImpl) broker.getComponents().get("jms");
-         Assert.assertNotNull(jmsServerManager);
-         Assert.assertTrue(jmsServerManager.isStarted());
-         //this tells us the jms server is activated
-         Assert.assertTrue(jmsServerManager.getJMSStorageManager().isStarted());
-         ActiveMQServerImpl activeMQServer = (ActiveMQServerImpl) broker.getComponents().get("core");
-         Assert.assertNotNull(activeMQServer);
-         Assert.assertTrue(activeMQServer.isStarted());
-         Assert.assertTrue(broker.isStarted());
-      } finally {
-         if (broker != null) {
-            broker.stop();
-         }
-      }
-   }
-
-   @Test
    public void startWithoutJMS() throws Exception {
       ServerDTO serverDTO = new ServerDTO();
       serverDTO.configuration = "broker-nojms.xml";

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/865716de/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 1797c9a..abd8e9e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -451,10 +451,29 @@ public interface ActiveMQServerControl {
     * @param address address to bind the queue to
     * @param name    name of the queue
     */
+   @Deprecated
    @Operation(desc = "Create a queue with the specified address", impact = MBeanOperationInfo.ACTION)
    void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
                     @Parameter(name = "name", desc = "Name of the queue") String name) throws Exception;
 
+
+   /**
+    * Create a durable queue.
+    * <br>
+    * If {@code address} is {@code null} it will be defaulted to {@code name}.
+    * <br>
+    * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException} exception if the queue already exists.
+    *
+    * @param address address to bind the queue to
+    * @param name    name of the queue
+    * @param routingType The routing type used for this address, MULTICAST or ANYCAST
+    */
+   @Operation(desc = "Create a queue with the specified address", impact = MBeanOperationInfo.ACTION)
+   void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
+                    @Parameter(name = "name", desc = "Name of the queue") String name,
+                    @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception;
+
+
    /**
     * Create a queue.
     * <br>
@@ -466,6 +485,7 @@ public interface ActiveMQServerControl {
     * @param name    name of the queue
     * @param durable whether the queue is durable
     */
+   @Deprecated
    @Operation(desc = "Create a queue with the specified address, name and durability", impact = MBeanOperationInfo.ACTION)
    void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
                     @Parameter(name = "name", desc = "Name of the queue") String name,
@@ -480,6 +500,24 @@ public interface ActiveMQServerControl {
     *
     * @param address address to bind the queue to
     * @param name    name of the queue
+    * @param durable whether the queue is durable
+    * @param routingType The routing type used for this address, MULTICAST or ANYCAST
+    */
+   @Operation(desc = "Create a queue with the specified address, name and durability", impact = MBeanOperationInfo.ACTION)
+   void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
+                    @Parameter(name = "name", desc = "Name of the queue") String name,
+                    @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
+                    @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception;
+
+   /**
+    * Create a queue.
+    * <br>
+    * If {@code address} is {@code null} it will be defaulted to {@code name}.
+    * <br>
+    * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException} exception if the queue already exists.
+    *
+    * @param address address to bind the queue to
+    * @param name    name of the queue
     * @param filter  of the queue
     * @param durable whether the queue is durable
     */
@@ -496,6 +534,27 @@ public interface ActiveMQServerControl {
     * <br>
     * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits.
     *
+    * @param address address to bind the queue to
+    * @param name    name of the queue
+    * @param filter  of the queue
+    * @param durable whether the queue is durable
+    * @param routingType The routing type used for this address, MULTICAST or ANYCAST
+    */
+   @Operation(desc = "Create a queue", impact = MBeanOperationInfo.ACTION)
+   void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
+                    @Parameter(name = "name", desc = "Name of the queue") String name,
+                    @Parameter(name = "filter", desc = "Filter of the queue") String filter,
+                    @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
+                    @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception;
+
+
+   /**
+    * Create a queue.
+    * <br>
+    * If {@code address} is {@code null} it will be defaulted to {@code name}.
+    * <br>
+    * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException} exception if the queue already exists.
+    *
     * @param address             address to bind the queue to
     * @param routingType         the routing type used for this address, {@code MULTICAST} or {@code ANYCAST}
     * @param name                name of the queue

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/865716de/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/AddressQueryResult.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/AddressQueryResult.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/AddressQueryResult.java
new file mode 100644
index 0000000..07d7406
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/AddressQueryResult.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+
+import java.util.Set;
+
+
+public class AddressQueryResult {
+   private final SimpleString name;
+   private final Set<RoutingType> routingTypes;
+   private final long id;
+   private final boolean autoCreated;
+   private final boolean exists;
+   private final boolean autoCreateAddresses;
+
+   public AddressQueryResult(SimpleString name, Set<RoutingType> routingTypes, long id, boolean autoCreated, boolean exists, boolean autoCreateAddresses) {
+
+      this.name = name;
+      this.routingTypes = routingTypes;
+      this.id = id;
+
+      this.autoCreated = autoCreated;
+      this.exists = exists;
+      this.autoCreateAddresses = autoCreateAddresses;
+   }
+
+   public SimpleString getName() {
+      return name;
+   }
+
+   public Set<RoutingType> getRoutingTypes() {
+      return routingTypes;
+   }
+
+   public long getId() {
+      return id;
+   }
+
+   public boolean isAutoCreated() {
+      return autoCreated;
+   }
+
+   public boolean isExists() {
+      return exists;
+   }
+
+   public boolean isAutoCreateAddresses() {
+      return autoCreateAddresses;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/865716de/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 6d4abc4..9d69b00 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -28,12 +28,15 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
@@ -192,28 +195,40 @@ public class AMQPSessionCallback implements SessionCallback {
       serverConsumer.receiveCredits(-1);
    }
 
-   public void createTemporaryQueue(String queueName) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), null, true, false);
+   public void createTemporaryQueue(String queueName, RoutingType routingType) throws Exception {
+      serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), routingType, null, true, false);
    }
 
-   public void createTemporaryQueue(String address, String queueName, String filter) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), true, false);
+   public void createTemporaryQueue(String address, String queueName, RoutingType routingType, String filter) throws Exception {
+      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName),  routingType, SimpleString.toSimpleString(filter), true, false);
    }
 
-   public void createDurableQueue(String address, String queueName, String filter) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true);
+   public void createUnsharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
+      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, 1, false, false);
    }
 
-   public QueueQueryResult queueQuery(String queueName, boolean autoCreate) throws Exception {
+   public void createSharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
+      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, -1, false, false);
+   }
+
+   public void createSharedVolatileQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
+      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, false, -1, true, true);
+   }
+
+   public QueueQueryResult queueQuery(String queueName, RoutingType routingType, boolean autoCreate) throws Exception {
       QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
 
       if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateQueues() && autoCreate) {
          try {
-            serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), null, false, true);
+            serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), routingType, null, false, true);
          } catch (ActiveMQQueueExistsException e) {
             // The queue may have been created by another thread in the mean time.  Catch and do nothing.
          }
-         queueQueryResult = new QueueQueryResult(queueQueryResult.getName(), queueQueryResult.getAddress(), queueQueryResult.isDurable(), queueQueryResult.isTemporary(), queueQueryResult.getFilterString(), queueQueryResult.getConsumerCount(), queueQueryResult.getMessageCount(), queueQueryResult.isAutoCreateQueues(), true, queueQueryResult.isAutoCreated(), queueQueryResult.isDeleteOnNoConsumers(), queueQueryResult.getRoutingType(), queueQueryResult.getMaxConsumers());
+         queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
+      }
+
+      if (queueQueryResult.getRoutingType() != routingType) {
+         throw new IllegalStateException("Incorrect Routing Type for queue, expecting: " + routingType);
       }
       return queueQueryResult;
    }
@@ -231,6 +246,20 @@ public class AMQPSessionCallback implements SessionCallback {
       return bindingQueryResult.isExists();
    }
 
+   public AddressQueryResult addressQuery(String addressName, RoutingType routingType, boolean autoCreate) throws Exception {
+      AddressQueryResult addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName));
+
+      if (!addressQueryResult.isExists() && addressQueryResult.isAutoCreateAddresses() && autoCreate) {
+         try {
+            serverSession.createAddress(SimpleString.toSimpleString(addressName), routingType, true);
+         } catch (ActiveMQQueueExistsException e) {
+            // The address may have been created by another thread in the meantime.  Catch and do nothing.
+         }
+         addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName));
+      }
+      return addressQueryResult;
+   }
+
    public void closeSender(final Object brokerConsumer) throws Exception {
 
       final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
@@ -522,4 +551,21 @@ public class AMQPSessionCallback implements SessionCallback {
       protonSPI.removeTransaction(txid);
 
    }
+
+   public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
+      return serverSession.getMatchingQueue(address, routingType);
+   }
+
+
+   public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception {
+      return serverSession.getMatchingQueue(address, queueName, routingType);
+   }
+
+   public AddressInfo getAddress(SimpleString address) {
+      return  serverSession.getAddress(address);
+   }
+
+   public void removeTemporaryQueue(String address) throws Exception {
+      serverSession.deleteQueue(SimpleString.toSimpleString(address));
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/865716de/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
index 7bdbd2e..ff398dc 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
@@ -55,6 +55,7 @@ public class AmqpSupport {
    public static final Symbol PLATFORM = Symbol.valueOf("platform");
    public static final Symbol RESOURCE_DELETED = Symbol.valueOf("amqp:resource-deleted");
    public static final Symbol CONNECTION_FORCED = Symbol.valueOf("amqp:connection:forced");
+   public static final Symbol SHARED_SUBS = Symbol.valueOf("SHARED-SUBS");
 
 
    // Symbols used in configuration of newly opened links.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/865716de/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 0cc293a..a265836 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
@@ -27,6 +28,7 @@ import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMess
 import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Delivery;
@@ -55,6 +57,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
 
    // Used by the broker to decide when to refresh clients credit.  This is not used when client requests credit.
    private static int minCreditRefresh = 30;
+   private TerminusExpiryPolicy expiryPolicy;
 
    public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI,
                                       AMQPConnectionContext connection,
@@ -83,10 +86,11 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
             address = sessionSPI.tempQueueName();
 
             try {
-               sessionSPI.createTemporaryQueue(address);
+               sessionSPI.createTemporaryQueue(address, RoutingType.ANYCAST);
             } catch (Exception e) {
                throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
             }
+            expiryPolicy = target.getExpiryPolicy() != null ? target.getExpiryPolicy() : TerminusExpiryPolicy.LINK_DETACH;
             target.setAddress(address);
          } else {
             //if not dynamic then we use the targets address as the address to forward the messages to, however there has to
@@ -165,6 +169,14 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
    @Override
    public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
       protonSession.removeReceiver(receiver);
+      org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
+      if (target != null && target.getDynamic() && (target.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || target.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
+         try {
+            sessionSPI.removeTemporaryQueue(target.getAddress());
+         } catch (Exception e) {
+            // Ignore on close; it's temporary anyway and will be removed later.
+         }
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/865716de/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 960942d..43de7c4 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -19,10 +19,16 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
+import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
+import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
@@ -30,6 +36,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPExceptio
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
 import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
 import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
@@ -66,6 +73,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
 
    private static final Symbol COPY = Symbol.valueOf("copy");
    private static final Symbol TOPIC = Symbol.valueOf("topic");
+   private static final Symbol QUEUE = Symbol.valueOf("queue");
+   private static final Symbol SHARED = Symbol.valueOf("shared");
+   private static final Symbol GLOBAL = Symbol.valueOf("global");
 
    private Object brokerConsumer;
 
@@ -74,7 +84,14 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    protected final AMQPConnectionContext connection;
    protected boolean closed = false;
    protected final AMQPSessionCallback sessionSPI;
+   private boolean multicast;
+   //todo get this from somewhere
+   private RoutingType defaultRoutingType = RoutingType.ANYCAST;
    protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0);
+   private RoutingType routingTypeToUse = defaultRoutingType;
+   private boolean shared = false;
+   private boolean global = false;
+   private boolean isVolatile = false;
 
    public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) {
       super();
@@ -127,7 +144,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       super.initialise();
 
       Source source = (Source) sender.getRemoteSource();
-      String queue;
+      String queue = null;
       String selector = null;
       final Map<Symbol, Object> supportedFilters = new HashMap<>();
 
@@ -148,32 +165,15 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          }
       }
 
-      // if we have a capability for a Topic (AMQP -> JMS Mapping) or we are configured on this
-      // address to act like a topic then act like a subscription.
-      boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source);
-
-      if (isPubSub) {
-         Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS);
-         if (filter != null) {
-            String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
-            String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
-            if (selector != null) {
-               selector += " AND " + noLocalFilter;
-            } else {
-               selector = noLocalFilter;
-            }
-
-            supportedFilters.put(filter.getKey(), filter.getValue());
-         }
-      }
-
       if (source == null) {
          // Attempt to recover a previous subscription happens when a link reattach happens on a
          // subscription queue
          String clientId = getClientId();
          String pubId = sender.getName();
-         queue = createQueueName(clientId, pubId);
-         QueueQueryResult result = sessionSPI.queueQuery(queue, false);
+         queue = createQueueName(clientId, pubId, true, global, false);
+         QueueQueryResult result = sessionSPI.queueQuery(queue, RoutingType.MULTICAST, false);
+         multicast = true;
+         routingTypeToUse = RoutingType.MULTICAST;
 
          // Once confirmed that the address exists we need to return a Source that reflects
          // the lifetime policy and capabilities of the new subscription.
@@ -222,23 +222,86 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          // node is temporary and  will be deleted on closing of the session
          queue = java.util.UUID.randomUUID().toString();
          try {
-            sessionSPI.createTemporaryQueue(queue);
+            sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST);
             // protonSession.getServerSession().createQueue(queue, queue, null, true, false);
          } catch (Exception e) {
             throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
          }
          source.setAddress(queue);
       } else {
+         SimpleString addressToUse;
+         SimpleString queueNameToUse = null;
+         shared = hasCapabilities(SHARED, source);
+         global = hasCapabilities(GLOBAL, source);
+
+         //find out if we have an address made up of the address and queue name, if yes then set queue name
+         if (CompositeAddress.isFullyQualified(source.getAddress())) {
+            CompositeAddress compositeAddress = CompositeAddress.getQueueName(source.getAddress());
+            addressToUse = new SimpleString(compositeAddress.getAddress());
+            queueNameToUse = new SimpleString(compositeAddress.getQueueName());
+         } else {
+            addressToUse = new SimpleString(source.getAddress());
+         }
+         //check to see if the client has defined how we act
+         boolean clientDefined = hasCapabilities(TOPIC, source) || hasCapabilities(QUEUE, source);
+         if (clientDefined)  {
+            multicast = hasCapabilities(TOPIC, source);
+            AddressInfo addressInfo = sessionSPI.getAddress(addressToUse);
+            Set<RoutingType> routingTypes = addressInfo.getRoutingTypes();
+            //if the client defines 1 routing type and the broker another then throw an exception
+            if (multicast && !routingTypes.contains(RoutingType.MULTICAST)) {
+               throw new ActiveMQAMQPIllegalStateException("Address is not configured for topic support");
+            } else if (!multicast && !routingTypes.contains(RoutingType.ANYCAST)) {
+               throw new ActiveMQAMQPIllegalStateException("Address is not configured for queue support");
+            }
+         } else {
+            //if not we look up the address
+            AddressQueryResult addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), defaultRoutingType, true);
+            if (!addressQueryResult.isExists()) {
+               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
+            }
+
+            Set<RoutingType> routingTypes = addressQueryResult.getRoutingTypes();
+            if (routingTypes.contains(RoutingType.MULTICAST) && routingTypes.size() == 1) {
+               multicast = true;
+            } else {
+               //todo add some checks if both routing types are supported
+               multicast = false;
+            }
+         }
+         routingTypeToUse = multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST;
          // if not dynamic then we use the target's address as the address to forward the
          // messages to, however there has to be a queue bound to it so we need to check this.
-         if (isPubSub) {
-            // if we are a subscription and durable create a durable queue using the container
-            // id and link name
-            if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
+         if (multicast) {
+            Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS);
+            if (filter != null) {
+               String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
+               String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
+               if (selector != null) {
+                  selector += " AND " + noLocalFilter;
+               } else {
+                  selector = noLocalFilter;
+               }
+
+               supportedFilters.put(filter.getKey(), filter.getValue());
+            }
+
+
+            if (queueNameToUse != null) {
+               SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.MULTICAST  );
+               queue = matchingAnycastQueue.toString();
+            }
+            //if the address specifies a broker configured queue then we always use this, treat it as a queue
+            if (queue != null) {
+               multicast = false;
+            } else if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
+
+               // if we are a subscription and durable create a durable queue using the container
+               // id and link name
                String clientId = getClientId();
                String pubId = sender.getName();
-               queue = createQueueName(clientId, pubId);
-               QueueQueryResult result = sessionSPI.queueQuery(queue, false);
+               queue = createQueueName(clientId, pubId, shared, global, false);
+               QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false);
 
                if (result.isExists()) {
                   // If a client reattaches to a durable subscription with a different no-local
@@ -248,25 +311,54 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
 
                      if (result.getConsumerCount() == 0) {
                         sessionSPI.deleteQueue(queue);
-                        sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
+                        sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
                      } else {
                         throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
                      }
                   }
                } else {
-                  sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
+                  if (shared) {
+                     sessionSPI.createSharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
+                  } else {
+                     sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
+                  }
                }
             } else {
                // otherwise we are a volatile subscription
-               queue = java.util.UUID.randomUUID().toString();
-               try {
-                  sessionSPI.createTemporaryQueue(source.getAddress(), queue, selector);
-               } catch (Exception e) {
-                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
+               isVolatile = true;
+               if (shared && sender.getName() != null) {
+                  queue = createQueueName(getClientId(), sender.getName(), shared, global, isVolatile);
+                  try {
+                     sessionSPI.createSharedVolatileQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
+                  } catch (ActiveMQQueueExistsException e) {
+                     //this is ok, it just means it's shared
+                  }
+               } else {
+                  queue = java.util.UUID.randomUUID().toString();
+                  try {
+                     sessionSPI.createTemporaryQueue(source.getAddress(), queue, RoutingType.MULTICAST, selector);
+                  } catch (Exception e) {
+                     throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
+                  }
                }
             }
          } else {
-            queue = source.getAddress();
+            if (queueNameToUse != null) {
+               SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.ANYCAST);
+               if (matchingAnycastQueue != null) {
+                  queue = matchingAnycastQueue.toString();
+               } else {
+                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
+               }
+            } else {
+               SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, RoutingType.ANYCAST);
+               if (matchingAnycastQueue != null) {
+                  queue = matchingAnycastQueue.toString();
+               } else {
+                  queue = addressToUse.toString();
+               }
+            }
+
          }
 
          if (queue == null) {
@@ -274,7 +366,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          }
 
          try {
-            if (!sessionSPI.queueQuery(queue, !isPubSub).isExists()) {
+            if (!sessionSPI.queueQuery(queue, routingTypeToUse, !multicast).isExists()) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
             }
          } catch (ActiveMQAMQPNotFoundException e) {
@@ -290,9 +382,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       // have not honored what it asked for.
       source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);
 
-      boolean browseOnly = !isPubSub && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
+      boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
       try {
-         brokerConsumer = sessionSPI.createSender(this, queue, isPubSub ? null : selector, browseOnly);
+         brokerConsumer = sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
+      } catch (ActiveMQAMQPResourceLimitExceededException e1) {
+         throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage());
       } catch (Exception e) {
          throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
       }
@@ -302,10 +396,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       return connection.getRemoteContainer();
    }
 
-   private boolean isPubSub(Source source) {
-      String pubSubPrefix = sessionSPI.getPubSubPrefix();
-      return source != null && pubSubPrefix != null && source.getAddress() != null && source.getAddress().startsWith(pubSubPrefix);
-   }
 
    /*
     * close the session
@@ -341,23 +431,30 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          // any durable resources for say pub subs
          if (remoteLinkClose) {
             Source source = (Source) sender.getSource();
-            if (source != null && source.getAddress() != null && (hasCapabilities(TOPIC, source) || isPubSub(source))) {
+            if (source != null && source.getAddress() != null && multicast) {
                String queueName = source.getAddress();
-               QueueQueryResult result = sessionSPI.queueQuery(queueName, false);
+               QueueQueryResult result = sessionSPI.queueQuery(queueName, routingTypeToUse,  false);
                if (result.isExists() && source.getDynamic()) {
                   sessionSPI.deleteQueue(queueName);
                } else {
                   String clientId = getClientId();
                   String pubId = sender.getName();
-                  String queue = createQueueName(clientId, pubId);
-                  result = sessionSPI.queueQuery(queue, false);
-                  if (result.isExists()) {
-                     if (result.getConsumerCount() > 0) {
-                        System.out.println("error");
-                     }
+                  if (pubId.contains("|")) {
+                     pubId = pubId.split("\\|")[0];
+                  }
+                  String queue = createQueueName(clientId, pubId, shared, global, isVolatile);
+                  result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false);
+                  //only delete if it isn't volatile and has no consumers
+                  if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) {
                      sessionSPI.deleteQueue(queue);
                   }
                }
+            } else if (source != null && source.getDynamic() && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
+               try {
+                  sessionSPI.removeTemporaryQueue(source.getAddress());
+               } catch (Exception e) {
+                  //ignore on close, it's temporary anyway and will be removed later
+               }
             }
          }
       } catch (Exception e) {
@@ -521,7 +618,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       return false;
    }
 
-   private static String createQueueName(String clientId, String pubId) {
-      return clientId + "." + pubId;
+   private static String createQueueName(String clientId, String pubId, boolean shared, boolean global, boolean isVolatile) {
+      String queue = clientId == null || clientId.isEmpty() ? pubId : clientId + "." + pubId;
+      if (shared) {
+         if (queue.contains("|")) {
+            queue = queue.split("\\|")[0];
+         }
+         if (isVolatile) {
+            queue += ":shared-volatile";
+         }
+         if (global) {
+            queue += ":global";
+         }
+      }
+      return queue;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/865716de/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java
index 6325ff6..931efa7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java
@@ -22,7 +22,7 @@ import org.apache.qpid.proton.engine.Connection;
 
 public class ExtCapability {
 
-   public static final Symbol[] capabilities = new Symbol[]{AmqpSupport.SOLE_CONNECTION_CAPABILITY, AmqpSupport.DELAYED_DELIVERY};
+   public static final Symbol[] capabilities = new Symbol[]{AmqpSupport.SOLE_CONNECTION_CAPABILITY, AmqpSupport.DELAYED_DELIVERY, AmqpSupport.SHARED_SUBS};
 
    public static Symbol[] getCapabilities() {
       return capabilities;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/865716de/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java
----------------------------------------------------------------------
diff --git a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java
index db23f56..be14056 100644
--- a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java
+++ b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.rest.test;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.jboss.resteasy.client.ClientRequest;
 import org.jboss.resteasy.client.ClientResponse;
 import org.jboss.resteasy.spi.Link;
@@ -30,6 +31,7 @@ public class FindDestinationTest extends MessageTestBase {
    @Test
    public void testFindQueue() throws Exception {
       String testName = "testFindQueue";
+      server.getActiveMQServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString(testName), RoutingType.MULTICAST));
       server.getActiveMQServer().createQueue(new SimpleString(testName), RoutingType.MULTICAST, new SimpleString(testName), null, false, false);
 
       ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/queues/" + testName));
@@ -60,6 +62,7 @@ public class FindDestinationTest extends MessageTestBase {
 
    @Test
    public void testFindTopic() throws Exception {
+      server.getActiveMQServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString("testTopic"), RoutingType.MULTICAST));
       server.getActiveMQServer().createQueue(new SimpleString("testTopic"), RoutingType.MULTICAST, new SimpleString("testTopic"), null, false, false);
       ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/topics/testTopic"));
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/865716de/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 4464062..841aa84 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -619,7 +619,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
       }
    }
 
-   @Deprecated
    @Override
    public void createQueue(final String address, final String name) throws Exception {
       checkStarted();
@@ -633,6 +632,18 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
    }
 
    @Override
+   public void createQueue(final String address, final String name, final String routingType) throws Exception {
+      checkStarted();
+
+      clearIO();
+      try {
+         server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), null, true, false);
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
    public void createQueue(final String address, final String name, final boolean durable) throws Exception {
       checkStarted();
 
@@ -645,35 +656,44 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
    }
 
    @Override
-   public void createQueue(String address,
-                           String routingType,
-                           String name,
-                           String filterStr,
-                           boolean durable,
-                           int maxConsumers,
-                           boolean deleteOnNoConsumers,
-                           boolean autoCreateAddress) throws Exception {
+   public void createQueue(final String address, final String name, final boolean durable, final String routingType) throws Exception {
       checkStarted();
 
       clearIO();
+      try {
+         server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), null, durable, false);
+      } finally {
+         blockOnIO();
+      }
+   }
 
-      SimpleString filter = filterStr == null ? null : new SimpleString(filterStr);
+   @Override
+   public void createQueue(final String address,
+                           final String name,
+                           final String filterStr,
+                           final boolean durable) throws Exception {
+      checkStarted();
+
+      clearIO();
       try {
+         SimpleString filter = null;
          if (filterStr != null && !filterStr.trim().equals("")) {
             filter = new SimpleString(filterStr);
          }
 
-         server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
+         server.createQueue(SimpleString.toSimpleString(address), ActiveMQDefaultConfiguration.getDefaultRoutingType(), new SimpleString(name), filter, durable, false);
       } finally {
          blockOnIO();
       }
    }
 
+
    @Override
    public void createQueue(final String address,
                            final String name,
                            final String filterStr,
-                           final boolean durable) throws Exception {
+                           final boolean durable,
+                           final String routingType) throws Exception {
       checkStarted();
 
       clearIO();
@@ -683,12 +703,38 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
             filter = new SimpleString(filterStr);
          }
 
-         server.createQueue(SimpleString.toSimpleString(address), ActiveMQDefaultConfiguration.getDefaultRoutingType(), new SimpleString(name), filter, durable, false);
+         server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false);
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public void createQueue(String address,
+                           String routingType,
+                           String name,
+                           String filterStr,
+                           boolean durable,
+                           int maxConsumers,
+                           boolean deleteOnNoConsumers,
+                           boolean autoCreateAddress) throws Exception {
+      checkStarted();
+
+      clearIO();
+
+      SimpleString filter = filterStr == null ? null : new SimpleString(filterStr);
+      try {
+         if (filterStr != null && !filterStr.trim().equals("")) {
+            filter = new SimpleString(filterStr);
+         }
+
+         server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
       } finally {
          blockOnIO();
       }
    }
 
+
    @Override
    public String[] getQueueNames() {
       checkStarted();
@@ -1704,30 +1750,30 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
          settings.add("expiryAddress", addressSettings.getExpiryAddress().toString());
       }
       return settings.add("expiryDelay", addressSettings.getExpiryDelay())
-                     .add("maxDeliveryAttempts", addressSettings.getMaxDeliveryAttempts())
-                     .add("pageCacheMaxSize", addressSettings.getPageCacheMaxSize())
-                     .add("maxSizeBytes", addressSettings.getMaxSizeBytes())
-                     .add("pageSizeBytes", addressSettings.getPageSizeBytes())
-                     .add("redeliveryDelay", addressSettings.getRedeliveryDelay())
-                     .add("redeliveryMultiplier", addressSettings.getRedeliveryMultiplier())
-                     .add("maxRedeliveryDelay", addressSettings.getMaxRedeliveryDelay())
-                     .add("redistributionDelay", addressSettings.getRedistributionDelay())
-                     .add("lastValueQueue", addressSettings.isLastValueQueue())
-                     .add("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute())
-                     .add("addressFullMessagePolicy", policy)
-                     .add("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold())
-                     .add("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod())
-                     .add("slowConsumerPolicy", consumerPolicy)
-                     .add("autoCreateJmsQueues", addressSettings.isAutoCreateJmsQueues())
-                     .add("autoCreateJmsTopics", addressSettings.isAutoCreateJmsTopics())
-                     .add("autoDeleteJmsQueues", addressSettings.isAutoDeleteJmsQueues())
-                     .add("autoDeleteJmsTopics", addressSettings.isAutoDeleteJmsQueues())
-                     .add("autoCreateQueues", addressSettings.isAutoCreateQueues())
-                     .add("autoDeleteQueues", addressSettings.isAutoDeleteQueues())
-                     .add("autoCreateAddress", addressSettings.isAutoCreateAddresses())
-                     .add("autoDeleteAddress", addressSettings.isAutoDeleteAddresses())
-                     .build()
-                     .toString();
+            .add("maxDeliveryAttempts", addressSettings.getMaxDeliveryAttempts())
+            .add("pageCacheMaxSize", addressSettings.getPageCacheMaxSize())
+            .add("maxSizeBytes", addressSettings.getMaxSizeBytes())
+            .add("pageSizeBytes", addressSettings.getPageSizeBytes())
+            .add("redeliveryDelay", addressSettings.getRedeliveryDelay())
+            .add("redeliveryMultiplier", addressSettings.getRedeliveryMultiplier())
+            .add("maxRedeliveryDelay", addressSettings.getMaxRedeliveryDelay())
+            .add("redistributionDelay", addressSettings.getRedistributionDelay())
+            .add("lastValueQueue", addressSettings.isLastValueQueue())
+            .add("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute())
+            .add("addressFullMessagePolicy", policy)
+            .add("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold())
+            .add("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod())
+            .add("slowConsumerPolicy", consumerPolicy)
+            .add("autoCreateJmsQueues", addressSettings.isAutoCreateJmsQueues())
+            .add("autoCreateJmsTopics", addressSettings.isAutoCreateJmsTopics())
+            .add("autoDeleteJmsQueues", addressSettings.isAutoDeleteJmsQueues())
+            .add("autoDeleteJmsTopics", addressSettings.isAutoDeleteJmsQueues())
+            .add("autoCreateQueues", addressSettings.isAutoCreateQueues())
+            .add("autoDeleteQueues", addressSettings.isAutoDeleteQueues())
+            .add("autoCreateAddress", addressSettings.isAutoCreateAddresses())
+            .add("autoDeleteAddress", addressSettings.isAutoDeleteAddresses())
+            .build()
+            .toString();
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/865716de/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
index 1cf1a07..ada1d77 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
@@ -20,6 +20,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 
@@ -44,6 +45,10 @@ public interface AddressManager {
 
    Bindings getMatchingBindings(SimpleString address) throws Exception;
 
+   SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
+
+   SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception;
+
    void clear();
 
    Binding getBinding(SimpleString queueName);
@@ -59,4 +64,5 @@ public interface AddressManager {
    AddressInfo removeAddressInfo(SimpleString address);
 
    AddressInfo getAddressInfo(SimpleString address);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/865716de/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index 48ec7db..dc5f4b4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -79,6 +80,10 @@ public interface PostOffice extends ActiveMQComponent {
 
    Map<SimpleString, Binding> getAllBindings();
 
+   SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
+
+   SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception;
+
    RoutingStatus route(ServerMessage message, boolean direct) throws Exception;
 
    RoutingStatus route(ServerMessage message,
@@ -119,6 +124,4 @@ public interface PostOffice extends ActiveMQComponent {
    boolean isAddressBound(final SimpleString address) throws Exception;
 
    Set<SimpleString> getAddresses();
-
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/865716de/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CompositeAddress.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CompositeAddress.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CompositeAddress.java
new file mode 100644
index 0000000..32083a5
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CompositeAddress.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.postoffice.impl;
+
+/**
+ * Holder for the two parts of a fully qualified queue name written as
+ * {@code address::queueName}, plus static helpers to recognise and parse that form.
+ */
+public class CompositeAddress {
+
+   /** Token that separates the address part from the queue name part. */
+   public static String SEPARATOR = "::";
+
+   private final String address;
+   private final String queueName;
+
+   /**
+    * @param address   the address part
+    * @param queueName the queue name part
+    */
+   public CompositeAddress(String address, String queueName) {
+      this.address = address;
+      this.queueName = queueName;
+   }
+
+   /**
+    * @return the address part
+    */
+   public String getAddress() {
+      return address;
+   }
+
+   /**
+    * @return the queue name part
+    */
+   public String getQueueName() {
+      return queueName;
+   }
+
+   /**
+    * @param address the text to test
+    * @return {@code true} if the text contains {@link #SEPARATOR}
+    */
+   public static boolean isFullyQualified(String address) {
+      return address.contains(SEPARATOR);
+   }
+
+   /**
+    * Splits a fully qualified name into its address and queue name parts.
+    *
+    * @param address text of the form {@code address::queueName}
+    * @return the parsed parts
+    * @throws IllegalStateException if the text does not contain both parts
+    */
+   public static CompositeAddress getQueueName(String address) {
+      String[] split = address.split(SEPARATOR);
+      // Both parts are read below, so fewer than two elements cannot be accepted:
+      // "a" yields one element and "a::" also yields one because split() drops
+      // trailing empty strings.
+      if (split.length < 2) {
+         throw new IllegalStateException("Not A Fully Qualified Name");
+      }
+      return new CompositeAddress(split[0], split[1]);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/865716de/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 9bd69d1..34e966c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -67,6 +67,7 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.core.server.RouteContextList;
 import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.group.GroupingHandler;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@@ -866,6 +867,16 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
    }
 
    @Override
+   public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
+      // Pure delegation: the address manager performs the lookup.
+      final SimpleString matchingQueue = addressManager.getMatchingQueue(address, routingType);
+      return matchingQueue;
+   }
+
+   @Override
+   public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception {
+      // Pure delegation: the address manager performs the lookup and the address check.
+      final SimpleString matchingQueue = addressManager.getMatchingQueue(address, queueName, routingType);
+      return matchingQueue;
+   }
+
+   @Override
    public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception {
       // We send direct to the queue so we can send it to the same queue that is bound to the notifications address -
       // this is crucial for ensuring

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/865716de/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
index 8db4f6f..c0e5b2d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
@@ -119,6 +119,37 @@ public class SimpleAddressManager implements AddressManager {
    }
 
    @Override
+   public SimpleString getMatchingQueue(final SimpleString address, RoutingType routingType) throws Exception {
+      // First try a binding whose name equals the address itself.
+      Binding match = nameMap.get(address);
+
+      // instanceof is false for null, so this also covers the "nothing found" case.
+      final boolean directHit = match instanceof LocalQueueBinding && match.getAddress().equals(address);
+
+      if (!directHit) {
+         // Otherwise take the first local queue binding registered for the address.
+         // NOTE(review): when none is found, a non-matching name-map hit is still
+         // returned below - confirm that is intended. routingType is not consulted.
+         final Bindings candidates = mappings.get(address);
+         if (candidates != null) {
+            for (Binding candidate : candidates.getBindings()) {
+               if (candidate instanceof LocalQueueBinding) {
+                  match = candidate;
+                  break;
+               }
+            }
+         }
+      }
+
+      if (match == null) {
+         return null;
+      }
+      return match.getUniqueName();
+   }
+
+   /**
+    * Looks up the binding registered under {@code queueName} and verifies that it is
+    * bound to {@code address}. The {@code routingType} argument is not consulted.
+    *
+    * @return the unique name of the binding, or {@code null} if no binding has that name
+    * @throws IllegalStateException if the binding belongs to a different address
+    */
+   @Override
+   public SimpleString getMatchingQueue(final SimpleString address, final SimpleString queueName, RoutingType routingType) throws Exception {
+      Binding binding = nameMap.get(queueName);
+
+      if (binding == null) {
+         return null;
+      }
+      if (!binding.getAddress().equals(address)) {
+         // Trailing space keeps the address readable instead of fused onto the word "address".
+         throw new IllegalStateException("queue belongs to address " + binding.getAddress());
+      }
+      return binding.getUniqueName();
+   }
+
+   @Override
    public void clear() {
       nameMap.clear();
       mappings.clear();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/865716de/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 0aeaf6b..5fe71bb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -344,6 +344,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
 
    QueueQueryResult queueQuery(SimpleString name) throws Exception;
 
+   /**
+    * Queries the address with the given name.
+    * <p>
+    * NOTE(review): ActiveMQServerImpl rejects a {@code null} name and returns a result
+    * object both for known and for unknown addresses rather than throwing; confirm that
+    * this is the contract callers should rely on.
+    *
+    * @param name the name of the address to query
+    * @return the result of the query
+    * @throws Exception if the query fails
+    */
+   AddressQueryResult addressQuery(SimpleString name) throws Exception;
+
    Queue deployQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filterString,
                      boolean durable,
                      boolean temporary,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/865716de/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index badadf4..e6b5ad4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -163,6 +163,8 @@ public interface ServerSession extends SecurityAuth {
 
    QueueQueryResult executeQueueQuery(SimpleString name) throws Exception;
 
+   /**
+    * Runs an address query for the given name. ServerSessionImpl forwards the call to
+    * {@code ActiveMQServer.addressQuery(name)}.
+    *
+    * @param name the name of the address to query
+    * @return the result of the query
+    * @throws Exception if the query fails
+    */
+   AddressQueryResult executeAddressQuery(SimpleString name) throws Exception;
+
    BindingQueryResult executeBindingQuery(SimpleString address) throws Exception;
 
    void closeConsumer(long consumerID) throws Exception;
@@ -237,4 +239,10 @@ public interface ServerSession extends SecurityAuth {
    List<MessageReference> getInTXMessagesForConsumer(long consumerId);
 
    String getValidatedUser();
+
+   /**
+    * Finds the name of a queue to use for the given address. ServerSessionImpl forwards
+    * the call to the server's PostOffice.
+    *
+    * @param address     the address to find a queue for
+    * @param routingType the routing type the caller is interested in
+    * @return the name of a matching queue
+    * @throws Exception if the lookup fails
+    */
+   SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
+
+   /**
+    * Finds the named queue for the given address. ServerSessionImpl forwards the call to
+    * the server's PostOffice.
+    *
+    * @param address     the address the queue is expected to be bound to
+    * @param queueName   the name of the queue to look up
+    * @param routingType the routing type the caller is interested in
+    * @return the name of the matching queue
+    * @throws Exception if the lookup fails
+    */
+   SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception;
+
+   /**
+    * Returns the address information for the given address. ServerSessionImpl obtains it
+    * from the server's PostOffice via {@code getAddressInfo(address)}.
+    *
+    * @param address the address to look up
+    * @return the address information as reported by the PostOffice
+    */
+   AddressInfo getAddress(SimpleString address);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/865716de/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 2ae2329..c997dab 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -720,7 +720,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
                } else {
                   // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
                   // actually routed to at that address though
-                  queue = server.createQueue(queueName, RoutingType.MULTICAST, queueName, null, true, false);
+                  queue = server.createQueue(queueName, RoutingType.MULTICAST, queueName, null, true, false, -1, false, true);
                }
 
                // There are a few things that will behave differently when it's an internal queue

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/865716de/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 425eee5..742de33 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -108,6 +108,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.Bindable;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.Divert;
@@ -748,6 +749,24 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    }
 
    @Override
+   public AddressQueryResult addressQuery(SimpleString name) throws Exception {
+      if (name == null) {
+         throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
+      }
+
+      // Read from the address settings first; the value is reported in both outcomes below.
+      final boolean autoCreate = getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateAddresses();
+
+      final AddressInfo info = postOffice.getAddressInfo(name);
+      if (info == null) {
+         // Address not known to the post office: answer with placeholder values.
+         return new AddressQueryResult(name, null, -1, false, false, autoCreate);
+      }
+      return new AddressQueryResult(info.getName(), info.getRoutingTypes(), info.getId(), info.isAutoCreated(), true, autoCreate);
+   }
+
+   @Override
    public void threadDump() {
       StringWriter str = new StringWriter();
       PrintWriter out = new PrintWriter(str);
@@ -1468,7 +1487,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                             final SimpleString filterString,
                             final boolean durable,
                             final boolean temporary) throws Exception {
-      return createQueue(address, routingType, queueName, filterString, null, durable, temporary, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), true);
+      return createQueue(address, routingType, queueName, filterString, null, durable, temporary, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), false);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/865716de/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 79600b9..347874b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -65,6 +65,7 @@ import org.apache.activemq.artemis.core.security.SecurityStore;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
@@ -708,6 +709,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    }
 
    @Override
+   public AddressQueryResult executeAddressQuery(SimpleString name) throws Exception {
+      // The session adds nothing of its own; the server answers the query.
+      final AddressQueryResult queryResult = server.addressQuery(name);
+      return queryResult;
+   }
+
+   @Override
    public BindingQueryResult executeBindingQuery(final SimpleString address) throws Exception {
       return server.bindingQuery(address);
    }
@@ -1484,6 +1490,21 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    }
 
    @Override
+   public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
+      // Forwarded unchanged to the server's post office.
+      final SimpleString matchingQueue = server.getPostOffice().getMatchingQueue(address, routingType);
+      return matchingQueue;
+   }
+
+   @Override
+   public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception {
+      // Forwarded unchanged to the server's post office.
+      final SimpleString matchingQueue = server.getPostOffice().getMatchingQueue(address, queueName, routingType);
+      return matchingQueue;
+   }
+
+   @Override
+   public AddressInfo getAddress(SimpleString address) {
+      // Forwarded unchanged to the server's post office.
+      final AddressInfo info = server.getPostOffice().getAddressInfo(address);
+      return info;
+   }
+
+   @Override
    public String toString() {
       StringBuffer buffer = new StringBuffer();
       if (this.metaData != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/865716de/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
index 56353e4..fddaf9d 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
@@ -76,6 +76,44 @@ public class AmqpClient {
    }
 
    /**
+    * Creates a connection with the broker at the configured location, optionally without
+    * a container ID. This method initiates a connect attempt immediately and will fail if
+    * the remote peer cannot be reached.
+    *
+    * @param noContainerId when {@code true} the connection is flagged so that no container
+    *                      ID is set on it; when {@code false} the connection is left as created.
+    * @throws Exception if an error occurs attempting to connect to the Broker.
+    * @return a new connection object used to interact with the connected peer.
+    */
+   public AmqpConnection connect(boolean noContainerId) throws Exception {
+
+      AmqpConnection connection = createConnection();
+      // Only suppress the container ID when the caller actually asked for it.
+      if (noContainerId) {
+         connection.setNoContainerID();
+      }
+
+      LOG.debug("Attempting to create new connection to peer: {}", remoteURI);
+      connection.connect();
+
+      return connection;
+   }
+
+
+   /**
+    * Creates a connection that uses the supplied container ID and connects it to the
+    * broker at the configured location. The connect attempt is initiated immediately and
+    * will fail if the remote peer cannot be reached.
+    *
+    * @param containerId the container ID to set on the new connection.
+    * @throws Exception if an error occurs attempting to connect to the Broker.
+    * @return a new connection object used to interact with the connected peer.
+    */
+   public AmqpConnection connect(String containerId) throws Exception {
+      final AmqpConnection brokerConnection = createConnection();
+      brokerConnection.setContainerId(containerId);
+
+      LOG.debug("Attempting to create new connection to peer: {}", remoteURI);
+      brokerConnection.connect();
+      return brokerConnection;
+   }
+
+
+   /**
     * Creates a connection object using the configured values for user, password, remote URI
     * etc.  This method does not immediately initiate a connection to the remote leaving that
     * to the caller which provides a connection object that can have additional configuration

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/865716de/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index 01c60bc..723daef 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -104,6 +104,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
    private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
    private long drainTimeout = DEFAULT_DRAIN_TIMEOUT;
    private boolean trace;
+   private boolean noContainerID = false;
 
    public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport,
                          String username,
@@ -139,7 +140,9 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
          serializer.execute(new Runnable() {
             @Override
             public void run() {
-               getEndpoint().setContainer(safeGetContainerId());
+               if (!noContainerID) {
+                  getEndpoint().setContainer(safeGetContainerId());
+               }
                getEndpoint().setHostname(remoteURI.getHost());
                if (!getOfferedCapabilities().isEmpty()) {
                   getEndpoint().setOfferedCapabilities(getOfferedCapabilities().toArray(new Symbol[0]));
@@ -735,4 +738,8 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
    public String toString() {
       return "AmqpConnection { " + connectionId + " }";
    }
+
+   /**
+    * Flags this connection so that the container ID is not applied to the endpoint
+    * when the connection is opened.
+    */
+   public void setNoContainerID() {
+      this.noContainerID = true;
+   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/865716de/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index fc3fdf7..d4b16c1 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -271,7 +271,68 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
       checkClosed();
 
       final ClientFuture request = new ClientFuture();
-      final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, getNextReceiverId());
+      final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId);
+
+      connection.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            checkClosed();
+            receiver.setStateInspector(getStateInspector());
+            receiver.open(request);
+            pumpToProtonTransport(request);
+         }
+      });
+
+      request.sync();
+
+      return receiver;
+   }
+
+
+   /**
+    * Create a receiver instance using the given Source and assign it a subscription name.
+    * <p>
+    * The receiver is opened on the connection's scheduler and this method blocks until
+    * the open request completes.
+    *
+    * @param source the caller created and configured Source used to create the receiver link.
+    * @param receiverId the ID passed to the new receiver.
+    * @param receiveName the subscription name set on the receiver before it is opened.
+    * @return a newly created receiver that is ready for use.
+    * @throws Exception if an error occurs while creating the receiver.
+    */
+   public AmqpReceiver createMulticastReceiver(Source source, String receiverId, String receiveName) throws Exception {
+      checkClosed();
+
+      final ClientFuture request = new ClientFuture();
+      final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId);
+      // The subscription name is set before the open is scheduled below.
+      receiver.setSubscriptionName(receiveName);
+
+      connection.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            // Re-checked here because this runs later, on the scheduler.
+            checkClosed();
+            receiver.setStateInspector(getStateInspector());
+            receiver.open(request);
+            pumpToProtonTransport(request);
+         }
+      });
+
+      // Wait for the scheduled open to complete before handing the receiver back.
+      request.sync();
+
+      return receiver;
+   }
+
+   /**
+    * Create a receiver instance using the given Source
+    *
+    * @param source the caller created and configured Source used to create the receiver link.
+    * @return a newly created receiver that is ready for use.
+    * @throws Exception if an error occurs while creating the receiver.
+    */
+   public AmqpReceiver createMulticastReceiver(String receiverId, String address, String receiveName) throws Exception {
+      checkClosed();
+
+      final ClientFuture request = new ClientFuture();
+      final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, receiverId);
+      receiver.setSubscriptionName(receiveName);
 
       connection.getScheduler().execute(new Runnable() {
 


Mime
View raw message