activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject activemq-artemis git commit: add consuming support to AMQP for new addressing schema
Date Sat, 03 Dec 2016 09:07:12 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/ARTEMIS-780 96b939b3b -> b59ddaa61


add consuming support to AMQP for new addressing schema


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b59ddaa6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b59ddaa6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b59ddaa6

Branch: refs/heads/ARTEMIS-780
Commit: b59ddaa61ef8cff891bf4032e2cf89f4a3cdadc8
Parents: 96b939b
Author: Andy Taylor <andy.tayls67@gmail.com>
Authored: Sat Dec 3 09:03:43 2016 +0000
Committer: Andy Taylor <andy.tayls67@gmail.com>
Committed: Sat Dec 3 09:06:46 2016 +0000

----------------------------------------------------------------------
 .../amqp/broker/AMQPSessionCallback.java        |  23 ++-
 .../proton/ProtonServerReceiverContext.java     |   3 +-
 .../amqp/proton/ProtonServerSenderContext.java  | 115 ++++++++---
 .../artemis/core/postoffice/AddressManager.java |   6 +
 .../artemis/core/postoffice/PostOffice.java     |   7 +-
 .../core/postoffice/impl/CompositeAddress.java  |  53 +++++
 .../core/postoffice/impl/PostOfficeImpl.java    |  11 +
 .../postoffice/impl/SimpleAddressManager.java   |  27 +++
 .../artemis/core/server/ServerSession.java      |   6 +
 .../core/server/impl/ServerSessionImpl.java     |  15 ++
 .../transport/amqp/client/AmqpSession.java      |   2 +-
 .../integration/amqp/AmqpClientTestSupport.java |   9 +
 .../amqp/AmqpDurableReceiverTest.java           |   2 +-
 .../amqp/AmqpTempDestinationTest.java           |   2 +-
 .../integration/amqp/AmqpTransactionTest.java   |   5 -
 .../amqp/BrokerDefinedAnycastConsumerTest.java  | 201 +++++++++++++++++++
 .../BrokerDefinedMulticastConsumerTest.java     | 112 +++++++++++
 .../tests/integration/amqp/ProtonTest.java      |  58 ++++--
 .../amqp/SendingAndReceivingTest.java           |   4 +
 .../core/server/impl/fakes/FakePostOffice.java  |  12 ++
 20 files changed, 607 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 6382cb2..5f5569f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -31,9 +31,11 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
@@ -192,12 +194,12 @@ public class AMQPSessionCallback implements SessionCallback {
       serverConsumer.receiveCredits(-1);
    }
 
-   public void createTemporaryQueue(String queueName) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), null, true, false);
+   public void createTemporaryQueue(String queueName, RoutingType routingType) throws Exception {
+      serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), routingType, null, true, false);
    }
 
-   public void createTemporaryQueue(String address, String queueName, String filter) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), true, false);
+   public void createTemporaryQueue(String address, String queueName, RoutingType routingType, String filter) throws Exception {
+      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName),  routingType, SimpleString.toSimpleString(filter), true, false);
    }
 
    public void createDurableQueue(String address, String queueName, String filter) throws Exception {
@@ -521,4 +523,17 @@ public class AMQPSessionCallback implements SessionCallback {
       protonSPI.removeTransaction(txid);
 
    }
+   // Delegates to ServerSession#getMatchingQueue(address, routingType) and returns its result unchanged.
+   public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
+      return serverSession.getMatchingQueue(address, routingType);
+   }
+
+   // Delegates to ServerSession#getMatchingQueue(address, queueName, routingType) and returns its result unchanged.
+   public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception {
+      return serverSession.getMatchingQueue(address, queueName, routingType);
+   }
+   // Delegates to ServerSession#getAddress(address); presumably null for an unknown address - confirm with callers.
+   public AddressInfo getAddress(SimpleString address) {
+      return  serverSession.getAddress(address);
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 41caea9..515acc3 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
@@ -83,7 +84,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
             address = sessionSPI.tempQueueName();
 
             try {
-               sessionSPI.createTemporaryQueue(address);
+               sessionSPI.createTemporaryQueue(address, RoutingType.ANYCAST);
             } catch (Exception e) {
                throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index ef075fc..b998b25 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -19,9 +19,13 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
@@ -65,6 +69,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
 
    private static final Symbol COPY = Symbol.valueOf("copy");
    private static final Symbol TOPIC = Symbol.valueOf("topic");
+   private static final Symbol QUEUE = Symbol.valueOf("queue");
 
    private Object brokerConsumer;
 
@@ -73,6 +78,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    protected final AMQPConnectionContext connection;
    protected boolean closed = false;
    protected final AMQPSessionCallback sessionSPI;
+   private boolean multicast;
    protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0);
 
    public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) {
@@ -126,7 +132,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       super.initialise();
 
       Source source = (Source) sender.getRemoteSource();
-      String queue;
+      String queue = null;
       String selector = null;
       final Map<Symbol, Object> supportedFilters = new HashMap<>();
 
@@ -148,25 +154,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          }
       }
 
