activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject activemq-artemis git commit: added address query and more amqp consumer functionality
Date Mon, 05 Dec 2016 13:18:22 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/ARTEMIS-780 7d84b1e13 -> 83f8e6ecb


added address query and more amqp consumer functionality


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/83f8e6ec
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/83f8e6ec
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/83f8e6ec

Branch: refs/heads/ARTEMIS-780
Commit: 83f8e6ecb4dad0939b84f64db9962c1ca4320a74
Parents: 7d84b1e
Author: Andy Taylor <andy.tayls67@gmail.com>
Authored: Mon Dec 5 13:16:01 2016 +0000
Committer: Andy Taylor <andy.tayls67@gmail.com>
Committed: Mon Dec 5 13:18:03 2016 +0000

----------------------------------------------------------------------
 .../artemis/core/server/AddressQueryResult.java |  68 ++++
 .../amqp/broker/AMQPSessionCallback.java        |  37 ++-
 .../amqp/proton/ProtonServerSenderContext.java  | 111 +++++--
 .../postoffice/impl/SimpleAddressManager.java   |  10 +-
 .../artemis/core/server/ActiveMQServer.java     |   2 +
 .../artemis/core/server/ServerSession.java      |   2 +
 .../core/server/impl/ActiveMQServerImpl.java    |  21 +-
 .../core/server/impl/ServerSessionImpl.java     |   6 +
 .../transport/amqp/client/AmqpClient.java       |  38 +++
 .../transport/amqp/client/AmqpConnection.java   |   9 +-
 .../transport/amqp/client/AmqpSession.java      |  61 ++++
 .../integration/amqp/AmqpClientTestSupport.java |  23 ++
 .../amqp/AmqpDurableReceiverTest.java           |   5 +-
 .../amqp/BrokerDefinedAnycastConsumerTest.java  |  59 +++-
 .../BrokerDefinedMulticastConsumerTest.java     |  25 +-
 .../amqp/ClientDefinedAnycastConsumerTest.java  |  59 ++++
 .../amqp/ClientDefinedMultiConsumerTest.java    | 330 +++++++++++++++++++
 .../integration/amqp/ProtonPubSubTest.java      |   4 +
 18 files changed, 814 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/AddressQueryResult.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/AddressQueryResult.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/AddressQueryResult.java
new file mode 100644
index 0000000..ce74d3b
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/AddressQueryResult.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+
+import java.util.Set;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class AddressQueryResult {
+   private final SimpleString name;
+   private final Set<RoutingType> routingTypes;
+   private final long id;
+   private final boolean autoCreated;
+   private final boolean exists;
+   private final boolean autoCreateAddresses;
+
+   public AddressQueryResult(SimpleString name, Set<RoutingType> routingTypes, long id, boolean autoCreated, boolean exists, boolean autoCreateAddresses) {
+
+      this.name = name;
+      this.routingTypes = routingTypes;
+      this.id = id;
+
+      this.autoCreated = autoCreated;
+      this.exists = exists;
+      this.autoCreateAddresses = autoCreateAddresses;
+   }
+
+   public SimpleString getName() {
+      return name;
+   }
+
+   public Set<RoutingType> getRoutingTypes() {
+      return routingTypes;
+   }
+
+   public long getId() {
+      return id;
+   }
+
+   public boolean isAutoCreated() {
+      return autoCreated;
+   }
+
+   public boolean isExists() {
+      return exists;
+   }
+
+   public boolean isAutoCreateAddresses() {
+      return autoCreateAddresses;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 5f5569f..a1928be 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
@@ -202,20 +203,32 @@ public class AMQPSessionCallback implements SessionCallback {
       serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName),  routingType, SimpleString.toSimpleString(filter), true, false);
    }
 
-   public void createDurableQueue(String address, String queueName, String filter) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true);
+   public void createUnsharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
+      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, 1, false, false);
    }
 
-   public QueueQueryResult queueQuery(String queueName, boolean autoCreate) throws Exception {
+   public void createSharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
+      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, -1, false, false);
+   }
+
+   public void createSharedVolatileQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
+      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, false, -1, true, true);
+   }
+
+   public QueueQueryResult queueQuery(String queueName, RoutingType routingType, boolean autoCreate) throws Exception {
       QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
 
       if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateQueues() && autoCreate) {
          try {
-            serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), null, false, true);
+            serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), routingType, null, false, true);
          } catch (ActiveMQQueueExistsException e) {
             // The queue may have been created by another thread in the mean time.  Catch and do nothing.
          }