-      // if we have a capability for a Topic (AMQP -> JMS Mapping) or we are configured on this
-      // address to act like a topic then act like a subscription.
-      boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source);
-
-      if (isPubSub) {
-         Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS);
-         if (filter != null) {
-            String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
-            String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
-            if (selector != null) {
-               selector += " AND " + noLocalFilter;
-            } else {
-               selector = noLocalFilter;
-            }
-
-            supportedFilters.put(filter.getKey(), filter.getValue());
-         }
-      }
-
       if (source == null) {
          // Attempt to recover a previous subscription happens when a link reattach happens on a
          // subscription queue
@@ -222,19 +209,77 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          // node is temporary and  will be deleted on closing of the session
          queue = java.util.UUID.randomUUID().toString();
          try {
-            sessionSPI.createTemporaryQueue(queue);
+            sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST);
             // protonSession.getServerSession().createQueue(queue, queue, null, true, false);
          } catch (Exception e) {
             throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
          }
          source.setAddress(queue);
       } else {
+         SimpleString addressToUse;
+         SimpleString queueNameToUse = null;
+         //find out if we have an address made up of the address and queue name, if yes then set queue name
+         if (CompositeAddress.isFullyQualified(source.getAddress())) {
+            CompositeAddress compositeAddress = CompositeAddress.getQueueName(source.getAddress());
+            addressToUse = new SimpleString(compositeAddress.getAddress());
+            queueNameToUse = new SimpleString(compositeAddress.getQueueName());
+         }
+         else {
+            addressToUse = new SimpleString(source.getAddress());
+         }
+         //check to see if the client has defined how we act
+         boolean clientDefined = hasCapabilities(TOPIC, source) || hasCapabilities(QUEUE, source);
+         if (clientDefined)  {
+            multicast = hasCapabilities(TOPIC, source);
+            AddressInfo addressInfo = sessionSPI.getAddress(addressToUse);
+            Set<RoutingType> routingTypes = addressInfo.getRoutingTypes();
+            //if the client defines 1 routing type and the broker another then throw an exception
+            if (multicast && !routingTypes.contains(RoutingType.MULTICAST)) {
+               throw new ActiveMQAMQPIllegalStateException("Address is not configured for topic support");
+            }
+            else if (!multicast && !routingTypes.contains(RoutingType.ANYCAST)) {
+               throw new ActiveMQAMQPIllegalStateException("Address is not configured for queue support");
+            }
+         }
+         else {
+            //if not we look up the address
+            AddressInfo addressInfo = sessionSPI.getAddress(addressToUse);
+            Set<RoutingType> routingTypes = addressInfo.getRoutingTypes();
+            if (routingTypes.contains(RoutingType.MULTICAST) && routingTypes.size() == 1) {
+               multicast = true;
+            }
+            else {
+               multicast = false;
+            }
+         }
          // if not dynamic then we use the target's address as the address to forward the
          // messages to, however there has to be a queue bound to it so we need to check this.
-         if (isPubSub) {
+         if (multicast) {
+            Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS);
+            if (filter != null) {
+               String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
+               String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
+               if (selector != null) {
+                  selector += " AND " + noLocalFilter;
+               } else {
+                  selector = noLocalFilter;
+               }
+
+               supportedFilters.put(filter.getKey(), filter.getValue());
+            }
+
+
+            if (queueNameToUse != null) {
+               SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.MULTICAST  );
+               queue = matchingAnycastQueue.toString();
+            }
+            //if the address specifies a broker configured queue then we always use this, treat it as a queue
+            if (queue != null) {
+               multicast = false;
+            }
             // if we are a subscription and durable create a durable queue using the container
             // id and link name
-            if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
+            else if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
                String clientId = getClientId();
                String pubId = sender.getName();
                queue = createQueueName(clientId, pubId);
@@ -260,13 +305,21 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                // otherwise we are a volatile subscription
                queue = java.util.UUID.randomUUID().toString();
                try {
-                  sessionSPI.createTemporaryQueue(source.getAddress(), queue, selector);
+                  sessionSPI.createTemporaryQueue(source.getAddress(), queue, RoutingType.MULTICAST, selector);
                } catch (Exception e) {
                   throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
                }
             }
          } else {
-            queue = source.getAddress();
+            if (queueNameToUse != null) {
+               SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.ANYCAST);
+               queue = matchingAnycastQueue.toString();
+            }
+            else {
+               SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, RoutingType.ANYCAST);
+               queue = matchingAnycastQueue.toString();
+            }
+
          }
 
          if (queue == null) {
@@ -274,7 +327,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          }
 
          try {
-            if (!sessionSPI.queueQuery(queue, !isPubSub).isExists()) {
+            if (!sessionSPI.queueQuery(queue, !multicast).isExists()) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
             }
          } catch (ActiveMQAMQPNotFoundException e) {
@@ -290,9 +343,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       // have not honored what it asked for.
       source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);
 
-      boolean browseOnly = !isPubSub && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
+      boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
       try {
-         brokerConsumer = sessionSPI.createSender(this, queue, isPubSub ? null : selector, browseOnly);
+         brokerConsumer = sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
       } catch (Exception e) {
          throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
       }
@@ -302,10 +355,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       return connection.getRemoteContainer();
    }
 
-   private boolean isPubSub(Source source) {
-      String pubSubPrefix = sessionSPI.getPubSubPrefix();
-      return source != null && pubSubPrefix != null && source.getAddress() != null && source.getAddress().startsWith(pubSubPrefix);
-   }
 
    /*
     * close the session
@@ -338,7 +387,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          // any durable resources for say pub subs
          if (remoteLinkClose) {
             Source source = (Source) sender.getSource();
-            if (source != null && source.getAddress() != null && (hasCapabilities(TOPIC, source) || isPubSub(source))) {
+            if (source != null && source.getAddress() != null && (hasCapabilities(TOPIC, source) || multicast)) {
                String queueName = source.getAddress();
                QueueQueryResult result = sessionSPI.queueQuery(queueName, false);
                if (result.isExists() && source.getDynamic()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
index 1cf1a07..ada1d77 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
@@ -20,6 +20,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 
@@ -44,6 +45,10 @@ public interface AddressManager {
 
    Bindings getMatchingBindings(SimpleString address) throws Exception;
 
+   SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
+
+   SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception;
+
    void clear();
 
    Binding getBinding(SimpleString queueName);
@@ -59,4 +64,5 @@ public interface AddressManager {
    AddressInfo removeAddressInfo(SimpleString address);
 
    AddressInfo getAddressInfo(SimpleString address);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index 48ec7db..dc5f4b4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -79,6 +80,10 @@ public interface PostOffice extends ActiveMQComponent {
 
    Map<SimpleString, Binding> getAllBindings();
 
+   SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
+
+   SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception;
+
    RoutingStatus route(ServerMessage message, boolean direct) throws Exception;
 
    RoutingStatus route(ServerMessage message,
@@ -119,6 +124,4 @@ public interface PostOffice extends ActiveMQComponent {
    boolean isAddressBound(final SimpleString address) throws Exception;
 
    Set<SimpleString> getAddresses();
-
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CompositeAddress.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CompositeAddress.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CompositeAddress.java
new file mode 100644
index 0000000..bc12fb7
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CompositeAddress.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.postoffice.impl;
+
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+
+/** An address of the form {@code address::queueName}, held as its separate address and queue name parts. */
+public class CompositeAddress {
+   public static String SEPARATOR = "::";
+   private final String address;
+   private final String queueName;
+
+   public String getAddress() {
+      return address;
+   }
+
+   public String getQueueName() {
+      return queueName;
+   }
+
+   public CompositeAddress(String address, String queueName) {
+      this.address = address;
+      this.queueName = queueName;
+   }
+
+   public static boolean isFullyQualified(String address) {
+      return address != null && address.contains(SEPARATOR); // a null address is never fully qualified
+   }
+
+   /** Splits on {@link #SEPARATOR}; throws IllegalStateException when fewer than two parts result (e.g. "a::"). */
+   public static CompositeAddress getQueueName(String address) {
+      String[] split = address.split(SEPARATOR);
+      if (split.length < 2) {
+         throw new IllegalStateException("Not A Fully Qualified Name");
+      }
+      return new CompositeAddress(split[0], split[1]);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 9bd69d1..34e966c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -67,6 +67,7 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.core.server.RouteContextList;
 import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.group.GroupingHandler;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@@ -866,6 +867,16 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
    }
 
    @Override
+   public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
+      return addressManager.getMatchingQueue(address, routingType);
+   }
+
+   @Override
+   public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception {
+      return addressManager.getMatchingQueue(address, queueName, routingType);
+   }
+
+   @Override
    public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception {
       // We send direct to the queue so we can send it to the same queue that is bound to the notifications address -
       // this is crucial for ensuring

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
index 8db4f6f..e39626f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
@@ -118,6 +118,33 @@ public class SimpleAddressManager implements AddressManager {
       return bindings;
    }
 
+   // Returns the unique name of a local queue for the address (routingType is not consulted), or null if no binding exists.
+   public SimpleString getMatchingQueue(final SimpleString address, RoutingType routingType) throws Exception {
+      Binding binding = nameMap.get(address);
+      if (binding == null || !(binding instanceof LocalQueueBinding) || !binding.getAddress().equals(address)) {
+         Bindings bindings = mappings.get(address);
+         // nothing may be bound to this address, in which case there are no bindings to search
+         if (bindings != null) {
+            for (Binding theBinding : bindings.getBindings()) {
+               if (theBinding instanceof LocalQueueBinding) {
+                  binding = theBinding;
+                  break;
+               }
+            }
+         }
+      }
+      return binding != null ? binding.getUniqueName() : null;
+   }
+
+   // Looks up queueName and verifies it is bound to address; returns its unique name, or null if the queue is unknown.
+   public SimpleString getMatchingQueue(final SimpleString address, final SimpleString queueName, RoutingType routingType) throws Exception {
+      Binding binding = nameMap.get(queueName);
+      if (binding != null && !binding.getAddress().equals(address)) {
+         throw new IllegalStateException("queue " + queueName + " belongs to address " + binding.getAddress());
+      }
+      return binding != null ? binding.getUniqueName() : null;
+   }
+
    @Override
    public void clear() {
       nameMap.clear();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index badadf4..a92786a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -237,4 +237,10 @@ public interface ServerSession extends SecurityAuth {
    List<MessageReference> getInTXMessagesForConsumer(long consumerId);
 
    String getValidatedUser();
+
+   SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
+
+   SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception;
+
+   AddressInfo getAddress(SimpleString address);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 89d110e..bee7d73 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1471,6 +1471,21 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    }
 
    @Override
+   public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
+      return server.getPostOffice().getMatchingQueue(address, routingType);
+   }
+
+   @Override
+   public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception {
+      return server.getPostOffice().getMatchingQueue(address, queueName, routingType);
+   }
+
+   @Override
+   public AddressInfo getAddress(SimpleString address) {
+      return server.getPostOffice().getAddressInfo(address);
+   }
+
+   @Override
    public String toString() {
       StringBuffer buffer = new StringBuffer();
       if (this.metaData != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index fc3fdf7..6ed7ed8 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -271,7 +271,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
       checkClosed();
 
       final ClientFuture request = new ClientFuture();
-      final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, getNextReceiverId());
+      final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId);
 
       connection.getScheduler().execute(new Runnable() {
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index 1e12d4c..6f373e5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -22,8 +22,11 @@ import java.util.LinkedList;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.server.JMSServerManager;
 import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
@@ -86,6 +89,12 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
       ActiveMQServer server = createServer(true, true);
       serverManager = new JMSServerManagerImpl(server);
       Configuration serverConfig = server.getConfiguration();
+      CoreAddressConfiguration address = new CoreAddressConfiguration();
+      address.setName(getTestName()).getRoutingTypes().add(RoutingType.ANYCAST);
+      CoreQueueConfiguration queueConfig = new CoreQueueConfiguration();
+      queueConfig.setName(getTestName()).setAddress(getTestName()).setRoutingType(RoutingType.ANYCAST);
+      address.getQueueConfigurations().add(queueConfig);
+      serverConfig.addAddressConfiguration(address);
       serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")));
       serverConfig.setSecurityEnabled(false);
       serverManager.start();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
index abc422b..1ff74ed 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
@@ -371,6 +371,6 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
    }
 
    public String getTopicName() {
-      return "topic://myTopic";
+      return "myTopic";
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java
index c599f38..d7874e3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java
@@ -111,7 +111,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
 
       sender.close();
 
-      Thread.sleep(200);
+      Thread.sleep(10000);
 
       queueView = getProxyToQueue(remoteTarget.getAddress());
       assertNull(queueView);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
index e42a718..627a3e4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
@@ -36,11 +36,6 @@ import org.junit.Test;
  */
 public class AmqpTransactionTest extends AmqpClientTestSupport {
 
-   @Before
-   public void createQueue() throws Exception {
-      server.createQueue(SimpleString.toSimpleString(getTestName()), SimpleString.toSimpleString(getTestName()), null, true, false);
-   }
-
    @Test(timeout = 30000)
    public void testBeginAndCommitTransaction() throws Exception {
       AmqpClient client = createAmqpClient();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
new file mode 100644
index 0000000..38bb97a
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY;
+
+
+public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport  {
+
+   SimpleString address = new SimpleString("testAddress");
+   SimpleString queue1 = new SimpleString("queue1");
+   SimpleString queue2 = new SimpleString("queue2");
+
+   @Test(timeout = 60000)
+   public void testConsumeFromSingleQueueOnAddressSameName() throws Exception {
+      server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
+      server.createQueue(address, RoutingType.ANYCAST, address, null, true, false);
+
+      sendMessages(1, address.toString());
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(address.toString());
+      receiver.flow(1);
+      AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount());
+
+      receiver.close();
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testConsumeFromSingleQueueOnAddressSameNameMultipleQueues() throws Exception {
+      server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
+      server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
+      server.createQueue(address, RoutingType.ANYCAST, address, null, true, false);
+
+      sendMessages(2, address.toString());
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(address.toString());
+      receiver.flow(1);
+      AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount());
+      assertEquals(0, ((QueueImpl)server.getPostOffice().getBinding(queue1).getBindable()).getConsumerCount());
+      receiver.close();
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testConsumeFromSingleQueueOnAddressDifferentName() throws Exception {
+      server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
+      server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
+
+      sendMessages(1, address.toString());
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(address.toString());
+      receiver.flow(1);
+      AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(queue1).getBindable()).getConsumerCount());
+
+      receiver.close();
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testConsumeFromSingleQueueOnAddressDifferentNameMultipleQueues() throws Exception {
+      server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
+      server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
+      server.createQueue(address, RoutingType.ANYCAST, queue2, null, true, false);
+
+      sendMessages(1, address.toString());
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(address.toString());
+      receiver.flow(1);
+      AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(queue1).getBindable()).getConsumerCount());
+      assertEquals(0, ((QueueImpl)server.getPostOffice().getBinding(queue2).getBindable()).getConsumerCount());
+      receiver.close();
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testConsumeFromSingleQualifiedQueueOnAddressSameName() throws Exception {
+      server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
+      server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
+
+      sendMessages(1, address.toString());
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(address.toString() + "::" + queue1.toString());
+      receiver.flow(1);
+      AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(queue1).getBindable()).getConsumerCount());
+
+      receiver.close();
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testConsumeWhenOnlyMulticast() throws Exception {
+      server.createAddressInfo(new AddressInfo(address, RoutingType.MULTICAST));
+
+      sendMessages(1, address.toString());
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+
+      AmqpSession session = connection.createSession();
+      Source jmsSource = createJmsSource(false);
+      jmsSource.setAddress(address.toString());
+      try {
+         session.createReceiver(jmsSource);
+         fail("should throw exception");
+      } catch (Exception e) {//ignore
+      }
+      connection.close();
+   }
+
+   // Sends numMessages text messages ("message-<i>") to the given address over its own connection.
+   private void sendMessages(int numMessages, String address) throws Exception {
+      AmqpConnection connection = addConnection(createAmqpClient().connect());
+      AmqpSession session = connection.createSession();
+      AmqpSender sender = session.createSender(address);
+      for (int i = 0; i < numMessages; i++) {
+         AmqpMessage message = new AmqpMessage();
+         message.setText("message-" + i);
+         sender.send(message);
+      }
+      sender.close();
+      connection.close(); // close the producer connection; this line previously called connect() again
+   }
+
+   protected Source createJmsSource(boolean topic) {
+
+      Source source = new Source();
+      // Set the capability to indicate the node type being created
+      if (!topic) {
+         source.setCapabilities(QUEUE_CAPABILITY);
+      } else {
+         source.setCapabilities(TOPIC_CAPABILITY);
+      }
+
+      return source;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java
new file mode 100644
index 0000000..d5be5f2
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY;
+
+
+public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport  {
+
+   SimpleString address = new SimpleString("testAddress");
+   SimpleString queue1 = new SimpleString("queue1");
+   SimpleString queue2 = new SimpleString("queue2");
+
+   @Test(timeout = 60000)
+   public void testConsumeFromSingleQueueOnAddressSameName() throws Exception {
+      server.createAddressInfo(new AddressInfo(address, RoutingType.MULTICAST));
+      server.createQueue(address, RoutingType.MULTICAST, address, null, true, false);
+
+      sendMessages(1, address.toString());
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(address.toString() + "::" + address.toString());
+      receiver.flow(1);
+      AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount());
+
+      receiver.close();
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testConsumeWhenOnlyAnyicast() throws Exception {
+      server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
+
+      sendMessages(1, address.toString());
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+
+      AmqpSession session = connection.createSession();
+      Source jmsSource = createJmsSource(true);
+      jmsSource.setAddress(address.toString());
+      try {
+         session.createReceiver(jmsSource);
+         fail("should throw exception");
+      } catch (Exception e) {//ignore
+      }
+      connection.close();
+   }
+
+   // Sends numMessages text messages ("message-<i>") to the given address over its own connection.
+   private void sendMessages(int numMessages, String address) throws Exception {
+      AmqpConnection connection = addConnection(createAmqpClient().connect());
+      AmqpSession session = connection.createSession();
+      AmqpSender sender = session.createSender(address);
+      for (int i = 0; i < numMessages; i++) {
+         AmqpMessage message = new AmqpMessage();
+         message.setText("message-" + i);
+         sender.send(message);
+      }
+      sender.close();
+      connection.close(); // close the producer connection; this line previously called connect() again
+   }
+
+   protected Source createJmsSource(boolean topic) {
+
+      Source source = new Source();
+      // Set the capability to indicate the node type being created
+      if (!topic) {
+         source.setCapabilities(QUEUE_CAPABILITY);
+      } else {
+         source.setCapabilities(TOPIC_CAPABILITY);
+      }
+
+      return source;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
index b017c31..3103af0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -70,6 +70,8 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory;
@@ -151,20 +153,31 @@ public class ProtonTest extends ProtonTestBase {
    @Before
    public void setUp() throws Exception {
       super.setUp();
-
-      server.createQueue(new SimpleString(coreAddress), new SimpleString(coreAddress), null, true, false);
-      server.createQueue(new SimpleString(coreAddress + "1"), new SimpleString(coreAddress + "1"), null, true, false);
-      server.createQueue(new SimpleString(coreAddress + "2"), new SimpleString(coreAddress + "2"), null, true, false);
-      server.createQueue(new SimpleString(coreAddress + "3"), new SimpleString(coreAddress + "3"), null, true, false);
-      server.createQueue(new SimpleString(coreAddress + "4"), new SimpleString(coreAddress + "4"), null, true, false);
-      server.createQueue(new SimpleString(coreAddress + "5"), new SimpleString(coreAddress + "5"), null, true, false);
-      server.createQueue(new SimpleString(coreAddress + "6"), new SimpleString(coreAddress + "6"), null, true, false);
-      server.createQueue(new SimpleString(coreAddress + "7"), new SimpleString(coreAddress + "7"), null, true, false);
-      server.createQueue(new SimpleString(coreAddress + "8"), new SimpleString(coreAddress + "8"), null, true, false);
-      server.createQueue(new SimpleString(coreAddress + "9"), new SimpleString(coreAddress + "9"), null, true, false);
-      server.createQueue(new SimpleString(coreAddress + "10"), new SimpleString(coreAddress + "10"), null, true, false);
-      server.createQueue(new SimpleString("amqp_testtopic"), new SimpleString("amqp_testtopic"), null, true, false);
-      server.createQueue(new SimpleString("amqp_testtopic" + "1"), new SimpleString("amqp_testtopic" + "1"), null, true, false);
+      server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress), RoutingType.ANYCAST));
+      server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "1"), RoutingType.ANYCAST));
+      server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "2"), RoutingType.ANYCAST));
+      server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "3"), RoutingType.ANYCAST));
+      server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "4"), RoutingType.ANYCAST));
+      server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "5"), RoutingType.ANYCAST));
+      server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "6"), RoutingType.ANYCAST));
+      server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "7"), RoutingType.ANYCAST));
+      server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "8"), RoutingType.ANYCAST));
+      server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "9"), RoutingType.ANYCAST));
+      server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "10"), RoutingType.ANYCAST));
+      server.createQueue(new SimpleString(coreAddress), RoutingType.ANYCAST, new SimpleString(coreAddress), null, true, false);
+      server.createQueue(new SimpleString(coreAddress + "1"), RoutingType.ANYCAST, new SimpleString(coreAddress + "1"), null, true, false);
+      server.createQueue(new SimpleString(coreAddress + "2"), RoutingType.ANYCAST, new SimpleString(coreAddress + "2"), null, true, false);
+      server.createQueue(new SimpleString(coreAddress + "3"), RoutingType.ANYCAST, new SimpleString(coreAddress + "3"), null, true, false);
+      server.createQueue(new SimpleString(coreAddress + "4"), RoutingType.ANYCAST, new SimpleString(coreAddress + "4"), null, true, false);
+      server.createQueue(new SimpleString(coreAddress + "5"), RoutingType.ANYCAST, new SimpleString(coreAddress + "5"), null, true, false);
+      server.createQueue(new SimpleString(coreAddress + "6"), RoutingType.ANYCAST, new SimpleString(coreAddress + "6"), null, true, false);
+      server.createQueue(new SimpleString(coreAddress + "7"), RoutingType.ANYCAST, new SimpleString(coreAddress + "7"), null, true, false);
+      server.createQueue(new SimpleString(coreAddress + "8"), RoutingType.ANYCAST, new SimpleString(coreAddress + "8"), null, true, false);
+      server.createQueue(new SimpleString(coreAddress + "9"), RoutingType.ANYCAST, new SimpleString(coreAddress + "9"), null, true, false);
+      server.createQueue(new SimpleString(coreAddress + "10"), RoutingType.ANYCAST, new SimpleString(coreAddress + "10"), null, true, false);
+      server.createAddressInfo(new AddressInfo(new SimpleString("amqp_testtopic"), RoutingType.MULTICAST));
+      server.createQueue(new SimpleString("amqp_testtopic"), RoutingType.MULTICAST, new SimpleString("amqp_testtopic"), null, true, false);
+     /* server.createQueue(new SimpleString("amqp_testtopic" + "1"), new SimpleString("amqp_testtopic" + "1"), null, true, false);
       server.createQueue(new SimpleString("amqp_testtopic" + "2"), new SimpleString("amqp_testtopic" + "2"), null, true, false);
       server.createQueue(new SimpleString("amqp_testtopic" + "3"), new SimpleString("amqp_testtopic" + "3"), null, true, false);
       server.createQueue(new SimpleString("amqp_testtopic" + "4"), new SimpleString("amqp_testtopic" + "4"), null, true, false);
@@ -173,7 +186,7 @@ public class ProtonTest extends ProtonTestBase {
       server.createQueue(new SimpleString("amqp_testtopic" + "7"), new SimpleString("amqp_testtopic" + "7"), null, true, false);
       server.createQueue(new SimpleString("amqp_testtopic" + "8"), new SimpleString("amqp_testtopic" + "8"), null, true, false);
       server.createQueue(new SimpleString("amqp_testtopic" + "9"), new SimpleString("amqp_testtopic" + "9"), null, true, false);
-      server.createQueue(new SimpleString("amqp_testtopic" + "10"), new SimpleString("amqp_testtopic" + "10"), null, true, false);
+      server.createQueue(new SimpleString("amqp_testtopic" + "10"), new SimpleString("amqp_testtopic" + "10"), null, true, false);*/
 
       connection = createConnection();
 