-         queueQueryResult = new QueueQueryResult(queueQueryResult.getName(), queueQueryResult.getAddress(), queueQueryResult.isDurable(), queueQueryResult.isTemporary(), queueQueryResult.getFilterString(), queueQueryResult.getConsumerCount(), queueQueryResult.getMessageCount(), queueQueryResult.isAutoCreateQueues(), true, queueQueryResult.isAutoCreated(), queueQueryResult.isDeleteOnNoConsumers(), queueQueryResult.getRoutingType(), queueQueryResult.getMaxConsumers());
+         queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
+      }
+
+      if (queueQueryResult.getRoutingType() != routingType) {
+         throw new IllegalStateException("Incorrect Routing Type for queue, expecting: " + routingType);
       }
       return queueQueryResult;
    }
@@ -233,6 +246,20 @@ public class AMQPSessionCallback implements SessionCallback {
       return bindingQueryResult.isExists();
    }
 
+   public AddressQueryResult addressQuery(String addressName, RoutingType routingType, boolean autoCreate) throws Exception {
+      AddressQueryResult addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName));
+
+      if (!addressQueryResult.isExists() && addressQueryResult.isAutoCreateAddresses() && autoCreate) {
+         try {
+            serverSession.createAddress(SimpleString.toSimpleString(addressName), routingType, true);
+         } catch (ActiveMQQueueExistsException e) {
+            // The address may have been created by another thread in the meantime.  Catch and do nothing.
+         }
+         addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName));
+      }
+      return addressQueryResult;
+   }
+
    public void closeSender(final Object brokerConsumer) throws Exception {
 
       final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 06a6f9b..7f5f066 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -21,8 +21,10 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
+import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
+import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@@ -33,6 +35,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPExceptio
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
 import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
 import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
@@ -70,6 +73,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    private static final Symbol COPY = Symbol.valueOf("copy");
    private static final Symbol TOPIC = Symbol.valueOf("topic");
    private static final Symbol QUEUE = Symbol.valueOf("queue");
+   private static final Symbol SHARED = Symbol.valueOf("shared");
+   private static final Symbol GLOBAL = Symbol.valueOf("global");
 
    private Object brokerConsumer;
 
@@ -79,7 +84,13 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    protected boolean closed = false;
    protected final AMQPSessionCallback sessionSPI;
    private boolean multicast;
+   // TODO: obtain the default routing type from somewhere instead of hard-coding ANYCAST
+   private RoutingType defaultRoutingType = RoutingType.ANYCAST;
    protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0);
+   private RoutingType routingTypeToUse = defaultRoutingType;
+   private boolean shared = false;
+   private boolean global = false;
+   private boolean isVolatile = false;
 
    public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) {
       super();
@@ -159,8 +170,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          // subscription queue
          String clientId = getClientId();
          String pubId = sender.getName();
-         queue = createQueueName(clientId, pubId);
-         QueueQueryResult result = sessionSPI.queueQuery(queue, false);
+         queue = createQueueName(clientId, pubId, true, global, false);
+         QueueQueryResult result = sessionSPI.queueQuery(queue, RoutingType.MULTICAST, false);
+         multicast = true;
+         routingTypeToUse = RoutingType.MULTICAST;
 
          // Once confirmed that the address exists we need to return a Source that reflects
          // the lifetime policy and capabilities of the new subscription.
@@ -218,6 +231,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       } else {
          SimpleString addressToUse;
          SimpleString queueNameToUse = null;
+         shared = hasCapabilities(SHARED, source);
+         global = hasCapabilities(GLOBAL, source);
+         ;
          //find out if we have an address made up of the address and queue name, if yes then set queue name
          if (CompositeAddress.isFullyQualified(source.getAddress())) {
             CompositeAddress compositeAddress = CompositeAddress.getQueueName(source.getAddress());
@@ -240,14 +256,20 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             }
          } else {
             //if not we look up the address
-            AddressInfo addressInfo = sessionSPI.getAddress(addressToUse);
-            Set<RoutingType> routingTypes = addressInfo.getRoutingTypes();
+            AddressQueryResult addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), defaultRoutingType, true);
+            if (!addressQueryResult.isExists()) {
+               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
+            }
+
+            Set<RoutingType> routingTypes = addressQueryResult.getRoutingTypes();
             if (routingTypes.contains(RoutingType.MULTICAST) && routingTypes.size() == 1) {
                multicast = true;
             } else {
+               //todo add some checks if both routing types are supported
                multicast = false;
             }
          }
+         routingTypeToUse = multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST;
          // if not dynamic then we use the target's address as the address to forward the
          // messages to, however there has to be a queue bound to it so we need to check this.
          if (multicast) {
@@ -278,8 +300,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                // id and link name
                String clientId = getClientId();
                String pubId = sender.getName();
-               queue = createQueueName(clientId, pubId);
-               QueueQueryResult result = sessionSPI.queueQuery(queue, false);
+               queue = createQueueName(clientId, pubId, shared, global, false);
+               QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false);
 
                if (result.isExists()) {
                   // If a client reattaches to a durable subscription with a different no-local
@@ -289,30 +311,52 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
 
                      if (result.getConsumerCount() == 0) {
                         sessionSPI.deleteQueue(queue);
-                        sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
+                        sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
                      } else {
                         throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
                      }
                   }
                } else {
-                  sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
+                  if (shared) {
+                     sessionSPI.createSharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
+                  } else {
+                     sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
+                  }
                }
             } else {
                // otherwise we are a volatile subscription
-               queue = java.util.UUID.randomUUID().toString();
-               try {
-                  sessionSPI.createTemporaryQueue(source.getAddress(), queue, RoutingType.MULTICAST, selector);
-               } catch (Exception e) {
-                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
+               isVolatile = true;
+               if (shared && sender.getName() != null) {
+                  queue = createQueueName(getClientId(), sender.getName(), shared, global, isVolatile);
+                  try {
+                     sessionSPI.createSharedVolatileQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
+                  } catch (ActiveMQQueueExistsException e) {
+                     // This is OK: the queue already exists, which just means it is shared
+                  }
+               } else {
+                  queue = java.util.UUID.randomUUID().toString();
+                  try {
+                     sessionSPI.createTemporaryQueue(source.getAddress(), queue, RoutingType.MULTICAST, selector);
+                  } catch (Exception e) {
+                     throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
+                  }
                }
             }
          } else {
             if (queueNameToUse != null) {
                SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.ANYCAST);
-               queue = matchingAnycastQueue.toString();
+               if (matchingAnycastQueue != null) {
+                  queue = matchingAnycastQueue.toString();
+               } else {
+                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
+               }
             } else {
                SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, RoutingType.ANYCAST);
-               queue = matchingAnycastQueue.toString();
+               if (matchingAnycastQueue != null) {
+                  queue = matchingAnycastQueue.toString();
+               } else {
+                  queue = addressToUse.toString();
+               }
             }
 
          }
@@ -322,7 +366,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          }
 
          try {
-            if (!sessionSPI.queueQuery(queue, !multicast).isExists()) {
+            if (!sessionSPI.queueQuery(queue, routingTypeToUse, !multicast).isExists()) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
             }
          } catch (ActiveMQAMQPNotFoundException e) {
@@ -341,6 +385,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
       try {
          brokerConsumer = sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
+      } catch (ActiveMQAMQPResourceLimitExceededException e1) {
+         throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage());
       } catch (Exception e) {
          throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
       }