@@ -769,6 +782,12 @@ public class ProtonTest extends ProtonTestBase {
 
    @Test
    public void testLinkDetatchErrorIsCorrectWhenQueueDoesNotExists() throws Exception {
+      AddressSettings value = new AddressSettings();
+      value.setAutoCreateJmsQueues(false);
+      value.setAutoCreateQueues(false);
+      value.setAutoCreateAddresses(false);
+      value.setAutoCreateJmsTopics(false);
+      server.getAddressSettingsRepository().addMatch("AnAddressThatDoesNotExist", value);
       AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
       AmqpConnection amqpConnection = client.connect();
       AmqpSession session = amqpConnection.createSession();
@@ -784,6 +803,7 @@ public class ProtonTest extends ProtonTestBase {
       assertNotNull(expectedException);
       assertTrue(expectedException.getMessage().contains("amqp:not-found"));
       assertTrue(expectedException.getMessage().contains("target address does not exist"));
+      amqpConnection.close();
    }
 
    @Test
@@ -838,6 +858,7 @@ public class ProtonTest extends ProtonTestBase {
    @Test
    public void testClientIdIsSetInSubscriptionList() throws Exception {
       AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString("mytopic"), RoutingType.ANYCAST));
       AmqpConnection amqpConnection = client.createConnection();
       amqpConnection.setContainerId("testClient");
       amqpConnection.setOfferedCapabilities(Arrays.asList(Symbol.getSymbol("topic")));
@@ -866,14 +887,14 @@ public class ProtonTest extends ProtonTestBase {
 
       String queueName = "TestQueueName";
       String address = "TestAddress";
-
-      server.createQueue(new SimpleString(address), new SimpleString(queueName), null, true, false);
+      server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(address), RoutingType.ANYCAST));
+      server.createQueue(new SimpleString(address), RoutingType.ANYCAST, new SimpleString(queueName), null, true, false);
 
       AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
       AmqpConnection amqpConnection = client.connect();
       AmqpSession session = amqpConnection.createSession();
       AmqpSender sender = session.createSender(address);
-      AmqpReceiver receiver = session.createReceiver(queueName);
+      AmqpReceiver receiver = session.createReceiver(address);
       receiver.flow(1);
 
       AmqpMessage message = new AmqpMessage();
@@ -882,6 +903,7 @@ public class ProtonTest extends ProtonTestBase {
 
       AmqpMessage receivedMessage = receiver.receive(5000, TimeUnit.MILLISECONDS);
       assertNotNull(receivedMessage);
+      amqpConnection.close();
    }
 
    @Test

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
index f19b0a4..f424ea2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
@@ -25,7 +25,10 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import java.util.Random;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.junit.After;
@@ -42,6 +45,7 @@ public class SendingAndReceivingTest extends ActiveMQTestBase {
    public void setUp() throws Exception {
       super.setUp();
       server = createServer(true, true);
+      server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString("exampleQueue"), RoutingType.ANYCAST));
       server.start();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index f2c844e..d272c02 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
@@ -51,6 +52,17 @@ public class FakePostOffice implements PostOffice {
    }
 
    @Override
+   public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) {
+
+      return null;
+   }
+
+   @Override
+   public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) {
+      return null;
+   }
+
+   @Override
    public void start() throws Exception {
 
    }


Mime
View raw message