@@ -382,20 +428,21 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          // any durable resources for say pub subs
          if (remoteLinkClose) {
             Source source = (Source) sender.getSource();
-            if (source != null && source.getAddress() != null && (hasCapabilities(TOPIC, source) || multicast)) {
+            if (source != null && source.getAddress() != null && multicast) {
                String queueName = source.getAddress();
-               QueueQueryResult result = sessionSPI.queueQuery(queueName, false);
+               QueueQueryResult result = sessionSPI.queueQuery(queueName, routingTypeToUse,  false);
                if (result.isExists() && source.getDynamic()) {
                   sessionSPI.deleteQueue(queueName);
                } else {
                   String clientId = getClientId();
                   String pubId = sender.getName();
-                  String queue = createQueueName(clientId, pubId);
-                  result = sessionSPI.queueQuery(queue, false);
-                  if (result.isExists()) {
-                     if (result.getConsumerCount() > 0) {
-                        System.out.println("error");
-                     }
+                  if (pubId.contains("|")) {
+                     pubId = pubId.split("\\|")[0];
+                  }
+                  String queue = createQueueName(clientId, pubId, shared, global, isVolatile);
+                  result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false);
+                  //only delete if it isn't volatile and has no consumers
+                  if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) {
                      sessionSPI.deleteQueue(queue);
                   }
                }
@@ -562,7 +609,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       return false;
    }
 
-   private static String createQueueName(String clientId, String pubId) {
-      return clientId + "." + pubId;
+   private static String createQueueName(String clientId, String pubId, boolean shared, boolean global, boolean isVolatile) {
+      String queue = clientId == null || clientId.isEmpty() ? pubId : clientId + "." + pubId;
+      if (shared) {
+         if (queue.contains("|")) {
+            queue = queue.split("\\|")[0];
+         }
+         if (isVolatile) {
+            queue += ":shared-volatile";
+      }
+      if (global) {
+         queue += ":global";
+      }
+      }
+      return queue;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
index c5f7e21..c0e5b2d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
@@ -126,10 +126,12 @@ public class SimpleAddressManager implements AddressManager {
       if (binding == null || !(binding instanceof  LocalQueueBinding)
             || !binding.getAddress().equals(address)) {
          Bindings bindings = mappings.get(address);
-         for (Binding theBinding : bindings.getBindings()) {
-            if (theBinding instanceof LocalQueueBinding) {
-               binding = theBinding;
-               break;
+         if (bindings != null) {
+            for (Binding theBinding : bindings.getBindings()) {
+               if (theBinding instanceof LocalQueueBinding) {
+                  binding = theBinding;
+                  break;
+               }
             }
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 443ced7..44e81a8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -336,6 +336,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
 
    QueueQueryResult queueQuery(SimpleString name) throws Exception;
 
+   AddressQueryResult addressQuery(SimpleString name) throws Exception;
+
    Queue deployQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filterString,
                      boolean durable,
                      boolean temporary,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index a92786a..e6b5ad4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -163,6 +163,8 @@ public interface ServerSession extends SecurityAuth {
 
    QueueQueryResult executeQueueQuery(SimpleString name) throws Exception;
 
+   AddressQueryResult executeAddressQuery(SimpleString name) throws Exception;
+
    BindingQueryResult executeBindingQuery(SimpleString address) throws Exception;
 
    void closeConsumer(long consumerID) throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index aebcb9a..663539d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -107,6 +107,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.Bindable;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.Divert;
@@ -680,6 +681,24 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    }
 
    @Override
+   public AddressQueryResult addressQuery(SimpleString name) throws Exception {
+      if (name == null) {
+         throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
+      }
+
+      boolean autoCreateAddresses = getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateAddresses();
+
+      AddressInfo addressInfo = postOffice.getAddressInfo(name);
+      AddressQueryResult response;
+      if (addressInfo != null) {
+         response = new AddressQueryResult(addressInfo.getName(), addressInfo.getRoutingTypes(), addressInfo.getId(), addressInfo.isAutoCreated(), true, autoCreateAddresses);
+      } else {
+         response = new AddressQueryResult(name, null, -1, false, false, autoCreateAddresses);
+      }
+      return response;
+   }
+
+   @Override
    public void threadDump() {
       StringWriter str = new StringWriter();
       PrintWriter out = new PrintWriter(str);
@@ -1398,7 +1417,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                             final SimpleString filterString,
                             final boolean durable,
                             final boolean temporary) throws Exception {
-      return createQueue(address, routingType, queueName, filterString, null, durable, temporary, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), true);
+      return createQueue(address, routingType, queueName, filterString, null, durable, temporary, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), false);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index bee7d73..1250b23 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -65,6 +65,7 @@ import org.apache.activemq.artemis.core.security.SecurityStore;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
@@ -695,6 +696,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    }
 
    @Override
+   public AddressQueryResult executeAddressQuery(SimpleString name) throws Exception {
+      return server.addressQuery(name);
+   }
+
+   @Override
    public BindingQueryResult executeBindingQuery(final SimpleString address) throws Exception {
       return server.bindingQuery(address);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
index 56353e4..b537639 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
@@ -76,6 +76,44 @@ public class AmqpClient {
    }
 
    /**
+    * Creates a connection to the broker that does not set an AMQP container id, this method
+    * initiates a connect attempt immediately and will fail if the remote peer cannot be reached.
+    *
+    * @throws Exception if an error occurs attempting to connect to the Broker.
+    * @return a new connection object used to interact with the connected peer.
+    */
+   public AmqpConnection connect(boolean noContainerId) throws Exception {
+
+      AmqpConnection connection = createConnection();
+      connection.setNoContainerID();
+
+      LOG.debug("Attempting to create new connection to peer: {}", remoteURI);
+      connection.connect();
+
+      return connection;
+   }
+
+
+   /**
+       * Creates a connection with the broker at the given location using the supplied container
+       * id, this method initiates a connect attempt immediately and will fail if the peer is unreachable.
+       *
+       * @throws Exception if an error occurs attempting to connect to the Broker.
+       * @return a new connection object used to interact with the connected peer.
+       */
+      public AmqpConnection connect(String containerId) throws Exception {
+
+         AmqpConnection connection = createConnection();
+         connection.setContainerId(containerId);
+
+         LOG.debug("Attempting to create new connection to peer: {}", remoteURI);
+         connection.connect();
+
+         return connection;
+      }
+
+
+   /**
     * Creates a connection object using the configured values for user, password, remote URI
     * etc.  This method does not immediately initiate a connection to the remote leaving that
     * to the caller which provides a connection object that can have additional configuration

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index 01c60bc..723daef 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -104,6 +104,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
    private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
    private long drainTimeout = DEFAULT_DRAIN_TIMEOUT;
    private boolean trace;
+   private boolean noContainerID = false;
 
    public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport,
                          String username,
@@ -139,7 +140,9 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
          serializer.execute(new Runnable() {
             @Override
             public void run() {
-               getEndpoint().setContainer(safeGetContainerId());
+               if (!noContainerID) {
+                  getEndpoint().setContainer(safeGetContainerId());
+               }
                getEndpoint().setHostname(remoteURI.getHost());
                if (!getOfferedCapabilities().isEmpty()) {
                   getEndpoint().setOfferedCapabilities(getOfferedCapabilities().toArray(new Symbol[0]));
@@ -735,4 +738,8 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
    public String toString() {
       return "AmqpConnection { " + connectionId + " }";
    }
+
+   public void setNoContainerID() {
+      noContainerID = true;
+   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 6ed7ed8..d4b16c1 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -289,6 +289,67 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
       return receiver;
    }
 
+
+   /**
+    * Create a receiver instance using the given Source
+    *
+    * @param source the caller created and configured Source used to create the receiver link.
+    * @return a newly created receiver that is ready for use.
+    * @throws Exception if an error occurs while creating the receiver.
+    */
+   public AmqpReceiver createMulticastReceiver(Source source, String receiverId, String receiveName) throws Exception {
+      checkClosed();
+
+      final ClientFuture request = new ClientFuture();
+      final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId);
+      receiver.setSubscriptionName(receiveName);
+
+      connection.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            checkClosed();
+            receiver.setStateInspector(getStateInspector());
+            receiver.open(request);
+            pumpToProtonTransport(request);
+         }
+      });
+
+      request.sync();
+
+      return receiver;
+   }
+
+   /**
+    * Create a receiver instance using the given address
+    *
+    * @param address the address used to create the receiver link.
+    * @return a newly created receiver that is ready for use.
+    * @throws Exception if an error occurs while creating the receiver.
+    */
+   public AmqpReceiver createMulticastReceiver(String receiverId, String address, String receiveName) throws Exception {
+      checkClosed();
+
+      final ClientFuture request = new ClientFuture();
+      final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, receiverId);
+      receiver.setSubscriptionName(receiveName);
+
+      connection.getScheduler().execute(new Runnable() {
+
+         @Override
+         public void run() {
+            checkClosed();
+            receiver.setStateInspector(getStateInspector());
+            receiver.open(request);
+            pumpToProtonTransport(request);
+         }
+      });
+
+      request.sync();
+
+      return receiver;
+   }
+
    /**
     * Create a receiver instance using the given address that creates a durable subscription.
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index 6f373e5..0d5c874 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -33,6 +33,10 @@ import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
 import org.junit.After;
 import org.junit.Before;
 
@@ -42,6 +46,10 @@ import org.junit.Before;
  */
 public class AmqpClientTestSupport extends ActiveMQTestBase {
 
+   protected static Symbol SHARED = Symbol.getSymbol("shared");
+   protected static Symbol GLOBAL = Symbol.getSymbol("global");
+
+
    private boolean useSSL;
 
    protected JMSServerManager serverManager;
@@ -188,4 +196,19 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
    public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception {
       return new AmqpClient(brokerURI, username, password);
    }
+
+
+   protected void sendMessages(int numMessages, String address) throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+      AmqpSender sender = session.createSender(address);
+      for (int i = 0; i < numMessages; i++) {
+         AmqpMessage message = new AmqpMessage();
+         message.setText("message-" +  i);
+         sender.send(message);
+      }
+      sender.close();
+      connection.connect();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
index 1ff74ed..e760d77 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
@@ -24,6 +24,8 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpFrameValidator;
@@ -54,7 +56,8 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
    @Override
    public void setUp() throws Exception {
       super.setUp();
-      server.createQueue(new SimpleString(getTopicName()), new SimpleString(getTopicName()), null, true, false);
+      server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTopicName()), RoutingType.MULTICAST));
+      server.createQueue(new SimpleString(getTopicName()), RoutingType.MULTICAST, new SimpleString(getTopicName()), null, true, false);
    }
 
    @Test(timeout = 60000)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
index 7a4299e..db2f1b4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
@@ -20,11 +20,11 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
-import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.junit.Test;
@@ -169,20 +169,61 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport  {
       connection.close();
    }
 
-   private void sendMessages(int numMessages, String address) throws Exception {
+   @Test(timeout = 60000)
+   public void testConsumeWhenNoAddressCreatedNoAutoCreate() throws Exception {
+      AddressSettings settings = new AddressSettings();
+      settings.setAutoCreateAddresses(false);
+      server.getAddressSettingsRepository().addMatch(address.toString(), settings);
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+      try {
+         session.createReceiver(address.toString());
+         fail("should throw exception");
+      } catch (Exception e) {
+         //ignore
+      }
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testConsumeWhenNoAddressCreatedAutoCreate() throws Exception {
+      AddressSettings settings = new AddressSettings();
+      settings.setAutoCreateAddresses(true);
+      server.getAddressSettingsRepository().addMatch(address.toString(), settings);
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+      AmqpReceiver receiver = session.createReceiver(address.toString());
+      sendMessages(1, address.toString());
+      receiver.flow(1);
+      AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount());
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testConsumeWhenNoAddressHasBothRoutingTypesButDefaultQueueIsMultiCast() throws Exception {
+      AddressInfo addressInfo = new AddressInfo(address);
+      addressInfo.getRoutingTypes().add(RoutingType.ANYCAST);
+      addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
+      server.createAddressInfo(addressInfo);
+      server.createQueue(address, RoutingType.MULTICAST, address, null, true, false);
+
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
       AmqpSession session = connection.createSession();
-      AmqpSender sender = session.createSender(address);
-      for (int i = 0; i < numMessages; i++) {
-         AmqpMessage message = new AmqpMessage();
-         message.setText("message-" +  i);
-         sender.send(message);
+      try {
+         session.createReceiver(address.toString());
+         fail("expected exception");
+      } catch (Exception e) {
+         //ignore
       }
-      sender.close();
-      connection.connect();
+      connection.close();
    }
 
+
    protected Source createJmsSource(boolean topic) {
 
       Source source = new Source();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java
index 2b4e2b4..c47207f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java
@@ -63,7 +63,7 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport  {
    }
 
    @Test(timeout = 60000)
-   public void testConsumeWhenOnlyAnyicast() throws Exception {
+   public void testConsumeWhenOnlyAnycast() throws Exception {
       server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
 
       sendMessages(1, address.toString());
@@ -83,20 +83,27 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport  {
       connection.close();
    }
 
-   private void sendMessages(int numMessages, String address) throws Exception {
+   @Test(timeout = 60000)
+   public void testConsumeWhenNoAddressHasBothRoutingTypesButDefaultQueueIsAnyCast() throws Exception {
+      AddressInfo addressInfo = new AddressInfo(address);
+      addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
+      addressInfo.getRoutingTypes().add(RoutingType.ANYCAST);
+      server.createAddressInfo(addressInfo);
+      server.createQueue(address, RoutingType.MULTICAST, address, null, true, false);
+
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
       AmqpSession session = connection.createSession();
-      AmqpSender sender = session.createSender(address);
-      for (int i = 0; i < numMessages; i++) {
-         AmqpMessage message = new AmqpMessage();
-         message.setText("message-" +  i);
-         sender.send(message);
+      try {
+         session.createReceiver(address.toString());
+         fail("expected exception");
+      } catch (Exception e) {
+         //ignore
       }
-      sender.close();
-      connection.connect();
+      connection.close();
    }
 
+
    protected Source createJmsSource(boolean topic) {
 
       Source source = new Source();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java
new file mode 100644
index 0000000..a7d2b4e
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class ClientDefinedAnycastConsumerTest  extends AmqpClientTestSupport  {
+
+   SimpleString address = new SimpleString("testAddress");
+   SimpleString queue1 = new SimpleString("queue1");
+   SimpleString queue2 = new SimpleString("queue2");
+
+   @Test(timeout = 60000)
+   public void testConsumeFromSingleQueueOnAddressSameName() throws Exception {
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(address.toString());
+      sendMessages(1, address.toString());
+      receiver.flow(1);
+      AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount());
+
+      receiver.close();
+      connection.close();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
new file mode 100644
index 0000000..f5a7808
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport  {
+
+   SimpleString address = new SimpleString("testAddress");
+
+   @Test(timeout = 60000)
+   public void test2ConsumersOnSharedVolatileAddress() throws Exception {
+      AddressInfo addressInfo = new AddressInfo(address);
+      addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
+      server.createAddressInfo(addressInfo);
+      AmqpClient client = createAmqpClient();
+
+      AmqpConnection connection = addConnection(client.connect("myClientId"));
+      AmqpSession session = connection.createSession();
+      Source source = createSharedSource(TerminusDurability.NONE);
+      AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
+      AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
+      receiver.flow(1);
+      receiver2.flow(1);
+      sendMessages(2, address.toString());
+      AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
+      receiver.close();
+      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
+      receiver2.close();
+      //check it's been deleted
+      assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void test2ConsumersOnSharedVolatileAddressBrokerDefined() throws Exception {
+      AddressInfo addressInfo = new AddressInfo(address);
+      addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
+      server.createAddressInfo(addressInfo);
+      server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("myClientId.mySub:shared-volatile"), null, true, false, -1, false, false);
+      AmqpClient client = createAmqpClient();
+
+      AmqpConnection connection = addConnection(client.connect("myClientId"));
+      AmqpSession session = connection.createSession();
+      Source source = createSharedSource(TerminusDurability.NONE);
+      AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
+      AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|1");
+      receiver.flow(1);
+      receiver2.flow(1);
+      sendMessages(2, address.toString());
+      AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
+      receiver.close();
+      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
+      receiver2.close();
+      //check it **hasn't** been deleted
+      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void test2ConsumersOnSharedVolatileAddressNoReceiverClose() throws Exception {
+      AddressInfo addressInfo = new AddressInfo(address);
+      addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
+      server.createAddressInfo(addressInfo);
+      AmqpClient client = createAmqpClient();
+
+      AmqpConnection connection = addConnection(client.connect("myClientId"));
+      AmqpSession session = connection.createSession();
+      Source source = createSharedSource(TerminusDurability.NONE);
+      AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
+      AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
+      receiver.flow(1);
+      receiver2.flow(1);
+      sendMessages(2, address.toString());
+      AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
+      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
+      //check it's been deleted
+      connection.close();
+      assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
+   }
+
+   @Test(timeout = 60000)
+   public void test2ConsumersOnSharedVolatileAddressGlobal() throws Exception {
+      AddressInfo addressInfo = new AddressInfo(address);
+      addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
+      server.createAddressInfo(addressInfo);
+      AmqpClient client = createAmqpClient();
+
+      AmqpConnection connection = addConnection(client.connect(false));
+      AmqpSession session = connection.createSession();
+      Source source = createSharedGlobalSource(TerminusDurability.NONE);
+      AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
+      AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
+      receiver.flow(1);
+      receiver2.flow(1);
+      sendMessages(2, address.toString());
+      AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")).getBindable()).getConsumerCount());
+      receiver.close();
+      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")));
+      receiver2.close();
+      //check it's been deleted
+      assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")));
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void test2ConsumersOnSharedDurableAddress() throws Exception {
+      AddressInfo addressInfo = new AddressInfo(address);
+      addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
+      server.createAddressInfo(addressInfo); // the address is registered with MULTICAST routing before any receiver attaches
+      AmqpClient client = createAmqpClient();
+
+      AmqpConnection connection = addConnection(client.connect("myClientId")); // the binding names asserted below start with "myClientId."
+      AmqpSession session = connection.createSession();
+      Source source = createSharedSource(TerminusDurability.CONFIGURATION);
+      AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
+      AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
+      receiver.flow(1);
+      receiver2.flow(1);
+      sendMessages(2, address.toString()); // each receiver is expected to receive a message below
+      AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")).getBindable()).getConsumerCount());
+      receiver.close(); // first receiver closed: the binding must still be present
+      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
+      receiver2.close();
+      // both receivers are closed: check the binding has been deleted
+      assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void test2ConsumersOnSharedDurableAddressReconnect() throws Exception {
+      AddressInfo addressInfo = new AddressInfo(address);
+      addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
+      server.createAddressInfo(addressInfo); // the address is registered with MULTICAST routing before any receiver attaches
+      AmqpClient client = createAmqpClient();
+
+      AmqpConnection connection = addConnection(client.connect("myClientId")); // the binding names asserted below start with "myClientId."
+      AmqpSession session = connection.createSession();
+      Source source = createSharedSource(TerminusDurability.CONFIGURATION);
+      AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
+      AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
+      receiver.flow(1);
+      receiver2.flow(1);
+      sendMessages(2, address.toString()); // each receiver is expected to receive a message below
+      AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")).getBindable()).getConsumerCount());
+
+      connection.close(); // the receivers are not closed individually; the binding is asserted to still exist after reconnecting
+
+      connection = addConnection(client.connect("myClientId")); // reconnect with the same client id
+      session = connection.createSession();
+
+      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
+      receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub"); // re-attach using the same source and names as before
+      receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
+
+      receiver.close(); // first receiver closed: the binding must still be present
+      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
+      receiver2.close();
+      // both receivers are closed: check the binding has been deleted
+      assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void test2ConsumersOnSharedDurableAddressReconnectwithNull() throws Exception {
+      AddressInfo addressInfo = new AddressInfo(address);
+      addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
+      server.createAddressInfo(addressInfo); // the address is registered with MULTICAST routing before any receiver attaches
+      AmqpClient client = createAmqpClient();
+
+      AmqpConnection connection = addConnection(client.connect("myClientId")); // the binding names asserted below start with "myClientId."
+      AmqpSession session = connection.createSession();
+      Source source = createSharedSource(TerminusDurability.CONFIGURATION);
+      AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
+      AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
+      receiver.flow(1);
+      receiver2.flow(1);
+      sendMessages(2, address.toString()); // each receiver is expected to receive a message below
+      AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")).getBindable()).getConsumerCount());
+
+      connection.close(); // the receivers are not closed individually; the binding is asserted to still exist after reconnecting
+
+      connection = addConnection(client.connect("myClientId")); // reconnect with the same client id
+      session = connection.createSession();
+
+      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
+      receiver = session.createDurableReceiver(null, "mySub"); // NOTE(review): null first argument presumably means "no address/source supplied" - confirm against AmqpSession
+      receiver2 = session.createDurableReceiver(null, "mySub|2");
+
+      receiver.close(); // first receiver closed: the binding must still be present
+      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
+      receiver2.close();
+      // both receivers are closed: check the binding has been deleted
+      assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void test2ConsumersOnSharedDurableAddressGlobal() throws Exception {
+      AddressInfo addressInfo = new AddressInfo(address);
+      addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
+      server.createAddressInfo(addressInfo); // the address is registered with MULTICAST routing before any receiver attaches
+      AmqpClient client = createAmqpClient();
+
+      AmqpConnection connection = addConnection(client.connect(false)); // NOTE(review): presumably connects without a client id - confirm
+      AmqpSession session = connection.createSession();
+      Source source = createSharedGlobalSource(TerminusDurability.CONFIGURATION);
+      AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
+      AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
+      receiver.flow(1);
+      receiver2.flow(1);
+      sendMessages(2, address.toString()); // each receiver is expected to receive a message below
+      AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")).getBindable()).getConsumerCount());
+      receiver.close(); // first receiver closed: the binding must still be present
+      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")));
+      receiver2.close();
+      // both receivers are closed: check the binding has been deleted
+      assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")));
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void test2ConsumersOnNonSharedDurableAddress() throws Exception {
+      AddressInfo addressInfo = new AddressInfo(address);
+      addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
+      server.createAddressInfo(addressInfo); // the address is registered with MULTICAST routing before any receiver attaches
+      AmqpClient client = createAmqpClient();
+
+      AmqpConnection connection = addConnection(client.connect("myClientId"));
+      AmqpSession session = connection.createSession();
+      Source source = createNonSharedSource(TerminusDurability.CONFIGURATION); // used for the first receiver
+      Source source1 = createSharedSource(TerminusDurability.CONFIGURATION); // used for the second attach, which is expected to fail
+      AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
+      try {
+         session.createMulticastReceiver(source1, "myReceiverID", "mySub|2");
+         fail("Exception expected"); // fail() raises AssertionError, which is not an Exception, so the catch below does not swallow it
+      } catch (Exception e) {
+         // expected: creating the second receiver must throw
+      }
+      connection.close();
+   }
+
+   private Source createNonSharedSource(TerminusDurability terminusDurability) {
+      Source source = new Source(); // a fresh Source is built on every call
+      source.setAddress(address.toString()); // targets the test's address field
+      source.setCapabilities(TOPIC_CAPABILITY); // topic capability only: neither SHARED nor GLOBAL is set
+      source.setDurable(terminusDurability); // durability is chosen by the caller
+      return source;
+   }
+
+   private Source createSharedSource(TerminusDurability terminusDurability) {
+      Source source = new Source(); // a fresh Source is built on every call
+      source.setAddress(address.toString()); // targets the test's address field
+      source.setCapabilities(TOPIC_CAPABILITY, SHARED); // topic capability plus SHARED; GLOBAL is not set
+      source.setDurable(terminusDurability); // durability is chosen by the caller
+      return source;
+   }
+
+   private Source createSharedGlobalSource(TerminusDurability terminusDurability) {
+      Source source = new Source(); // a fresh Source is built on every call
+      source.setAddress(address.toString()); // targets the test's address field
+      source.setCapabilities(TOPIC_CAPABILITY, SHARED, GLOBAL); // topic capability plus both SHARED and GLOBAL
+      source.setDurable(terminusDurability); // durability is chosen by the caller
+      return source;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
index 39197fd..3965947 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
@@ -30,6 +30,8 @@ import javax.jms.TopicSubscriber;
 import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.junit.After;
 import org.junit.Assert;
@@ -55,6 +57,8 @@ public class ProtonPubSubTest extends ProtonTestBase {
    @Before
    public void setUp() throws Exception {
       super.setUp();
+      server.createAddressInfo(new AddressInfo(ssPubAddress, RoutingType.MULTICAST));
+      server.createAddressInfo(new AddressInfo(ssprefixedPubAddress, RoutingType.MULTICAST));
       server.createQueue(ssPubAddress, ssPubAddress, new SimpleString("foo=bar"), false, true);
       server.createQueue(ssprefixedPubAddress, ssprefixedPubAddress, new SimpleString("foo=bar"), false, true);
       factory = new JmsConnectionFactory("amqp://localhost:5672");


Mime
View raw message